Skip to content

Commit 090b19f

Browse files
authored
[fix](cloud) Fix migrate tablets between backends back and forth (#39792)
BUG: cloud rebalancer migrates tablets back and forth: move from A to B, then B to A, then A to B, ... The reason is that the tabletToInfightTask map tracking in-flight tasks ignored the multi-cluster scenario, and in the statRouteInfo function, the cluster information was lost, which led to inaccurate tablets statistics.
1 parent 96d614f commit 090b19f

File tree

3 files changed

+205
-32
lines changed

3 files changed

+205
-32
lines changed

be/src/cloud/cloud_backend_service.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,11 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon
180180
const TCheckWarmUpCacheAsyncRequest& request) {
181181
std::map<int64_t, bool> task_done;
182182
_engine.file_cache_block_downloader().check_download_task(request.tablets, &task_done);
183+
DBUG_EXECUTE_IF("CloudBackendService.check_warm_up_cache_async.return_task_false", {
184+
for (auto& it : task_done) {
185+
it.second = false;
186+
}
187+
});
183188
response.__set_task_done(task_done);
184189

185190
Status st = Status::OK();

fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java

+63-32
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;
4747

4848
import com.google.common.base.Preconditions;
49+
import lombok.Getter;
4950
import org.apache.logging.log4j.LogManager;
5051
import org.apache.logging.log4j.Logger;
5152

@@ -54,6 +55,7 @@
5455
import java.util.HashSet;
5556
import java.util.List;
5657
import java.util.Map;
58+
import java.util.Objects;
5759
import java.util.Random;
5860
import java.util.Set;
5961
import java.util.concurrent.ConcurrentHashMap;
@@ -94,7 +96,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
9496

9597
private LinkedBlockingQueue<Pair<Long, Long>> tabletsMigrateTasks = new LinkedBlockingQueue<Pair<Long, Long>>();
9698

97-
private Map<Long, InfightTask> tabletToInfightTask = new HashMap<Long, InfightTask>();
99+
private Map<InfightTablet, InfightTask> tabletToInfightTask = new HashMap<>();
98100

99101
private long assignedErrNum = 0;
100102

@@ -115,12 +117,39 @@ public enum BalanceType {
115117
PARTITION
116118
}
117119

120+
@Getter
121+
private class InfightTablet {
122+
private final Long tabletId;
123+
private final String clusterId;
124+
125+
public InfightTablet(Long tabletId, String clusterId) {
126+
this.tabletId = tabletId;
127+
this.clusterId = clusterId;
128+
}
129+
130+
@Override
131+
public boolean equals(Object o) {
132+
if (this == o) {
133+
return true;
134+
}
135+
if (o == null || getClass() != o.getClass()) {
136+
return false;
137+
}
138+
InfightTablet that = (InfightTablet) o;
139+
return tabletId.equals(that.tabletId) && clusterId.equals(that.clusterId);
140+
}
141+
142+
@Override
143+
public int hashCode() {
144+
return Objects.hash(tabletId, clusterId);
145+
}
146+
}
147+
118148
private class InfightTask {
119149
public Tablet pickedTablet;
120150
public long srcBe;
121151
public long destBe;
122152
public boolean isGlobal;
123-
public String clusterId;
124153
public Map<Long, List<Tablet>> beToTablets;
125154
public long startTimestamp;
126155
BalanceType balanceType;
@@ -343,41 +372,44 @@ public void globalBalance() {
343372
}
344373

345374
public void checkInflghtWarmUpCacheAsync() {
346-
Map<Long, List<Long>> beToTabletIds = new HashMap<Long, List<Long>>();
375+
Map<Long, List<InfightTask>> beToInfightTasks = new HashMap<Long, List<InfightTask>>();
347376

348-
for (Map.Entry<Long, InfightTask> entry : tabletToInfightTask.entrySet()) {
349-
beToTabletIds.putIfAbsent(entry.getValue().destBe, new ArrayList<Long>());
350-
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId());
377+
for (Map.Entry<InfightTablet, InfightTask> entry : tabletToInfightTask.entrySet()) {
378+
beToInfightTasks.putIfAbsent(entry.getValue().destBe, new ArrayList<>());
379+
beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue());
351380
}
352381

353382
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
354-
for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
383+
for (Map.Entry<Long, List<InfightTask>> entry : beToInfightTasks.entrySet()) {
355384
LOG.info("before pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
356385
Backend destBackend = cloudSystemInfoService.getBackend(entry.getKey());
357386
if (destBackend == null) {
358-
for (long tabletId : entry.getValue()) {
359-
tabletToInfightTask.remove(tabletId);
387+
for (InfightTask task : entry.getValue()) {
388+
for (InfightTablet key : tabletToInfightTask.keySet()) {
389+
tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), key.clusterId));
390+
}
360391
}
361392
continue;
362393
}
363-
364-
Map<Long, Boolean> taskDone = sendCheckWarmUpCacheAsyncRpc(entry.getValue(), entry.getKey());
394+
List<Long> tablets = entry.getValue().stream()
395+
.map(task -> task.pickedTablet.getId()).collect(Collectors.toList());
396+
Map<Long, Boolean> taskDone = sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey());
365397
if (taskDone == null) {
366398
LOG.warn("sendCheckWarmUpCacheAsyncRpc return null be {}, inFight tasks {}",
367399
entry.getKey(), entry.getValue());
368400
continue;
369401
}
370-
402+
String clusterId = cloudSystemInfoService.getBackend(entry.getKey()).getCloudClusterId();
371403
for (Map.Entry<Long, Boolean> result : taskDone.entrySet()) {
372-
InfightTask task = tabletToInfightTask.get(result.getKey());
373-
if (result.getValue()
374-
|| System.currentTimeMillis() / 1000 - task.startTimestamp
375-
> Config.cloud_pre_heating_time_limit_sec) {
404+
InfightTask task = tabletToInfightTask
405+
.getOrDefault(new InfightTablet(result.getKey(), clusterId), null);
406+
if (task != null && (result.getValue() || System.currentTimeMillis() / 1000 - task.startTimestamp
407+
> Config.cloud_pre_heating_time_limit_sec)) {
376408
if (!result.getValue()) {
377409
LOG.info("{} pre cache timeout, forced to change the mapping", result.getKey());
378410
}
379-
updateClusterToBeMap(task.pickedTablet, task.destBe, task.clusterId, infos);
380-
tabletToInfightTask.remove(result.getKey());
411+
updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos);
412+
tabletToInfightTask.remove(new InfightTablet(task.pickedTablet.getId(), clusterId));
381413
}
382414
}
383415
}
@@ -393,13 +425,13 @@ public void checkInflghtWarmUpCacheAsync() {
393425
}
394426

395427
// recalculate inflight beToTablets, just for print the log
396-
beToTabletIds = new HashMap<Long, List<Long>>();
397-
for (Map.Entry<Long, InfightTask> entry : tabletToInfightTask.entrySet()) {
398-
beToTabletIds.putIfAbsent(entry.getValue().destBe, new ArrayList<Long>());
399-
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId());
428+
beToInfightTasks.clear();
429+
for (Map.Entry<InfightTablet, InfightTask> entry : tabletToInfightTask.entrySet()) {
430+
beToInfightTasks.putIfAbsent(entry.getValue().destBe, new ArrayList<>());
431+
beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue());
400432
}
401433

402-
for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
434+
for (Map.Entry<Long, List<InfightTask>> entry : beToInfightTasks.entrySet()) {
403435
LOG.info("after pre cache check dest be {} inflight task num {}", entry.getKey(), entry.getValue().size());
404436
}
405437
}
@@ -449,7 +481,7 @@ public void checkDecommissionState(Map<String, List<Long>> clusterToBes) {
449481
}
450482
LOG.info("notify decommission response: {} ", response);
451483
} catch (RpcException e) {
452-
LOG.info("failed to notify decommission {}", e);
484+
LOG.info("failed to notify decommission", e);
453485
return;
454486
}
455487
beToDecommissionedTime.put(beId, System.currentTimeMillis() / 1000);
@@ -552,8 +584,10 @@ public void statRouteInfo() {
552584
fillBeToTablets(bes.get(0), table.getId(), partition.getId(), index.getId(), tablet,
553585
tmpBeToTabletsGlobal, beToTabletsInTable, this.partitionToTablets);
554586

555-
if (tabletToInfightTask.containsKey(tablet.getId())) {
556-
InfightTask task = tabletToInfightTask.get(tablet.getId());
587+
InfightTask task = tabletToInfightTask
588+
.getOrDefault(new InfightTablet(tablet.getId(), cluster), null);
589+
590+
if (task != null) {
557591
fillBeToTablets(task.destBe, table.getId(), partition.getId(), index.getId(), tablet,
558592
futureBeToTabletsGlobal, futureBeToTabletsInTable, futurePartitionToTablets);
559593
} else {
@@ -808,9 +842,7 @@ private boolean isConflict(long srcBe, long destBe, CloudReplica cloudReplica, B
808842
List<Tablet> destBeTablets = beToTabletsInParts.get(cloudReplica.getPartitionId())
809843
.get(cloudReplica.getIndexId()).get(destBe);
810844
long minBeSize = destBeTablets == null ? 0 : destBeTablets.size();
811-
if (minBeSize >= maxBeSize) {
812-
return true;
813-
}
845+
return minBeSize >= maxBeSize;
814846
}
815847

816848
return false;
@@ -881,10 +913,9 @@ private void balanceImpl(List<Long> bes, String clusterId, Map<Long, List<Tablet
881913
task.srcBe = srcBe;
882914
task.destBe = destBe;
883915
task.balanceType = balanceType;
884-
task.clusterId = clusterId;
885916
task.beToTablets = beToTablets;
886917
task.startTimestamp = System.currentTimeMillis() / 1000;
887-
tabletToInfightTask.put(pickedTablet.getId(), task);
918+
tabletToInfightTask.put(new InfightTablet(pickedTablet.getId(), clusterId), task);
888919

889920
LOG.info("pre cache {} from {} to {}, cluster {} minNum {} maxNum {} beNum {} tabletsNum {}, part {}",
890921
pickedTablet.getId(), srcBe, destBe, clusterId,
@@ -936,7 +967,7 @@ private void migrateTablets(Long srcBe, Long dstBe) {
936967
CloudReplica cloudReplica = (CloudReplica) tablet.getReplicas().get(0);
937968
Backend be = cloudSystemInfoService.getBackend(srcBe);
938969
if (be == null) {
939-
LOG.info("backend {} not found", be);
970+
LOG.info("src backend {} not found", srcBe);
940971
continue;
941972
}
942973
String clusterId = be.getCloudClusterId();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.doris.regression.suite.ClusterOptions
19+
import groovy.json.JsonSlurper
20+
import org.awaitility.Awaitility;
21+
import static java.util.concurrent.TimeUnit.SECONDS;
22+
import org.codehaus.groovy.runtime.IOGroovyMethods
23+
24+
suite('test_warmup_rebalance_in_cloud', 'multi_cluster') {
25+
if (!isCloudMode()) {
26+
return;
27+
}
28+
def options = new ClusterOptions()
29+
options.feConfigs += [
30+
'cloud_cluster_check_interval_second=1',
31+
'enable_cloud_warm_up_for_rebalance=true',
32+
'cloud_tablet_rebalancer_interval_second=1',
33+
'cloud_balance_tablet_percent_per_run=0.5',
34+
'sys_log_verbose_modules=org',
35+
'cloud_pre_heating_time_limit_sec=600'
36+
]
37+
options.setFeNum(2)
38+
options.setBeNum(3)
39+
options.cloudMode = true
40+
options.enableDebugPoints()
41+
def check = { String feLogPath ->
42+
log.info("search fe log path: {}", feLogPath)
43+
Map<String, List<String>> circularRebalanceMap = [:]
44+
boolean isCircularRebalanceDetected = false
45+
46+
new File(feLogPath).text.tokenize('\n')
47+
.findAll { it =~ /pre cache ([0-9]+) from ([0-9]+) to ([0-9]+), cluster ([a-zA-Z0-9_]+)/ }
48+
.each { line ->
49+
def (tabletId, fromBe, toBe, clusterId) = (line =~ /pre cache ([0-9]+) from ([0-9]+) to ([0-9]+), cluster ([a-zA-Z0-9_]+)/)[0][1..-1]
50+
51+
String clusterPreCacheKey = "$clusterId-$tabletId"
52+
53+
if (!circularRebalanceMap.containsKey(clusterPreCacheKey)) {
54+
circularRebalanceMap[clusterPreCacheKey] = new ArrayList<>()
55+
}
56+
57+
List<String> paths = circularRebalanceMap[clusterPreCacheKey]
58+
59+
if (paths.contains(toBe)) {
60+
isCircularRebalanceDetected = true
61+
log.info("Circular rebalance detected for tabletId: {}, clusterId: {}", tabletId, clusterId)
62+
assertFalse(true)
63+
}
64+
65+
paths << fromBe
66+
circularRebalanceMap[clusterPreCacheKey] = paths
67+
68+
if (!paths.contains(toBe)) {
69+
paths << (toBe as String)
70+
}
71+
}
72+
73+
if (!isCircularRebalanceDetected) {
74+
log.info("No circular rebalance detected.")
75+
}
76+
}
77+
78+
docker(options) {
79+
def clusterName = "newcluster1"
80+
// 添加一个新的cluster add_new_cluster
81+
cluster.addBackend(2, clusterName)
82+
83+
def ret = sql_return_maparray """show clusters"""
84+
log.info("show clusters: {}", ret)
85+
assertEquals(2, ret.size())
86+
87+
GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
88+
sql """set global forward_to_master=false"""
89+
90+
sql """
91+
CREATE TABLE table100 (
92+
class INT,
93+
id INT,
94+
score INT SUM
95+
)
96+
AGGREGATE KEY(class, id)
97+
DISTRIBUTED BY HASH(class) BUCKETS 48
98+
"""
99+
100+
sql """
101+
INSERT INTO table100 VALUES (1, 1, 100);
102+
"""
103+
104+
dockerAwaitUntil(5) {
105+
ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100"""
106+
log.info("replica distribution table100: {}", ret)
107+
ret.size() == 5
108+
}
109+
110+
sql """use @newcluster1"""
111+
def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM table100; """
112+
assertEquals(5, result.size())
113+
int replicaNum = 0
114+
115+
for (def row : result) {
116+
log.info("replica distribution: ${row} ".toString())
117+
if (row.CloudClusterName == "newcluster1") {
118+
replicaNum = Integer.valueOf((String) row.ReplicaNum)
119+
assertTrue(replicaNum <= 25 && replicaNum >= 23)
120+
}
121+
}
122+
def fe1 = cluster.getFeByIndex(1)
123+
String feLogPath = fe1.getLogFilePath()
124+
// stop be id 1, 4
125+
cluster.stopBackends(1, 4)
126+
// check log
127+
sleep(10 * 1000)
128+
check feLogPath
129+
130+
// start be id 1, 4
131+
cluster.startBackends(1, 4)
132+
GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
133+
// check log
134+
sleep(10 * 1000)
135+
check feLogPath
136+
}
137+
}

0 commit comments

Comments
 (0)