From 70f960eab4f80e415931a0a3f2515aa831fa7b61 Mon Sep 17 00:00:00 2001
From: Lei Zhang <27994433+SWJTU-ZhangLei@users.noreply.github.com>
Date: Sun, 8 Sep 2024 01:37:27 +0800
Subject: [PATCH] [fix](cloud) MS limit max aborted txn num for the same txn
 label (#40414)

---
 cloud/src/common/config.h                     |   3 +
 cloud/src/meta-service/meta_service_txn.cpp   |  16 +-
 cloud/test/meta_service_test.cpp              |  67 +++++++
 .../apache/doris/load/loadv2/LoadManager.java |  33 ++--
 .../test_stream_load_with_data_quality.csv    |   2 +
 .../test_stream_load_with_data_quality2.csv   |   2 +
 .../test_steam_load_with_data_quality.groovy  | 187 ++++++++++++++++++
 7 files changed, 291 insertions(+), 19 deletions(-)
 create mode 100644 regression-test/data/load_p0/stream_load/test_stream_load_with_data_quality.csv
 create mode 100644 regression-test/data/load_p0/stream_load/test_stream_load_with_data_quality2.csv
 create mode 100644 regression-test/suites/load_p0/stream_load/test_steam_load_with_data_quality.groovy

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 78b682641d1a14..a28143bb705ef9 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -208,4 +208,7 @@ CONF_Validator(s3_client_http_scheme, [](const std::string& config) -> bool {
 // Max retry times for object storage request
 CONF_mInt64(max_s3_client_retry, "10");
 
+// Max aborted txn num for the same label name
+CONF_mInt64(max_num_aborted_txn, "100");
+
 } // namespace doris::cloud::config
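The knob is declared with CONF_mInt64 (the "m" prefix marks a runtime-mutable config in this codebase), so the cap can be adjusted without restarting the meta-service as long as callers read the live value each time. A minimal sketch of that read pattern, with a std::atomic standing in for the real CONF machinery, which this sketch does not reproduce:

```cpp
#include <atomic>
#include <cstdint>

// Stand-in for CONF_mInt64(max_num_aborted_txn, "100"); the real macro wires
// the value into doris::cloud::config, this only models a mutable knob.
namespace config {
inline std::atomic<int64_t> max_num_aborted_txn{100};
}

// Read the live value at each decision point so a runtime config update
// takes effect without restarting the meta-service.
inline bool exceeds_aborted_txn_cap(int64_t num_txns_under_label) {
    return num_txns_under_label >= config::max_num_aborted_txn.load();
}
```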
diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index dfa633b270cab4..63f7030e0763d6 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -204,7 +204,8 @@ void MetaServiceImpl::begin_txn(::google::protobuf::RpcController* controller,
     // 2. if there is a PREPARE transaction, check if this is a retry request.
     // 3. if there is a non-aborted transaction, throw label already used exception.
-    for (auto& cur_txn_id : label_pb.txn_ids()) {
+    for (auto it = label_pb.txn_ids().rbegin(); it != label_pb.txn_ids().rend(); ++it) {
+        int64_t cur_txn_id = *it;
         const std::string cur_info_key = txn_info_key({instance_id, db_id, cur_txn_id});
         std::string cur_info_val;
         err = txn->get(cur_info_key, &cur_info_val);
@@ -235,8 +236,19 @@ void MetaServiceImpl::begin_txn(::google::protobuf::RpcController* controller,
         }
         VLOG_DEBUG << "cur_txn_info=" << cur_txn_info.ShortDebugString();
+        LOG(INFO) << " size=" << label_pb.txn_ids().size()
+                  << " status=" << cur_txn_info.status() << " txn_id=" << txn_id
+                  << " label=" << label;
         if (cur_txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
-            continue;
+            if (label_pb.txn_ids().size() >= config::max_num_aborted_txn) {
+                code = MetaServiceCode::INVALID_ARGUMENT;
+                ss << "too many aborted txn for label=" << label << " txn_id=" << txn_id
+                   << ", please check your data quality";
+                msg = ss.str();
+                LOG(WARNING) << msg << " label_pb=" << label_pb.ShortDebugString();
+                return;
+            }
+            break;
         }
 
         if (cur_txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED ||
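The direction of the scan matters: iterating txn_ids newest-first means the cap only fires when the label's most recent transaction is aborted, rather than whenever any aborted transaction appears in the history. A self-contained sketch of the resulting gate, using plain enums and vectors as stand-ins for the protobuf-backed types in meta_service_txn.cpp (kMaxNumAbortedTxn mirrors config::max_num_aborted_txn):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

enum class TxnStatus { kPrepared, kAborted, kCommitted, kVisible };

constexpr int64_t kMaxNumAbortedTxn = 100; // mirrors config::max_num_aborted_txn

// One status entry per txn id recorded under a label, oldest first, standing
// in for label_pb.txn_ids() plus the per-txn status lookups.
bool may_reuse_label(const std::vector<TxnStatus>& txns, const std::string& label,
                     std::string* err) {
    // Newest-first, mirroring the rbegin()/rend() loop in the patch.
    for (auto it = txns.rbegin(); it != txns.rend(); ++it) {
        if (*it == TxnStatus::kAborted) {
            // The cap compares against every txn recorded under the label,
            // not just aborted ones; by this point the newest is aborted.
            if (static_cast<int64_t>(txns.size()) >= kMaxNumAbortedTxn) {
                *err = "too many aborted txn for label=" + label +
                       ", please check your data quality";
                return false;
            }
            break; // aborted but under the cap: the label may be reused
        }
        // A non-aborted newest txn falls through to the existing
        // PREPARED-retry / "label already used" handling (elided here).
        break;
    }
    return true;
}

int main() {
    std::vector<TxnStatus> txns(100, TxnStatus::kAborted);
    std::string err;
    std::cout << std::boolalpha << may_reuse_label(txns, "l1", &err)
              << " " << err << "\n";
    return 0;
}
```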
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 90fb0ad09230cc..3a04122e487d9f 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -1012,6 +1012,73 @@ TEST(MetaServiceTest, BeginTxnTest) {
             ASSERT_GT(res.txn_id(), txn_id);
         }
     }
+
+    {
+        // test reuse label exceed max_num_aborted_txn
+
+        std::string cloud_unique_id = "test_cloud_unique_id";
+        int64_t db_id = 124343989;
+        int64_t table_id = 12897811;
+        int64_t txn_id = -1;
+        std::string label = "test_max_num_aborted_txn_label";
+        for (int i = 0; i < config::max_num_aborted_txn; i++) {
+            {
+                brpc::Controller cntl;
+                BeginTxnRequest req;
+                req.set_cloud_unique_id(cloud_unique_id);
+                TxnInfoPB txn_info;
+                txn_info.set_db_id(db_id);
+                txn_info.set_label(label);
+                txn_info.add_table_ids(table_id);
+                txn_info.set_timeout_ms(36000);
+                UniqueIdPB unique_id_pb;
+                unique_id_pb.set_hi(100);
+                unique_id_pb.set_lo(10);
+                txn_info.mutable_request_id()->CopyFrom(unique_id_pb);
+                req.mutable_txn_info()->CopyFrom(txn_info);
+                BeginTxnResponse res;
+                meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                        &req, &res, nullptr);
+                ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+                txn_id = res.txn_id();
+            }
+            // abort txn
+            {
+                brpc::Controller cntl;
+                AbortTxnRequest req;
+                req.set_cloud_unique_id(cloud_unique_id);
+                ASSERT_GT(txn_id, 0);
+                req.set_txn_id(txn_id);
+                req.set_reason("test");
+                AbortTxnResponse res;
+                meta_service->abort_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                        &req, &res, nullptr);
+                ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+                ASSERT_EQ(res.txn_info().status(), TxnStatusPB::TXN_STATUS_ABORTED);
+            }
+        }
+        {
+            brpc::Controller cntl;
+            BeginTxnRequest req;
+            req.set_cloud_unique_id(cloud_unique_id);
+            TxnInfoPB txn_info;
+            txn_info.set_db_id(db_id);
+            txn_info.set_label(label);
+            txn_info.add_table_ids(table_id);
+            UniqueIdPB unique_id_pb;
+            unique_id_pb.set_hi(100);
+            unique_id_pb.set_lo(10);
+            txn_info.mutable_request_id()->CopyFrom(unique_id_pb);
+            txn_info.set_timeout_ms(36000);
+            req.mutable_txn_info()->CopyFrom(txn_info);
+            BeginTxnResponse res;
+            meta_service->begin_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
+                                    &req, &res, nullptr);
+            ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+            ASSERT_TRUE(res.status().msg().find("too many aborted txn for label") !=
+                        std::string::npos);
+        }
+    }
 }
 
 TEST(MetaServiceTest, PrecommitTest1) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 1026689d8f1448..fa446db2144452 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -851,25 +851,24 @@ private void cleanLabelInternal(long dbId, String label, boolean isReplay) {
             }
         } else {
             List<LoadJob> jobs = labelToJob.get(label);
-            if (jobs == null) {
-                // no job for this label, just return
-                return;
-            }
-            Iterator<LoadJob> iter = jobs.iterator();
-            while (iter.hasNext()) {
-                LoadJob job = iter.next();
-                if (!job.isCompleted()) {
-                    continue;
+            if (jobs != null) {
+                // stream load labelToJob is null
+                Iterator<LoadJob> iter = jobs.iterator();
+                while (iter.hasNext()) {
+                    LoadJob job = iter.next();
+                    if (!job.isCompleted()) {
+                        continue;
+                    }
+                    if (job instanceof BulkLoadJob) {
+                        ((BulkLoadJob) job).recycleProgress();
+                    }
+                    iter.remove();
+                    idToLoadJob.remove(job.getId());
+                    ++counter;
                 }
-                if (job instanceof BulkLoadJob) {
-                    ((BulkLoadJob) job).recycleProgress();
+                if (jobs.isEmpty()) {
+                    labelToJob.remove(label);
                 }
-                iter.remove();
-                idToLoadJob.remove(job.getId());
-                ++counter;
-            }
-            if (jobs.isEmpty()) {
-                labelToJob.remove(label);
-            }
+            }
         }
     }
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_with_data_quality.csv b/regression-test/data/load_p0/stream_load/test_stream_load_with_data_quality.csv
new file mode 100644
index 00000000000000..87940f6901e5f3
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_with_data_quality.csv
@@ -0,0 +1,2 @@
+1, NULL, "xxx", 1
+2, NULL, "yyy", 2
diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_with_data_quality2.csv b/regression-test/data/load_p0/stream_load/test_stream_load_with_data_quality2.csv
new file mode 100644
index 00000000000000..b790a8a3e91e2b
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_stream_load_with_data_quality2.csv
@@ -0,0 +1,2 @@
+1, 1, "xxx", 1
+2, 1, "yyy", 2
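On the FE side, cleanLabelInternal previously returned early when labelToJob had no entry for the label. Stream loads never register a LoadJob, so their labels always took that early return and skipped the rest of the method (the incremented counter is presumably consumed after the shown block). A simplified C++ rendering of the control-flow fix, with hypothetical names standing in for the Java members and the completed-job filtering omitted:

```cpp
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins for the LoadManager maps; ints model LoadJob ids.
std::map<std::string, std::vector<int>> label_to_job;
std::map<int, std::string> id_to_load_job;

void clean_label_internal(const std::string& label) {
    int counter = 0;
    auto it = label_to_job.find(label);
    // Before the fix: `if (it == label_to_job.end()) return;` bailed out
    // here for every stream-load label, never reaching the common tail.
    if (it != label_to_job.end()) {
        for (int job_id : it->second) {
            id_to_load_job.erase(job_id);
            ++counter;
        }
        label_to_job.erase(it);
    }
    // Common tail now runs for stream-load labels too.
    std::cout << "cleaned label=" << label << " removed=" << counter << "\n";
}
```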
diff --git a/regression-test/suites/load_p0/stream_load/test_steam_load_with_data_quality.groovy b/regression-test/suites/load_p0/stream_load/test_steam_load_with_data_quality.groovy
new file mode 100644
index 00000000000000..923285bf6cea45
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_steam_load_with_data_quality.groovy
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Random;
+
+suite("test_stream_load_with_data_quality", "p0") {
+    if (!isCloudMode()) {
+        return;
+    }
+
+    def tableName = "test_stream_load_with_data_quality"
+    sql "DROP TABLE IF EXISTS ${tableName}"
+
+    sql """
+        CREATE TABLE ${tableName}
+        (
+            siteid INT DEFAULT '10',
+            citycode SMALLINT NOT NULL,
+            username VARCHAR(32) DEFAULT '',
+            pv BIGINT SUM DEFAULT '0'
+        )
+        AGGREGATE KEY(siteid, citycode, username)
+        DISTRIBUTED BY HASH(siteid) BUCKETS 1;
+    """
+
+    String label = UUID.randomUUID().toString().replaceAll("-", "")
+
+    // meta-service max_num_aborted_txn is 100
+    for (int i = 0; i < 100; i++) {
+        streamLoad {
+            set 'label', "${label}"
+            set 'column_separator', ','
+            table "${tableName}"
+            time 10000
+            file 'test_stream_load_with_data_quality.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("fail", json.Status.toLowerCase())
+                assertTrue(json.Message.contains("too many filtered rows"))
+                assertEquals(2, json.NumberTotalRows)
+                assertEquals(2, json.NumberFilteredRows)
+            }
+        }
+    }
+
+    streamLoad {
+        set 'label', "${label}"
+        set 'column_separator', ','
+        table "${tableName}"
+        time 10000
+        file 'test_stream_load_with_data_quality.csv'
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("fail", json.Status.toLowerCase())
+            assertTrue(json.Message.contains("too many aborted txn"))
+        }
+    }
+
+    String dbName = "regression_test_load_p0_stream_load"
+    test {
+        sql "clean label ${label} from ${dbName};"
+    }
+
+    streamLoad {
+        set 'label', "${label}"
+        set 'column_separator', ','
+        table "${tableName}"
+        time 10000
+        file 'test_stream_load_with_data_quality2.csv'
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+
+    test {
+        sql "clean label ${label} from ${dbName};"
+    }
+
+    // meta-service max_num_aborted_txn is 100
+    for (int i = 0; i < 99; i++) {
+        streamLoad {
+            set 'label', "${label}"
+            set 'column_separator', ','
+            table "${tableName}"
+            time 10000
+            file 'test_stream_load_with_data_quality.csv'
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("fail", json.Status.toLowerCase())
+                assertTrue(json.Message.contains("too many filtered rows"))
+                assertEquals(2, json.NumberTotalRows)
+                assertEquals(2, json.NumberFilteredRows)
+            }
+        }
+    }
+
+    streamLoad {
+        set 'label', "${label}"
+        set 'column_separator', ','
+        table "${tableName}"
+        time 10000
+        file 'test_stream_load_with_data_quality2.csv'
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+
+    streamLoad {
+        set 'label', "${label}"
+        set 'column_separator', ','
+        table "${tableName}"
+        time 10000
+        file 'test_stream_load_with_data_quality2.csv'
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
${result}".toString()) + def json = parseJson(result) + assertEquals("label already exists", json.Status.toLowerCase()) + assertTrue(json.Message.contains("has already been used")) + } + } + + test { + sql "clean label ${label} from ${dbName};" + } + + streamLoad { + set 'label', "${label}" + set 'column_separator', ',' + table "${tableName}" + time 10000 + file 'test_stream_load_with_data_quality2.csv' + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + + test { + sql "clean label ${label} from ${dbName};" + } + + sql "DROP TABLE IF EXISTS ${tableName}" +} +