Skip to content

Commit 0cac9c5

Browse files
[feature](cloud) Add lazy commit mechanism for commit_txn
Motivation: * In cloud mode we use `foundationdb` to store rowset meta. When we load data with many rowsets, `commit_txn` will fail because `foundationdb` currently limits every transaction to less than 10 MB in size, so we split `commit_txn` into several sub fdb txns to solve the problem. How: The main flow of `commit_txn` now looks like this: sub txn 1: 1. update the partition `VersionPB` with txn_id; 2. submit an async task to txn_lazy_committer. txn_lazy_committer: sub txn 2: convert tmp rowset meta per batch; sub txn 3: convert tmp rowset meta per batch; ......; sub txn n: make the txn visible and remove txn_id from `VersionPB`.
1 parent 355ffa5 commit 0cac9c5

13 files changed

+1597
-452
lines changed

cloud/src/common/config.h

+4
Original file line numberDiff line numberDiff line change
@@ -201,4 +201,8 @@ CONF_Validator(s3_client_http_scheme, [](const std::string& config) -> bool {
201201
// Max retry times for object storage request
202202
CONF_mInt64(max_s3_client_retry, "10");
203203

204+
CONF_Bool(enable_txn_lazy_commit, "true");
205+
CONF_Int32(txn_lazy_commit_rowsets_thresold, "1");
206+
CONF_Int32(txn_lazy_commit_worker_num, "8");
207+
CONF_Int32(txn_lazy_max_rowsets_per_batch, "256");
204208
} // namespace doris::cloud::config

cloud/src/meta-service/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,5 @@ add_library(MetaService
2424
doris_txn.cpp
2525
mem_txn_kv.cpp
2626
http_encode_key.cpp
27+
txn_lazy_committer.cpp
2728
)

cloud/src/meta-service/meta_service.cpp

+78-37
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ MetaServiceImpl::MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv,
7575
resource_mgr_ = resource_mgr;
7676
rate_limiter_ = rate_limiter;
7777
rate_limiter_->init(this);
78+
txn_lazy_committer_ = std::make_shared<TxnLazyCommitter>(txn_kv_);
7879
}
7980

8081
MetaServiceImpl::~MetaServiceImpl() = default;
@@ -237,46 +238,65 @@ void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller,
237238
partition_version_key({instance_id, db_id, table_id, partition_id}, &ver_key);
238239
}
239240

240-
std::unique_ptr<Transaction> txn;
241-
TxnErrorCode err = txn_kv_->create_txn(&txn);
242-
if (err != TxnErrorCode::TXN_OK) {
243-
msg = "failed to create txn";
244-
code = cast_as<ErrCategory::CREATE>(err);
245-
return;
246-
}
241+
do {
242+
code = MetaServiceCode::OK;
243+
std::unique_ptr<Transaction> txn;
244+
TxnErrorCode err = txn_kv_->create_txn(&txn);
245+
if (err != TxnErrorCode::TXN_OK) {
246+
msg = "failed to create txn";
247+
code = cast_as<ErrCategory::CREATE>(err);
248+
return;
249+
}
247250

248-
std::string ver_val;
249-
// 0 for success get a key, 1 for key not found, negative for error
250-
err = txn->get(ver_key, &ver_val);
251-
VLOG_DEBUG << "xxx get version_key=" << hex(ver_key);
252-
if (err == TxnErrorCode::TXN_OK) {
253-
if (is_table_version) {
254-
int64_t version = 0;
255-
if (!txn->decode_atomic_int(ver_val, &version)) {
256-
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
257-
msg = "malformed table version value";
258-
return;
259-
}
260-
response->set_version(version);
261-
} else {
262-
VersionPB version_pb;
263-
if (!version_pb.ParseFromString(ver_val)) {
264-
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
265-
msg = "malformed version value";
266-
return;
251+
std::string ver_val;
252+
// 0 for success get a key, 1 for key not found, negative for error
253+
err = txn->get(ver_key, &ver_val);
254+
VLOG_DEBUG << "xxx get version_key=" << hex(ver_key);
255+
if (err == TxnErrorCode::TXN_OK) {
256+
if (is_table_version) {
257+
int64_t version = 0;
258+
if (!txn->decode_atomic_int(ver_val, &version)) {
259+
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
260+
msg = "malformed table version value";
261+
return;
262+
}
263+
response->set_version(version);
264+
} else {
265+
VersionPB version_pb;
266+
if (!version_pb.ParseFromString(ver_val)) {
267+
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
268+
msg = "malformed version value";
269+
return;
270+
}
271+
272+
if (version_pb.has_txn_id()) {
273+
txn.reset();
274+
std::shared_ptr<TxnLazyCommitTask> task =
275+
txn_lazy_committer_->submit(instance_id, version_pb.txn_id());
276+
std::pair<MetaServiceCode, std::string> ret = task->wait();
277+
code = ret.first;
278+
msg = ret.second;
279+
if (code != MetaServiceCode::OK) {
280+
LOG(WARNING)
281+
<< "wait txn lazy commit failed, txn_id=" << version_pb.txn_id();
282+
return;
283+
}
284+
continue;
285+
}
286+
287+
response->set_version(version_pb.version());
288+
response->add_version_update_time_ms(version_pb.update_time_ms());
267289
}
268-
response->set_version(version_pb.version());
269-
response->add_version_update_time_ms(version_pb.update_time_ms());
290+
{ TEST_SYNC_POINT_CALLBACK("get_version_code", &code); }
291+
return;
292+
} else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
293+
msg = "not found";
294+
code = MetaServiceCode::VERSION_NOT_FOUND;
295+
return;
270296
}
271-
{ TEST_SYNC_POINT_CALLBACK("get_version_code", &code); }
272-
return;
273-
} else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
274-
msg = "not found";
275-
code = MetaServiceCode::VERSION_NOT_FOUND;
276-
return;
277-
}
278-
msg = fmt::format("failed to get txn, err={}", err);
279-
code = cast_as<ErrCategory::READ>(err);
297+
msg = fmt::format("failed to get txn, err={}", err);
298+
code = cast_as<ErrCategory::READ>(err);
299+
} while (false);
280300
}
281301

282302
void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* controller,
@@ -326,8 +346,13 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
326346
std::vector<std::optional<std::string>> version_values;
327347
version_keys.reserve(BATCH_SIZE);
328348
version_values.reserve(BATCH_SIZE);
349+
329350
while ((code == MetaServiceCode::OK || code == MetaServiceCode::KV_TXN_TOO_OLD) &&
330351
response->versions_size() < response->partition_ids_size()) {
352+
TRY_AGAIN:
353+
response->clear_versions();
354+
code = MetaServiceCode::OK;
355+
331356
std::unique_ptr<Transaction> txn;
332357
TxnErrorCode err = txn_kv_->create_txn(&txn);
333358
if (err != TxnErrorCode::TXN_OK) {
@@ -387,11 +412,27 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
387412
msg = "malformed version value";
388413
break;
389414
}
415+
if (version_pb.has_txn_id()) {
416+
txn.reset();
417+
std::shared_ptr<TxnLazyCommitTask> task = txn_lazy_committer_->submit(
418+
instance_id, version_pb.txn_id());
419+
std::pair<MetaServiceCode, std::string> ret = task->wait();
420+
code = ret.first;
421+
msg = ret.second;
422+
if (code != MetaServiceCode::OK) {
423+
LOG(WARNING) << "wait txn lazy commit failed, txn_id="
424+
<< version_pb.txn_id();
425+
break;
426+
}
427+
goto TRY_AGAIN;
428+
}
390429
response->add_versions(version_pb.version());
391430
response->add_version_update_time_ms(version_pb.update_time_ms());
392431
}
393432
}
433+
if (code != MetaServiceCode::OK) break;
394434
}
435+
if (code != MetaServiceCode::OK) break;
395436
}
396437
if (code != MetaServiceCode::OK) {
397438
response->clear_partition_ids();

cloud/src/meta-service/meta_service.h

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "common/config.h"
3131
#include "cpp/sync_point.h"
3232
#include "meta-service/txn_kv.h"
33+
#include "meta-service/txn_lazy_committer.h"
3334
#include "rate-limiter/rate_limiter.h"
3435
#include "resource-manager/resource_manager.h"
3536

@@ -282,6 +283,7 @@ class MetaServiceImpl : public cloud::MetaService {
282283
std::shared_ptr<TxnKv> txn_kv_;
283284
std::shared_ptr<ResourceManager> resource_mgr_;
284285
std::shared_ptr<RateLimiter> rate_limiter_;
286+
std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
285287
};
286288

287289
class MetaServiceProxy final : public MetaService {

0 commit comments

Comments
 (0)