Skip to content

Commit 976a192

Browse files
committed
[fix](cloud) Fix migrate tablets between backends back and forth
BUG: cloud rebalancer migrates tablets back and forth: move from A to B, then B to A, then A to B, ... The reason is that the tabletToInfightTask map tracking in-flight tasks ignored the multi-cluster scenario, and in the statRouteInfo function, the cluster information was lost, which led to inaccurate tablets statistics.
1 parent 43c9f67 commit 976a192

File tree

1 file changed

+63
-32
lines changed

1 file changed

+63
-32
lines changed

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java

+63-32
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;
4747

4848
import com.google.common.base.Preconditions;
49+
import lombok.Getter;
4950
import org.apache.logging.log4j.LogManager;
5051
import org.apache.logging.log4j.Logger;
5152

@@ -54,6 +55,7 @@
5455
import java.util.HashSet;
5556
import java.util.List;
5657
import java.util.Map;
58+
import java.util.Objects;
5759
import java.util.Random;
5860
import java.util.Set;
5961
import java.util.concurrent.ConcurrentHashMap;
@@ -94,7 +96,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
9496

9597
private LinkedBlockingQueue<Pair<Long, Long>> tabletsMigrateTasks = new LinkedBlockingQueue<Pair<Long, Long>>();
9698

97-
private Map<Long, InfightTask> tabletToInfightTask = new HashMap<Long, InfightTask>();
99+
private Map<InfightTablet, InfightTask> tabletToInfightTask = new HashMap<>();
98100

99101
private long assignedErrNum = 0;
100102

@@ -115,12 +117,39 @@ public enum BalanceType {
115117
PARTITION
116118
}
117119

120+
@Getter
121+
private class InfightTablet {
122+
private final Long tabletId;
123+
private final String clusterId;
124+
125+
public InfightTablet(Long tabletId, String clusterId) {
126+
this.tabletId = tabletId;
127+
this.clusterId = clusterId;
128+
}
129+
130+
@Override
131+
public boolean equals(Object o) {
132+
if (this == o) {
133+
return true;
134+
}
135+
if (o == null || getClass() != o.getClass()) {
136+
return false;
137+
}
138+
InfightTablet that = (InfightTablet) o;
139+
return tabletId.equals(that.tabletId) && clusterId.equals(that.clusterId);
140+
}
141+
142+
@Override
143+
public int hashCode() {
144+
return Objects.hash(tabletId, clusterId);
145+
}
146+
}
147+
118148
private class InfightTask {
119149
public Tablet pickedTablet;
120150
public long srcBe;
121151
public long destBe;
122152
public boolean isGlobal;
123-
public String clusterId;
124153
public Map<Long, List<Tablet>> beToTablets;
125154
public long startTimestamp;
126155
BalanceType balanceType;
@@ -343,41 +372,44 @@ public void globalBalance() {
343372
}
344373

345374
public void checkInflghtWarmUpCacheAsync() {
346-
Map<Long, List<Long>> beToTabletIds = new HashMap<Long, List<Long>>();
375+
Map<Long, List<InfightTask>> beToTabletIds = new HashMap<Long, List<InfightTask>>();
347376

348-
for (Map.Entry<Long, InfightTask> entry : tabletToInfightTask.entrySet()) {
349-
beToTabletIds.putIfAbsent(entry.getValue().destBe, new ArrayList<Long>());
350-
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId());
377+
for (Map.Entry<InfightTablet, InfightTask> entry : tabletToInfightTask.entrySet()) {
378+
beToTabletIds.putIfAbsent(entry.getValue().destBe, new ArrayList<>());
379+
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue());
351380
}
352381

353382
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
354-
for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
383+
for (Map.Entry<Long, List<InfightTask>> entry : beToTabletIds.entrySet()) {
355384
LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
356385
Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey());
357386
if (destBackend == null) {
358-
for (long tabletId : entry.getValue()) {
359-
tabletToInfightTask.remove(tabletId);
387+
for (InfightTask task : entry.getValue()) {
388+
for (InfightTablet key : tabletToInfightTask.keySet()) {
389+
tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), key.clusterId));
390+
}
360391
}
361392
continue;
362393
}
363-
364-
Map<Long, Boolean> taskDone = sendCheckWarmUpCacheAsyncRpc(entry.getValue(), entry.getKey());
394+
List<Long> tablets = entry.getValue().stream()
395+
.map(task -> task.pickedTablet.getId()).collect(Collectors.toList());
396+
Map<Long, Boolean> taskDone = sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey());
365397
if (taskDone == null) {
366398
LOG.warn("sendCheckWarmUpCacheAsyncRpc return null be {}, inFight tasks {}",
367399
entry.getKey(), entry.getValue());
368400
continue;
369401
}
370-
402+
String clusterId = cloudSystemInfoService.getBackend(entry.getKey()).getCloudClusterId();
371403
for (Map.Entry<Long, Boolean> result : taskDone.entrySet()) {
372-
InfightTask task = tabletToInfightTask.get(result.getKey());
373-
if (result.getValue()
374-
|| System.currentTimeMillis() / 1000 - task.startTimestamp
375-
> Config.cloud_pre_heating_time_limit_sec) {
404+
InfightTask task = tabletToInfightTask
405+
.getOrDefault(new InfightTablet(result.getKey(), clusterId), null);
406+
if (task != null && (result.getValue() || System.currentTimeMillis() / 1000 - task.startTimestamp
407+
> Config.cloud_pre_heating_time_limit_sec)) {
376408
if (!result.getValue()) {
377409
LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey());
378410
}
379-
updateClusterToBeMap(task.pickedTablet, task.destBe, task.clusterId, infos);
380-
tabletToInfightTask.remove(result.getKey());
411+
updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos);
412+
tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId));
381413
}
382414
}
383415
}
@@ -393,13 +425,13 @@ public void checkInflghtWarmUpCacheAsync() {
393425
}
394426

395427
// recalculate inflight beToTablets, just for print the log
396-
beToTabletIds = new HashMap<Long, List<Long>>();
397-
for (Map.Entry<Long, InfightTask> entry : tabletToInfightTask.entrySet()) {
398-
beToTabletIds.putIfAbsent(entry.getValue().destBe, new ArrayList<Long>());
399-
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId());
428+
beToTabletIds.clear();
429+
for (Map.Entry<InfightTablet, InfightTask> entry : tabletToInfightTask.entrySet()) {
430+
beToTabletIds.putIfAbsent(entry.getValue().destBe, new ArrayList<>());
431+
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue());
400432
}
401433

402-
for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
434+
for (Map.Entry<Long, List<InfightTask>> entry : beToTabletIds.entrySet()) {
403435
LOG.info("after pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
404436
}
405437
}
@@ -449,7 +481,7 @@ public void checkDecommissionState(Map<String, List<Long>> clusterToBes) {
449481
}
450482
LOG.info("notify decommission response: {} ", response);
451483
} catch (RpcException e) {
452-
LOG.info("failed to notify decommission {}", e);
484+
LOG.info("failed to notify decommission", e);
453485
return;
454486
}
455487
beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
@@ -552,8 +584,10 @@ public void statRouteInfo() {
552584
fillBeToTablets(bes.get(0), table.getId(), partition.getId(), index.getId(), tablet,
553585
tmpBeToTabletsGlobal, beToTabletsInTable, this.partitionToTablets);
554586

555-
if (tabletToInfightTask.containsKey(tablet.getId())) {
556-
InfightTask task = tabletToInfightTask.get(tablet.getId());
587+
InfightTask task = tabletToInfightTask
588+
.getOrDefault(new InfightTablet(tablet.getId(), cluster), null);
589+
590+
if (task != null) {
557591
fillBeToTablets(task.destBe, table.getId(), partition.getId(), index.getId(), tablet,
558592
futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
559593
} else {
@@ -808,9 +842,7 @@ private boolean isConflict(long srcBe, long destBe, CloudReplica cloudReplica, B
808842
List<Tablet> destBeTablets = beToTabletsInParts.get(cloudReplica.getPartitionId())
809843
.get(cloudReplica.getIndexId()).get(destBe);
810844
long minBeSize = destBeTablets == null ? 0 : destBeTablets.size();
811-
if (minBeSize >= maxBeSize) {
812-
return true;
813-
}
845+
return minBeSize >= maxBeSize;
814846
}
815847

816848
return false;
@@ -881,10 +913,9 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet
881913
task.srcBe = srcBe;
882914
task.destBe = destBe;
883915
task.balanceType = balanceType;
884-
task.clusterId = clusterId;
885916
task.beToTablets = beToTablets;
886917
task.startTimestamp = System.currentTimeMillis() / 1000;
887-
tabletToInfightTask.put(pickedTablet.getId(), task);
918+
tabletToInfightTask.put(new InfightTablet(pickedTablet.getId(), clusterId), task);
888919

889920
LOG.info("pre cache {} from {} to {}, cluster {} minNum {} maxNum {} beNum {} tabletsNum {}, part {}",
890921
pickedTablet.getId(), srcBe, destBe, clusterId,
@@ -936,7 +967,7 @@ private void migrateTablets(Long srcBe, Long dstBe) {
936967
CloudReplica cloudReplica = (CloudReplica) tablet.getReplicas().get(0);
937968
Backend be = cloudSystemInfoService.getBackend(srcBe);
938969
if (be == null) {
939-
LOG.info("backend {} not found", be);
970+
LOG.info("src backend {} not found", srcBe);
940971
continue;
941972
}
942973
String clusterId = be.getCloudClusterId();

0 commit comments

Comments
 (0)