Skip to content

Commit cd7718d

Browse files
authored
[enhance](cloud) Add a policy to delay switching a tablet to a new BE when its current BE is abnormal (#40371)
1 parent c84426f commit cd7718d

File tree

5 files changed

+71
-38
lines changed

5 files changed

+71
-38
lines changed

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

+3
Original file line numberDiff line numberDiff line change
@@ -3030,6 +3030,9 @@ public static int metaServiceRpcRetryTimes() {
30303030
@ConfField(mutable = true, description = {"存算分离模式下是否开启大事务提交,默认false"})
30313031
public static boolean enable_cloud_txn_lazy_commit = false;
30323032

3033+
@ConfField(mutable = true, description = {"存算分离模式下,当tablet分布的be异常,是否立即映射tablet到新的be上,默认true"})
3034+
public static boolean enable_immediate_be_assign = true;
3035+
30333036
// ATTN: DO NOT add any config not related to cloud mode here
30343037
// ATTN: DO NOT add any config not related to cloud mode here
30353038
// ATTN: DO NOT add any config not related to cloud mode here

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java

+54-25
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.doris.catalog.Env;
2121
import org.apache.doris.catalog.Partition;
2222
import org.apache.doris.catalog.Replica;
23-
import org.apache.doris.catalog.Replica.ReplicaContext;
2423
import org.apache.doris.cloud.system.CloudSystemInfoService;
2524
import org.apache.doris.common.Config;
2625
import org.apache.doris.common.DdlException;
@@ -49,7 +48,7 @@ public class CloudReplica extends Replica {
4948

5049
// In the future, a replica may be mapped to multiple BEs in a cluster,
5150
// so this value is be list
52-
private Map<String, List<Long>> clusterToBackends = new ConcurrentHashMap<String, List<Long>>();
51+
private Map<String, List<Long>> primaryClusterToBackends = new ConcurrentHashMap<String, List<Long>>();
5352
@SerializedName(value = "dbId")
5453
private long dbId = -1;
5554
@SerializedName(value = "tableId")
@@ -65,6 +64,9 @@ public class CloudReplica extends Replica {
6564

6665
private Map<String, List<Long>> memClusterToBackends = new ConcurrentHashMap<String, List<Long>>();
6766

67+
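// clusterId -> secondary BE ids. NOTE(review): only the BE id is stored by updateClusterToBe; no change timestamp is recorded here -- confirm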
68+
private Map<String, List<Long>> secondaryClusterToBackends = new ConcurrentHashMap<String, List<Long>>();
69+
6870
public CloudReplica() {
6971
}
7072

@@ -186,8 +188,8 @@ private long getBackendIdImpl(String cluster) {
186188
backendId = memClusterToBackends.get(clusterId).get(indexRand);
187189
}
188190

189-
if (!replicaEnough && !allowColdRead && clusterToBackends.containsKey(clusterId)) {
190-
backendId = clusterToBackends.get(clusterId).get(0);
191+
if (!replicaEnough && !allowColdRead && primaryClusterToBackends.containsKey(clusterId)) {
192+
backendId = primaryClusterToBackends.get(clusterId).get(0);
191193
}
192194

193195
if (backendId > 0) {
@@ -212,21 +214,47 @@ private long getBackendIdImpl(String cluster) {
212214
}
213215
}
214216

215-
if (clusterToBackends.containsKey(clusterId)) {
216-
long backendId = clusterToBackends.get(clusterId).get(0);
217-
Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
218-
if (be != null && be.isQueryAvailable()) {
219-
if (LOG.isDebugEnabled()) {
220-
LOG.debug("backendId={} ", backendId);
221-
}
222-
return backendId;
217+
// use the BE recorded in primaryClusterToBackends if it exists and is query-available
218+
long pickBeId = getAvaliableBeId(clusterId, primaryClusterToBackends);
219+
if (pickBeId != -1) {
220+
return pickBeId;
221+
}
222+
223+
if (!Config.enable_immediate_be_assign) {
224+
// otherwise use the BE recorded in secondaryClusterToBackends if it exists and is query-available
225+
pickBeId = getAvaliableBeId(clusterId, secondaryClusterToBackends);
226+
if (pickBeId != -1) {
227+
return pickBeId;
223228
}
224229
}
225-
if (DebugPointUtil.isEnable("CloudReplica.getBackendIdImpl.clusterToBackends")) {
226-
LOG.info("Debug Point enable CloudReplica.getBackendIdImpl.clusterToBackends");
230+
231+
if (DebugPointUtil.isEnable("CloudReplica.getBackendIdImpl.primaryClusterToBackends")) {
232+
LOG.info("Debug Point enable CloudReplica.getBackendIdImpl.primaryClusterToBackends");
233+
return -1;
234+
}
235+
236+
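// no available BE is mapped: rehash the replica to a BE, then record it in the primary map
// (enable_immediate_be_assign = true) or in the secondary map (enable_immediate_be_assign = false)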
237+
pickBeId = hashReplicaToBe(clusterId, false);
238+
updateClusterToBe(clusterId, pickBeId, Config.enable_immediate_be_assign);
239+
return pickBeId;
240+
}
241+
242+
private long getAvaliableBeId(String clusterId, Map<String, List<Long>> clusterToBackends) {
243+
List<Long> backendIds = clusterToBackends.get(clusterId);
244+
if (backendIds == null || backendIds.isEmpty()) {
227245
return -1;
228246
}
229-
return hashReplicaToBe(clusterId, false);
247+
248+
long backendId = backendIds.get(0);
249+
Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
250+
if (be != null && be.isQueryAvailable()) {
251+
// be normal
252+
if (LOG.isDebugEnabled()) {
253+
LOG.debug("backendId={} ", backendId);
254+
}
255+
return backendId;
256+
}
257+
return -1;
230258
}
231259

232260
public long hashReplicaToBe(String clusterId, boolean isBackGround) {
@@ -270,14 +298,10 @@ public long hashReplicaToBe(String clusterId, boolean isBackGround) {
270298
pickedBeId, getId(), partitionId, availableBes.size(), idx, index,
271299
hashCode == null ? -1 : hashCode.asLong());
272300

273-
// save to clusterToBackends map
274-
List<Long> bes = new ArrayList<Long>();
275-
bes.add(pickedBeId);
276-
clusterToBackends.put(clusterId, bes);
277-
278301
return pickedBeId;
279302
}
280303

304+
281305
public List<Long> hashReplicaToBes(String clusterId, boolean isBackGround, int replicaNum) {
282306
// TODO(luwei) list should be sorted
283307
List<Backend> clusterBes = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
@@ -375,7 +399,7 @@ public void readFields(DataInput in) throws IOException {
375399
long beId = in.readLong();
376400
List<Long> bes = new ArrayList<Long>();
377401
bes.add(beId);
378-
clusterToBackends.put(clusterId, bes);
402+
primaryClusterToBackends.put(clusterId, bes);
379403
}
380404
}
381405

@@ -399,14 +423,19 @@ public long getIdx() {
399423
return idx;
400424
}
401425

402-
public Map<String, List<Long>> getClusterToBackends() {
403-
return clusterToBackends;
426+
public Map<String, List<Long>> getprimaryClusterToBackends() {
427+
return primaryClusterToBackends;
404428
}
405429

406-
public void updateClusterToBe(String cluster, long beId) {
430+
// save to primaryClusterToBackends or secondaryClusterToBackends map
431+
public void updateClusterToBe(String cluster, long beId, boolean isUpdatePrimary) {
407432
// write lock
408433
List<Long> bes = new ArrayList<Long>();
409434
bes.add(beId);
410-
clusterToBackends.put(cluster, bes);
435+
if (isUpdatePrimary) {
436+
primaryClusterToBackends.put(cluster, bes);
437+
} else {
438+
secondaryClusterToBackends.put(cluster, bes);
439+
}
411440
}
412441
}

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java

+11-10
Original file line numberDiff line numberDiff line change
@@ -499,23 +499,24 @@ private void completeRouteInfo() {
499499
List<Long> tabletIds = new ArrayList<Long>();
500500
for (Tablet tablet : index.getTablets()) {
501501
for (Replica replica : tablet.getReplicas()) {
502-
Map<String, List<Long>> clusterToBackends =
503-
((CloudReplica) replica).getClusterToBackends();
504-
if (!clusterToBackends.containsKey(cluster)) {
502+
Map<String, List<Long>> primaryClusterToBackends =
503+
((CloudReplica) replica).getprimaryClusterToBackends();
504+
if (!primaryClusterToBackends.containsKey(cluster)) {
505505
long beId = ((CloudReplica) replica).hashReplicaToBe(cluster, true);
506+
((CloudReplica) replica).updateClusterToBe(cluster, beId, true);
506507
if (beId <= 0) {
507508
assignedErrNum++;
508509
continue;
509510
}
510511
List<Long> bes = new ArrayList<Long>();
511512
bes.add(beId);
512-
clusterToBackends.put(cluster, bes);
513+
primaryClusterToBackends.put(cluster, bes);
513514

514515
assigned = true;
515516
beIds.add(beId);
516517
tabletIds.add(tablet.getId());
517518
} else {
518-
beIds.add(clusterToBackends.get(cluster).get(0));
519+
beIds.add(primaryClusterToBackends.get(cluster).get(0));
519520
tabletIds.add(tablet.getId());
520521
}
521522
}
@@ -569,9 +570,9 @@ public void statRouteInfo() {
569570
loopCloudReplica((Database db, Table table, Partition partition, MaterializedIndex index, String cluster) -> {
570571
for (Tablet tablet : index.getTablets()) {
571572
for (Replica replica : tablet.getReplicas()) {
572-
Map<String, List<Long>> clusterToBackends =
573-
((CloudReplica) replica).getClusterToBackends();
574-
for (Map.Entry<String, List<Long>> entry : clusterToBackends.entrySet()) {
573+
Map<String, List<Long>> primaryClusterToBackends =
574+
((CloudReplica) replica).getprimaryClusterToBackends();
575+
for (Map.Entry<String, List<Long>> entry : primaryClusterToBackends.entrySet()) {
575576
if (!cluster.equals(entry.getKey())) {
576577
continue;
577578
}
@@ -729,7 +730,7 @@ private void updateBeToTablets(Tablet pickedTablet, long srcBe, long destBe, Bal
729730
private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String clusterId,
730731
List<UpdateCloudReplicaInfo> infos) {
731732
CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0);
732-
cloudReplica.updateClusterToBe(clusterId, destBe);
733+
cloudReplica.updateClusterToBe(clusterId, destBe, true);
733734
Database db = Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
734735
if (db == null) {
735736
return;
@@ -973,7 +974,7 @@ private void migrateTablets(Long srcBe, Long dstBe) {
973974
String clusterId = be.getCloudClusterId();
974975
String clusterName = be.getCloudClusterName();
975976
// update replica location info
976-
cloudReplica.updateClusterToBe(clusterId, dstBe);
977+
cloudReplica.updateClusterToBe(clusterId, dstBe, true);
977978
LOG.info("cloud be migrate tablet {} from srcBe={} to dstBe={}, clusterId={}, clusterName={}",
978979
tablet.getId(), srcBe, dstBe, clusterId, clusterName);
979980

fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1036,7 +1036,7 @@ private void unprotectUpdateCloudReplica(OlapTable olapTable, UpdateCloudReplica
10361036
clusterId = realClusterId;
10371037
}
10381038

1039-
((CloudReplica) replica).updateClusterToBe(clusterId, info.getBeId());
1039+
((CloudReplica) replica).updateClusterToBe(clusterId, info.getBeId(), true);
10401040

10411041
LOG.debug("update single cloud replica cluster {} replica {} be {}", info.getClusterId(),
10421042
replica.getId(), info.getBeId());
@@ -1062,7 +1062,7 @@ private void unprotectUpdateCloudReplica(OlapTable olapTable, UpdateCloudReplica
10621062

10631063
LOG.debug("update cloud replica cluster {} replica {} be {}", info.getClusterId(),
10641064
replica.getId(), info.getBeIds().get(i));
1065-
((CloudReplica) replica).updateClusterToBe(clusterId, info.getBeIds().get(i));
1065+
((CloudReplica) replica).updateClusterToBe(clusterId, info.getBeIds().get(i), true);
10661066
}
10671067
}
10681068
} catch (Exception e) {

regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ suite('test_rebalance_in_cloud', 'multi_cluster') {
6363
PARTITION p1998 VALUES [("19980101"), ("19990101")))
6464
DISTRIBUTED BY HASH(k1) BUCKETS 3
6565
"""
66-
GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.clusterToBackends");
66+
GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.primaryClusterToBackends");
6767
sql """set global forward_to_master=false"""
6868

6969
// add a be

0 commit comments

Comments
 (0)