@@ -75,6 +75,7 @@ MetaServiceImpl::MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv,
75
75
resource_mgr_ = resource_mgr;
76
76
rate_limiter_ = rate_limiter;
77
77
rate_limiter_->init (this );
78
+ txn_lazy_committer_ = std::make_shared<TxnLazyCommitter>(txn_kv_);
78
79
}
79
80
80
81
MetaServiceImpl::~MetaServiceImpl () = default ;
@@ -237,46 +238,65 @@ void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller,
237
238
partition_version_key ({instance_id, db_id, table_id, partition_id}, &ver_key);
238
239
}
239
240
240
- std::unique_ptr<Transaction> txn;
241
- TxnErrorCode err = txn_kv_->create_txn (&txn);
242
- if (err != TxnErrorCode::TXN_OK) {
243
- msg = " failed to create txn" ;
244
- code = cast_as<ErrCategory::CREATE>(err);
245
- return ;
246
- }
241
+ do {
242
+ code = MetaServiceCode::OK;
243
+ std::unique_ptr<Transaction> txn;
244
+ TxnErrorCode err = txn_kv_->create_txn (&txn);
245
+ if (err != TxnErrorCode::TXN_OK) {
246
+ msg = " failed to create txn" ;
247
+ code = cast_as<ErrCategory::CREATE>(err);
248
+ return ;
249
+ }
247
250
248
- std::string ver_val;
249
- // 0 for success get a key, 1 for key not found, negative for error
250
- err = txn->get (ver_key, &ver_val);
251
- VLOG_DEBUG << " xxx get version_key=" << hex (ver_key);
252
- if (err == TxnErrorCode::TXN_OK) {
253
- if (is_table_version) {
254
- int64_t version = 0 ;
255
- if (!txn->decode_atomic_int (ver_val, &version)) {
256
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
257
- msg = " malformed table version value" ;
258
- return ;
259
- }
260
- response->set_version (version);
261
- } else {
262
- VersionPB version_pb;
263
- if (!version_pb.ParseFromString (ver_val)) {
264
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
265
- msg = " malformed version value" ;
266
- return ;
251
+ std::string ver_val;
252
+ // 0 for success get a key, 1 for key not found, negative for error
253
+ err = txn->get (ver_key, &ver_val);
254
+ VLOG_DEBUG << " xxx get version_key=" << hex (ver_key);
255
+ if (err == TxnErrorCode::TXN_OK) {
256
+ if (is_table_version) {
257
+ int64_t version = 0 ;
258
+ if (!txn->decode_atomic_int (ver_val, &version)) {
259
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
260
+ msg = " malformed table version value" ;
261
+ return ;
262
+ }
263
+ response->set_version (version);
264
+ } else {
265
+ VersionPB version_pb;
266
+ if (!version_pb.ParseFromString (ver_val)) {
267
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
268
+ msg = " malformed version value" ;
269
+ return ;
270
+ }
271
+
272
+ if (version_pb.has_txn_id ()) {
273
+ txn.reset ();
274
+ std::shared_ptr<TxnLazyCommitTask> task =
275
+ txn_lazy_committer_->submit (instance_id, version_pb.txn_id ());
276
+ std::pair<MetaServiceCode, std::string> ret = task->wait ();
277
+ code = ret.first ;
278
+ msg = ret.second ;
279
+ if (code != MetaServiceCode::OK) {
280
+ LOG (WARNING)
281
+ << " wait txn lazy commit failed, txn_id=" << version_pb.txn_id ();
282
+ return ;
283
+ }
284
+ continue ;
285
+ }
286
+
287
+ response->set_version (version_pb.version ());
288
+ response->add_version_update_time_ms (version_pb.update_time_ms ());
267
289
}
268
- response->set_version (version_pb.version ());
269
- response->add_version_update_time_ms (version_pb.update_time_ms ());
290
+ { TEST_SYNC_POINT_CALLBACK (" get_version_code" , &code); }
291
+ return ;
292
+ } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
293
+ msg = " not found" ;
294
+ code = MetaServiceCode::VERSION_NOT_FOUND;
295
+ return ;
270
296
}
271
- { TEST_SYNC_POINT_CALLBACK (" get_version_code" , &code); }
272
- return ;
273
- } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
274
- msg = " not found" ;
275
- code = MetaServiceCode::VERSION_NOT_FOUND;
276
- return ;
277
- }
278
- msg = fmt::format (" failed to get txn, err={}" , err);
279
- code = cast_as<ErrCategory::READ>(err);
297
+ msg = fmt::format (" failed to get txn, err={}" , err);
298
+ code = cast_as<ErrCategory::READ>(err);
299
+ } while (false );
280
300
}
281
301
282
302
void MetaServiceImpl::batch_get_version (::google::protobuf::RpcController* controller,
@@ -326,8 +346,13 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
326
346
std::vector<std::optional<std::string>> version_values;
327
347
version_keys.reserve (BATCH_SIZE);
328
348
version_values.reserve (BATCH_SIZE);
349
+
329
350
while ((code == MetaServiceCode::OK || code == MetaServiceCode::KV_TXN_TOO_OLD) &&
330
351
response->versions_size () < response->partition_ids_size ()) {
352
+ TRY_AGAIN:
353
+ response->clear_versions ();
354
+ code = MetaServiceCode::OK;
355
+
331
356
std::unique_ptr<Transaction> txn;
332
357
TxnErrorCode err = txn_kv_->create_txn (&txn);
333
358
if (err != TxnErrorCode::TXN_OK) {
@@ -387,11 +412,27 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
387
412
msg = " malformed version value" ;
388
413
break ;
389
414
}
415
+ if (version_pb.has_txn_id ()) {
416
+ txn.reset ();
417
+ std::shared_ptr<TxnLazyCommitTask> task = txn_lazy_committer_->submit (
418
+ instance_id, version_pb.txn_id ());
419
+ std::pair<MetaServiceCode, std::string> ret = task->wait ();
420
+ code = ret.first ;
421
+ msg = ret.second ;
422
+ if (code != MetaServiceCode::OK) {
423
+ LOG (WARNING) << " wait txn lazy commit failed, txn_id="
424
+ << version_pb.txn_id ();
425
+ break ;
426
+ }
427
+ goto TRY_AGAIN;
428
+ }
390
429
response->add_versions (version_pb.version ());
391
430
response->add_version_update_time_ms (version_pb.update_time_ms ());
392
431
}
393
432
}
433
+ if (code != MetaServiceCode::OK) break ;
394
434
}
435
+ if (code != MetaServiceCode::OK) break ;
395
436
}
396
437
if (code != MetaServiceCode::OK) {
397
438
response->clear_partition_ids ();
0 commit comments