|
20 | 20 | import org.apache.doris.catalog.Env;
|
21 | 21 | import org.apache.doris.catalog.Partition;
|
22 | 22 | import org.apache.doris.catalog.Replica;
|
23 |
| -import org.apache.doris.catalog.Replica.ReplicaContext; |
24 | 23 | import org.apache.doris.cloud.system.CloudSystemInfoService;
|
25 | 24 | import org.apache.doris.common.Config;
|
26 | 25 | import org.apache.doris.common.DdlException;
|
@@ -60,11 +59,16 @@ public class CloudReplica extends Replica {
|
60 | 59 | private long indexId = -1;
|
61 | 60 | @SerializedName(value = "idx")
|
62 | 61 | private long idx = -1;
|
| 62 | + // no need to serialize; fallback BE id used while the primary BE is unavailable (-1 = none chosen) |
| 63 | + private long secondaryBe = -1; |
63 | 64 |
|
64 | 65 | private Random rand = new Random();
|
65 | 66 |
|
66 | 67 | private Map<String, List<Long>> memClusterToBackends = new ConcurrentHashMap<String, List<Long>>();
|
67 | 68 |
|
| 69 | + // bad BE host -> timestamp (ms) at which it was first seen unavailable |
| 70 | + private Map<String, Long> badBeAndBadTime = new ConcurrentHashMap<>(); |
| 71 | + |
68 | 72 | public CloudReplica() { // no-arg constructor with an empty body; the fields above keep their declared initializers
|
69 | 73 | }
|
70 | 74 |
|
@@ -216,20 +220,44 @@ private long getBackendIdImpl(String cluster) {
|
216 | 220 | long backendId = clusterToBackends.get(clusterId).get(0);
|
217 | 221 | Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
|
218 | 222 | if (be != null && be.isQueryAvailable()) {
|
| 223 | + // be normal |
219 | 224 | if (LOG.isDebugEnabled()) {
|
220 | 225 | LOG.debug("backendId={} ", backendId);
|
221 | 226 | }
|
| 227 | + badBeAndBadTime.remove(be.getHost()); |
| 228 | + secondaryBe = -1; |
222 | 229 | return backendId;
|
223 | 230 | }
|
| 231 | + // be abnormal |
| 232 | + if (Config.enable_immediate_be_assign) { |
| 233 | + // rehash immediate |
| 234 | + return hashReplicaToBe(clusterId, false, true); |
| 235 | + } |
| 236 | + // be abnormal but use secondary |
| 237 | + if (be != null) { |
| 238 | + if (!badBeAndBadTime.containsKey(be.getHost())) { |
| 239 | + badBeAndBadTime.put(be.getHost(), System.currentTimeMillis()); |
| 240 | + } |
| 241 | + if (secondaryBe == -1) { |
| 242 | + secondaryBe = hashReplicaToBe(clusterId, false, false); |
| 243 | + } |
| 244 | + if (System.currentTimeMillis() - badBeAndBadTime.get(be.getHost()) |
| 245 | + > Config.secondary_be_validity_seconds * 1000L) { |
| 246 | + // set secondary be to tablet after secondary_be_validity_seconds |
| 247 | + setBeToTablet(clusterId, secondaryBe); |
| 248 | + secondaryBe = -1; |
| 249 | + } |
| 250 | + return secondaryBe; |
| 251 | + } |
224 | 252 | }
|
225 | 253 | if (DebugPointUtil.isEnable("CloudReplica.getBackendIdImpl.clusterToBackends")) {
|
226 | 254 | LOG.info("Debug Point enable CloudReplica.getBackendIdImpl.clusterToBackends");
|
227 | 255 | return -1;
|
228 | 256 | }
|
229 |
| - return hashReplicaToBe(clusterId, false); |
| 257 | + return hashReplicaToBe(clusterId, false, true); |
230 | 258 | }
|
231 | 259 |
|
232 |
| - public long hashReplicaToBe(String clusterId, boolean isBackGround) { |
| 260 | + public long hashReplicaToBe(String clusterId, boolean isBackGround, boolean setToTablet) { |
233 | 261 | // TODO(luwei) list should be sorted
|
234 | 262 | List<Backend> clusterBes = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
|
235 | 263 | .getBackendsByClusterId(clusterId);
|
@@ -270,12 +298,18 @@ public long hashReplicaToBe(String clusterId, boolean isBackGround) {
|
270 | 298 | pickedBeId, getId(), partitionId, availableBes.size(), idx, index,
|
271 | 299 | hashCode == null ? -1 : hashCode.asLong());
|
272 | 300 |
|
| 301 | + if (setToTablet) { |
| 302 | + setBeToTablet(clusterId, pickedBeId); |
| 303 | + } |
| 304 | + |
| 305 | + return pickedBeId; |
| 306 | + } |
| 307 | + |
| 308 | + private void setBeToTablet(String clusterId, long pickedBeId) { |
273 | 309 | // save to clusterToBackends map: pickedBeId becomes the only backend recorded for clusterId, replacing any previous entry
|
274 | 310 | List<Long> bes = new ArrayList<Long>();
|
275 | 311 | bes.add(pickedBeId);
|
276 | 312 | clusterToBackends.put(clusterId, bes);
|
277 |
| - |
278 |
| - return pickedBeId; |
279 | 313 | }
|
280 | 314 |
|
281 | 315 | public List<Long> hashReplicaToBes(String clusterId, boolean isBackGround, int replicaNum) {
|
|
0 commit comments