@@ -75,6 +75,7 @@ MetaServiceImpl::MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv,
75
75
resource_mgr_ = resource_mgr;
76
76
rate_limiter_ = rate_limiter;
77
77
rate_limiter_->init (this );
78
+ txn_lazy_committer_ = std::make_shared<TxnLazyCommitter>(txn_kv_);
78
79
}
79
80
80
81
MetaServiceImpl::~MetaServiceImpl () = default ;
@@ -237,46 +238,64 @@ void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller,
237
238
partition_version_key ({instance_id, db_id, table_id, partition_id}, &ver_key);
238
239
}
239
240
240
- std::unique_ptr<Transaction> txn;
241
- TxnErrorCode err = txn_kv_->create_txn (&txn);
242
- if (err != TxnErrorCode::TXN_OK) {
243
- msg = " failed to create txn" ;
244
- code = cast_as<ErrCategory::CREATE>(err);
245
- return ;
246
- }
241
+ do {
242
+ code = MetaServiceCode::OK;
243
+ std::unique_ptr<Transaction> txn;
244
+ TxnErrorCode err = txn_kv_->create_txn (&txn);
245
+ if (err != TxnErrorCode::TXN_OK) {
246
+ msg = " failed to create txn" ;
247
+ code = cast_as<ErrCategory::CREATE>(err);
248
+ return ;
249
+ }
247
250
248
- std::string ver_val;
249
- // 0 for success get a key, 1 for key not found, negative for error
250
- err = txn->get (ver_key, &ver_val);
251
- VLOG_DEBUG << " xxx get version_key=" << hex (ver_key);
252
- if (err == TxnErrorCode::TXN_OK) {
253
- if (is_table_version) {
254
- int64_t version = 0 ;
255
- if (!txn->decode_atomic_int (ver_val, &version)) {
256
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
257
- msg = " malformed table version value" ;
258
- return ;
259
- }
260
- response->set_version (version);
261
- } else {
262
- VersionPB version_pb;
263
- if (!version_pb.ParseFromString (ver_val)) {
264
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
265
- msg = " malformed version value" ;
266
- return ;
251
+ std::string ver_val;
252
+ // 0 for success get a key, 1 for key not found, negative for error
253
+ err = txn->get (ver_key, &ver_val);
254
+ VLOG_DEBUG << " xxx get version_key=" << hex (ver_key);
255
+ if (err == TxnErrorCode::TXN_OK) {
256
+ if (is_table_version) {
257
+ int64_t version = 0 ;
258
+ if (!txn->decode_atomic_int (ver_val, &version)) {
259
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
260
+ msg = " malformed table version value" ;
261
+ return ;
262
+ }
263
+ response->set_version (version);
264
+ } else {
265
+ VersionPB version_pb;
266
+ if (!version_pb.ParseFromString (ver_val)) {
267
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
268
+ msg = " malformed version value" ;
269
+ return ;
270
+ }
271
+
272
+ if (version_pb.has_txn_id ()) {
273
+ txn.reset ();
274
+ std::shared_ptr<TxnLazyCommitTask> task =
275
+ txn_lazy_committer_->submit (instance_id, version_pb.txn_id ());
276
+ std::pair<MetaServiceCode, std::string> ret = task->wait ();
277
+ code = ret.first ;
278
+ msg = ret.second ;
279
+ if (code != MetaServiceCode::OK) {
280
+ LOG (WARNING)
281
+ << " wait txn lazy commit failed, txn_id=" << version_pb.txn_id ();
282
+ return ;
283
+ }
284
+ }
285
+
286
+ response->set_version (version_pb.version ());
287
+ response->add_version_update_time_ms (version_pb.update_time_ms ());
267
288
}
268
- response->set_version (version_pb.version ());
269
- response->add_version_update_time_ms (version_pb.update_time_ms ());
289
+ { TEST_SYNC_POINT_CALLBACK (" get_version_code" , &code); }
290
+ return ;
291
+ } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
292
+ msg = " not found" ;
293
+ code = MetaServiceCode::VERSION_NOT_FOUND;
294
+ return ;
270
295
}
271
- { TEST_SYNC_POINT_CALLBACK (" get_version_code" , &code); }
272
- return ;
273
- } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
274
- msg = " not found" ;
275
- code = MetaServiceCode::VERSION_NOT_FOUND;
276
- return ;
277
- }
278
- msg = fmt::format (" failed to get txn, err={}" , err);
279
- code = cast_as<ErrCategory::READ>(err);
296
+ msg = fmt::format (" failed to get txn, err={}" , err);
297
+ code = cast_as<ErrCategory::READ>(err);
298
+ } while (false );
280
299
}
281
300
282
301
void MetaServiceImpl::batch_get_version (::google::protobuf::RpcController* controller,
@@ -326,8 +345,13 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
326
345
std::vector<std::optional<std::string>> version_values;
327
346
version_keys.reserve (BATCH_SIZE);
328
347
version_values.reserve (BATCH_SIZE);
348
+
329
349
while ((code == MetaServiceCode::OK || code == MetaServiceCode::KV_TXN_TOO_OLD) &&
330
350
response->versions_size () < response->partition_ids_size ()) {
351
+ TRY_AGAIN:
352
+ response->clear_versions ();
353
+ code = MetaServiceCode::OK;
354
+
331
355
std::unique_ptr<Transaction> txn;
332
356
TxnErrorCode err = txn_kv_->create_txn (&txn);
333
357
if (err != TxnErrorCode::TXN_OK) {
@@ -387,10 +411,30 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
387
411
msg = " malformed version value" ;
388
412
break ;
389
413
}
414
+ if (version_pb.has_txn_id ()) {
415
+ txn.reset ();
416
+ std::shared_ptr<TxnLazyCommitTask> task =
417
+ txn_lazy_committer_->submit (instance_id, version_pb.txn_id ());
418
+ std::pair<MetaServiceCode, std::string> ret = task->wait ();
419
+ code = ret.first ;
420
+ msg = ret.second ;
421
+ if (code != MetaServiceCode::OK) {
422
+ LOG (WARNING) << " wait txn lazy commit failed, txn_id="
423
+ << version_pb.txn_id ();
424
+ break ;
425
+ }
426
+ goto TRY_AGAIN;
427
+ }
390
428
response->add_versions (version_pb.version ());
391
429
response->add_version_update_time_ms (version_pb.update_time_ms ());
392
430
}
393
431
}
432
+ if (code != MetaServiceCode::OK) {
433
+ break ;
434
+ }
435
+ }
436
+ if (code != MetaServiceCode::OK) {
437
+ break ;
394
438
}
395
439
}
396
440
if (code != MetaServiceCode::OK) {
0 commit comments