Skip to content

Commit 22c0dae

Browse files
justfortastedataroaring
authored andcommitted
[bugfix](backup)(cooldown) cancel backup properly when be backup failed (#38724)
Currently, when a backup job failed, but it still at the state of SNAPSHOTING. Cancel the cancel backup properly when be backup failed
1 parent 26cc922 commit 22c0dae

File tree

5 files changed

+286
-0
lines changed

5 files changed

+286
-0
lines changed

be/src/common/status.cpp

+7
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ void Status::to_thrift(TStatus* s) const {
3434
// << "The error code has to > 0 because TStatusCode need it > 0, it's actual value is "
3535
// << _code;
3636
s->status_code = (int16_t)_code > 0 ? (TStatusCode::type)_code : TStatusCode::INTERNAL_ERROR;
37+
38+
if (_code == ErrorCode::VERSION_ALREADY_MERGED) {
39+
s->status_code = TStatusCode::OLAP_ERR_VERSION_ALREADY_MERGED;
40+
} else if (_code == ErrorCode::TABLE_NOT_FOUND) {
41+
s->status_code = TStatusCode::TABLET_MISSING;
42+
}
43+
3744
s->error_msgs.push_back(fmt::format("({})[{}]{}", BackendOptions::get_localhost(),
3845
code_as_string(), _err_msg ? _err_msg->_msg : ""));
3946
s->__isset.error_msgs = true;

be/src/olap/snapshot_manager.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* s
8383
}
8484

8585
TabletSharedPtr ref_tablet = _engine.tablet_manager()->get_tablet(request.tablet_id);
86+
87+
DBUG_EXECUTE_IF("SnapshotManager::make_snapshot.inject_failure", { ref_tablet = nullptr; })
88+
8689
if (ref_tablet == nullptr) {
8790
return Status::Error<TABLE_NOT_FOUND>("failed to get tablet. tablet={}", request.tablet_id);
8891
}

be/src/olap/tablet.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -861,6 +861,14 @@ Status Tablet::capture_consistent_versions_unlocked(const Version& spec_version,
861861
}
862862
}
863863
}
864+
865+
DBUG_EXECUTE_IF("TTablet::capture_consistent_versions.inject_failure", {
866+
auto tablet_id = dp->param<int64>("tablet_id", -1);
867+
if (tablet_id != -1 && tablet_id == _tablet_meta->tablet_id()) {
868+
status = Status::Error<VERSION_ALREADY_MERGED>("version already merged");
869+
}
870+
});
871+
864872
return status;
865873
}
866874

fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java

+69
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,61 @@ public BackupContent getContent() {
169169
return BackupContent.ALL;
170170
}
171171

172+
private synchronized boolean tryNewTabletSnapshotTask(SnapshotTask task) {
173+
Table table = env.getInternalCatalog().getTableByTableId(task.getTableId());
174+
if (table == null) {
175+
return false;
176+
}
177+
OlapTable tbl = (OlapTable) table;
178+
tbl.readLock();
179+
try {
180+
if (tbl.getId() != task.getTableId()) {
181+
return false;
182+
}
183+
Partition partition = tbl.getPartition(task.getPartitionId());
184+
if (partition == null) {
185+
return false;
186+
}
187+
MaterializedIndex index = partition.getIndex(task.getIndexId());
188+
if (index == null) {
189+
return false;
190+
}
191+
Tablet tablet = index.getTablet(task.getTabletId());
192+
if (tablet == null) {
193+
return false;
194+
}
195+
Replica replica = chooseReplica(tablet, task.getVersion());
196+
if (replica == null) {
197+
return false;
198+
}
199+
200+
//clear old task
201+
AgentTaskQueue.removeTaskOfType(TTaskType.MAKE_SNAPSHOT, task.getTabletId());
202+
unfinishedTaskIds.remove(task.getTabletId());
203+
taskProgress.remove(task.getTabletId());
204+
taskErrMsg.remove(task.getTabletId());
205+
206+
SnapshotTask newTask = new SnapshotTask(null, replica.getBackendId(), task.getTabletId(),
207+
task.getJobId(), task.getDbId(), tbl.getId(), task.getPartitionId(),
208+
task.getIndexId(), task.getTabletId(),
209+
task.getVersion(),
210+
task.getSchemaHash(), timeoutMs, false /* not restore task */);
211+
AgentBatchTask batchTask = new AgentBatchTask();
212+
batchTask.addTask(newTask);
213+
unfinishedTaskIds.put(tablet.getId(), replica.getBackendId());
214+
215+
//send task
216+
AgentTaskQueue.addTask(newTask);
217+
AgentTaskExecutor.submit(batchTask);
218+
219+
} finally {
220+
tbl.readUnlock();
221+
}
222+
223+
return true;
224+
}
225+
226+
172227
public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishTaskRequest request) {
173228
Preconditions.checkState(task.getJobId() == jobId);
174229

@@ -181,6 +236,20 @@ public synchronized boolean finishTabletSnapshotTask(SnapshotTask task, TFinishT
181236
"make snapshot failed, version already merged");
182237
cancelInternal();
183238
}
239+
240+
if (request.getTaskStatus().getStatusCode() == TStatusCode.TABLET_MISSING
241+
&& !tryNewTabletSnapshotTask(task)) {
242+
status = new Status(ErrCode.NOT_FOUND,
243+
"make snapshot failed, failed to ge tablet, table will be droped or truncated");
244+
cancelInternal();
245+
}
246+
247+
if (request.getTaskStatus().getStatusCode() == TStatusCode.NOT_IMPLEMENTED_ERROR) {
248+
status = new Status(ErrCode.COMMON_ERROR,
249+
"make snapshot failed, currently not support backup tablet with cooldowned remote data");
250+
cancelInternal();
251+
}
252+
184253
return false;
185254
}
186255

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_backup_cancelled", "backup_cancelled") {
19+
String suiteName = "test_backup_cancelled"
20+
String repoName = "${suiteName}_repo"
21+
String dbName = "${suiteName}_db"
22+
String tableName = "${suiteName}_table"
23+
String snapshotName = "${suiteName}_snapshot"
24+
String snapshotName_1 = "${suiteName}_snapshot1"
25+
26+
def syncer = getSyncer()
27+
syncer.createS3Repository(repoName)
28+
29+
sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
30+
sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
31+
sql """
32+
CREATE TABLE ${dbName}.${tableName} (
33+
`id` LARGEINT NOT NULL,
34+
`count` LARGEINT SUM DEFAULT "0")
35+
AGGREGATE KEY(`id`)
36+
DISTRIBUTED BY HASH(`id`) BUCKETS 2
37+
PROPERTIES
38+
(
39+
"replication_num" = "1"
40+
)
41+
"""
42+
43+
List<String> values = []
44+
for (int i = 1; i <= 10; ++i) {
45+
values.add("(${i}, ${i})")
46+
}
47+
sql "INSERT INTO ${dbName}.${tableName} VALUES ${values.join(",")}"
48+
def result = sql "SELECT * FROM ${dbName}.${tableName}"
49+
assertEquals(result.size(), values.size());
50+
51+
result = sql_return_maparray """show tablets from ${dbName}.${tableName}"""
52+
assertNotNull(result)
53+
def tabletId = null
54+
for (def res : result) {
55+
tabletId = res.TabletId
56+
break
57+
}
58+
59+
// test failed to get tablet when truncate or drop table
60+
61+
GetDebugPoint().enableDebugPointForAllBEs("SnapshotManager::make_snapshot.inject_failure", [tablet_id:"${tabletId}", execute:3]);
62+
63+
64+
sql """
65+
BACKUP SNAPSHOT ${dbName}.${snapshotName}
66+
TO `${repoName}`
67+
ON (${tableName})
68+
"""
69+
70+
syncer.waitSnapshotFinish(dbName)
71+
72+
73+
GetDebugPoint().disableDebugPointForAllBEs("SnapshotManager::make_snapshot.inject_failure")
74+
75+
76+
77+
78+
// test missing versions when compaction or balance
79+
80+
GetDebugPoint().enableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure", [tablet_id:"${tabletId}", execute:1]);
81+
82+
sql """
83+
BACKUP SNAPSHOT ${dbName}.${snapshotName_1}
84+
TO `${repoName}`
85+
ON (${tableName})
86+
"""
87+
88+
syncer.waitSnapshotFinish(dbName)
89+
90+
GetDebugPoint().disableDebugPointForAllBEs("Tablet::capture_consistent_versions.inject_failure");
91+
92+
93+
def snapshot = syncer.getSnapshotTimestamp(repoName, snapshotName)
94+
assertTrue(snapshot != null)
95+
96+
sql "TRUNCATE TABLE ${dbName}.${tableName}"
97+
98+
sql """
99+
RESTORE SNAPSHOT ${dbName}.${snapshotName}
100+
FROM `${repoName}`
101+
ON ( `${tableName}`)
102+
PROPERTIES
103+
(
104+
"backup_timestamp" = "${snapshot}",
105+
"reserve_replica" = "true"
106+
)
107+
"""
108+
109+
syncer.waitAllRestoreFinish(dbName)
110+
111+
result = sql "SELECT * FROM ${dbName}.${tableName}"
112+
assertEquals(result.size(), values.size());
113+
114+
sql "DROP TABLE ${dbName}.${tableName} FORCE"
115+
sql "DROP DATABASE ${dbName} FORCE"
116+
sql "DROP REPOSITORY `${repoName}`"
117+
}
118+
119+
120+
suite("test_backup_cooldown_cancelled", "backup_cooldown_cancelled") {
121+
122+
String suiteName = "test_backup_cooldown_cancelled"
123+
String resource_name = "resource_${suiteName}"
124+
String policy_name= "policy_${suiteName}"
125+
String dbName = "${suiteName}_db"
126+
String tableName = "${suiteName}_table"
127+
String snapshotName = "${suiteName}_snapshot"
128+
String repoName = "${suiteName}_repo"
129+
130+
def syncer = getSyncer()
131+
syncer.createS3Repository(repoName)
132+
133+
134+
135+
sql """
136+
CREATE RESOURCE IF NOT EXISTS "${resource_name}"
137+
PROPERTIES(
138+
"type"="s3",
139+
"AWS_ENDPOINT" = "${getS3Endpoint()}",
140+
"AWS_REGION" = "${getS3Region()}",
141+
"AWS_ROOT_PATH" = "regression/cooldown",
142+
"AWS_ACCESS_KEY" = "${getS3AK()}",
143+
"AWS_SECRET_KEY" = "${getS3SK()}",
144+
"AWS_MAX_CONNECTIONS" = "50",
145+
"AWS_REQUEST_TIMEOUT_MS" = "3000",
146+
"AWS_CONNECTION_TIMEOUT_MS" = "1000",
147+
"AWS_BUCKET" = "${getS3BucketName()}",
148+
"s3_validity_check" = "true"
149+
);
150+
"""
151+
152+
sql """
153+
CREATE STORAGE POLICY IF NOT EXISTS ${policy_name}
154+
PROPERTIES(
155+
"storage_resource" = "${resource_name}",
156+
"cooldown_ttl" = "300"
157+
)
158+
"""
159+
160+
sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
161+
sql "DROP TABLE IF EXISTS ${dbName}.${tableName}"
162+
163+
sql """
164+
CREATE TABLE ${dbName}.${tableName}
165+
(
166+
k1 BIGINT,
167+
v1 VARCHAR(48)
168+
)
169+
DUPLICATE KEY(k1)
170+
DISTRIBUTED BY HASH (k1) BUCKETS 3
171+
PROPERTIES(
172+
"storage_policy" = "${policy_name}",
173+
"replication_allocation" = "tag.location.default: 1"
174+
);
175+
"""
176+
177+
178+
// test backup cooldown table and should be cancelled
179+
sql """
180+
BACKUP SNAPSHOT ${dbName}.${snapshotName}
181+
TO `${repoName}`
182+
ON (${tableName})
183+
"""
184+
185+
syncer.waitSnapshotFinish(dbName)
186+
187+
//cleanup
188+
sql "DROP TABLE ${dbName}.${tableName} FORCE"
189+
sql "DROP DATABASE ${dbName} FORCE"
190+
sql "DROP REPOSITORY `${repoName}`"
191+
192+
sql """
193+
drop storage policy ${policy_name};
194+
"""
195+
196+
sql """
197+
drop resource ${resource_name};
198+
"""
199+
}

0 commit comments

Comments
 (0)