Skip to content

Commit a00aeb4

Browse files
committed
add case
1 parent 976a192 commit a00aeb4

File tree

2 files changed

+142
-0
lines changed

2 files changed

+142
-0
lines changed

be/src/cloud/cloud_backend_service.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,11 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon
180180
const TCheckWarmUpCacheAsyncRequest& request) {
181181
std::map<int64_t, bool> task_done;
182182
_engine.file_cache_block_downloader().check_download_task(request.tablets, &task_done);
183+
DBUG_EXECUTE_IF("CloudBackendService.check_warm_up_cache_async.return_task_false", {
184+
for (auto& it : task_done) {
185+
it.second = false;
186+
}
187+
});
183188
response.__set_task_done(task_done);
184189

185190
Status st = Status::OK();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
import org.apache.doris.regression.suite.ClusterOptions
19+
import groovy.json.JsonSlurper
20+
import org.awaitility.Awaitility;
21+
import static java.util.concurrent.TimeUnit.SECONDS;
22+
import org.codehaus.groovy.runtime.IOGroovyMethods
23+
24+
suite('test_warmup_rebalance_in_cloud', 'multi_cluster') {
25+
if (!isCloudMode()) {
26+
return;
27+
}
28+
def options = new ClusterOptions()
29+
options.feConfigs += [
30+
'cloud_cluster_check_interval_second=1',
31+
'enable_cloud_warm_up_for_rebalance=true',
32+
'cloud_tablet_rebalancer_interval_second=1',
33+
'cloud_balance_tablet_percent_per_run=0.5',
34+
'sys_log_verbose_modules=org',
35+
'cloud_pre_heating_time_limit_sec=600'
36+
]
37+
options.setFeNum(2)
38+
options.setBeNum(3)
39+
options.cloudMode = true
40+
options.enableDebugPoints()
41+
def check = { String feLogPath ->
42+
log.info("search fe log path: {}", feLogPath)
43+
Map<String, List<String>> circularRebalanceMap = [:]
44+
boolean isCircularRebalanceDetected = false
45+
46+
new File(feLogPath).text.tokenize('\n')
47+
.findAll { it =~ /pre cache ([0-9]+) from ([0-9]+) to ([0-9]+), cluster ([a-zA-Z0-9_]+)/ }
48+
.each { line ->
49+
def (tabletId, fromBe, toBe, clusterId) = (line =~ /pre cache ([0-9]+) from ([0-9]+) to ([0-9]+), cluster ([a-zA-Z0-9_]+)/)[0][1..-1]
50+
51+
String clusterPreCacheKey = "$clusterId-$tabletId"
52+
53+
if (!circularRebalanceMap.containsKey(clusterPreCacheKey)) {
54+
circularRebalanceMap[clusterPreCacheKey] = new ArrayList<>()
55+
}
56+
57+
List<String> paths = circularRebalanceMap[clusterPreCacheKey]
58+
59+
if (paths.contains(toBe)) {
60+
isCircularRebalanceDetected = true
61+
log.info("Circular rebalance detected for tabletId: {}, clusterId: {}", tabletId, clusterId)
62+
assertFalse(true)
63+
}
64+
65+
paths << fromBe
66+
circularRebalanceMap[clusterPreCacheKey] = paths
67+
68+
if (!paths.contains(toBe)) {
69+
paths << (toBe as String)
70+
}
71+
}
72+
73+
if (!isCircularRebalanceDetected) {
74+
log.info("No circular rebalance detected.")
75+
}
76+
}
77+
78+
docker(options) {
79+
def clusterName = "newcluster1"
80+
// 添加一个新的cluster add_new_cluster
81+
cluster.addBackend(2, clusterName)
82+
83+
def ret = sql_return_maparray """show clusters"""
84+
log.info("show clusters: {}", ret)
85+
assertEquals(2, ret.size())
86+
87+
GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
88+
sql """set global forward_to_master=false"""
89+
90+
sql """
91+
CREATE TABLE table100 (
92+
class INT,
93+
id INT,
94+
score INT SUM
95+
)
96+
AGGREGATE KEY(class, id)
97+
DISTRIBUTED BY HASH(class) BUCKETS 48
98+
"""
99+
100+
sql """
101+
INSERT INTO table100 VALUES (1, 1, 100);
102+
"""
103+
104+
dockerAwaitUntil(5) {
105+
ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100"""
106+
log.info("replica distribution table100: {}", ret)
107+
ret.size() == 5
108+
}
109+
110+
sql """use @newcluster1"""
111+
def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION FROM table100; """
112+
assertEquals(5, result.size())
113+
int replicaNum = 0
114+
115+
for (def row : result) {
116+
log.info("replica distribution: ${row} ".toString())
117+
if (row.CloudClusterName == "newcluster1") {
118+
replicaNum = Integer.valueOf((String) row.ReplicaNum)
119+
assertTrue(replicaNum <= 25 && replicaNum >= 23)
120+
}
121+
}
122+
def fe1 = cluster.getFeByIndex(1)
123+
String feLogPath = fe1.getLogFilePath()
124+
// stop be id 1, 4
125+
cluster.stopBackends(1, 4)
126+
// check log
127+
sleep(10 * 1000)
128+
check feLogPath
129+
130+
// start be id 1, 4
131+
cluster.startBackends(1, 4)
132+
GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
133+
// check log
134+
sleep(10 * 1000)
135+
check feLogPath
136+
}
137+
}

0 commit comments

Comments
 (0)