Skip to content

Commit a5a52dd

Browse files
sollhuicjj2010
authored andcommitted
[fix](cloud) ensure afterCommit/afterAbort will always be called when commit/abort transaction fails (apache#41267)
Ensure afterCommit/afterAbort will always be called when commit/abort transaction fails. Otherwise, it may cause some problems, such as the routing load getting stuck.
1 parent 5535407 commit a5a52dd

File tree

1 file changed

+79
-38
lines changed

1 file changed

+79
-38
lines changed

fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java

+79-38
Original file line numberDiff line numberDiff line change
@@ -512,22 +512,35 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio
512512
}
513513

514514
final CommitTxnRequest commitTxnRequest = builder.build();
515+
boolean txnOperated = false;
516+
TransactionState txnState = null;
517+
TxnStateChangeCallback cb = null;
518+
long callbackId = 0L;
515519
try {
516-
commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
517-
} catch (UserException e) {
518-
// For routine load, it is necessary to release the write lock when commit transaction fails,
519-
// otherwise it will cause the lock added in beforeCommitted to not be released.
520+
txnState = commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
521+
txnOperated = true;
522+
} finally {
520523
if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
521524
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
522-
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
525+
callbackId = rlTaskTxnCommitAttachment.getJobId();
526+
} else if (txnState != null) {
527+
callbackId = txnState.getCallbackId();
528+
}
529+
530+
cb = callbackFactory.getCallback(callbackId);
531+
if (cb != null) {
532+
LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}",
533+
transactionId, callbackId, txnState);
534+
cb.afterCommitted(txnState, txnOperated);
535+
cb.afterVisible(txnState, txnOperated);
523536
}
524-
throw e;
525537
}
526538
}
527539

528-
private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId,
540+
private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId,
529541
List<Table> tableList) throws UserException {
530542
CommitTxnResponse commitTxnResponse = null;
543+
TransactionState txnState = null;
531544
int retryTime = 0;
532545

533546
try {
@@ -578,19 +591,13 @@ private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, bo
578591
throw new UserException("internal error, " + internalMsgBuilder.toString());
579592
}
580593

581-
TransactionState txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo());
582-
TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());
583-
if (cb != null) {
584-
LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}",
585-
txnState.getTransactionId(), txnState.getCallbackId(), txnState);
586-
cb.afterCommitted(txnState, true);
587-
cb.afterVisible(txnState, true);
588-
}
594+
txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo());
589595
if (MetricRepo.isInit) {
590596
MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
591597
MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime() - txnState.getPrepareTime());
592598
}
593599
afterCommitTxnResp(commitTxnResponse);
600+
return txnState;
594601
}
595602

596603
private List<OlapTable> getMowTableList(List<Table> tableList) {
@@ -990,9 +997,24 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
990997
}
991998

992999
final CommitTxnRequest commitTxnRequest = builder.build();
993-
commitTxn(commitTxnRequest, transactionId, false, db.getId(),
994-
subTransactionStates.stream().map(SubTransactionState::getTable)
1000+
TransactionState txnState = null;
1001+
boolean txnOperated = false;
1002+
try {
1003+
txnState = commitTxn(commitTxnRequest, transactionId, false, db.getId(),
1004+
subTransactionStates.stream().map(SubTransactionState::getTable)
9951005
.collect(Collectors.toList()));
1006+
txnOperated = true;
1007+
} finally {
1008+
if (txnState != null) {
1009+
TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());
1010+
if (cb != null) {
1011+
LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}",
1012+
txnState.getTransactionId(), txnState.getCallbackId(), txnState);
1013+
cb.afterCommitted(txnState, txnOperated);
1014+
cb.afterVisible(txnState, txnOperated);
1015+
}
1016+
}
1017+
}
9961018
return true;
9971019
}
9981020

@@ -1042,8 +1064,6 @@ public void abortTransaction(Long dbId, Long transactionId, String reason) throw
10421064
@Override
10431065
public void abortTransaction(Long dbId, Long transactionId, String reason,
10441066
TxnCommitAttachment txnCommitAttachment, List<Table> tableList) throws UserException {
1045-
LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, transactionId);
1046-
10471067
if (txnCommitAttachment != null) {
10481068
if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
10491069
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
@@ -1058,6 +1078,18 @@ public void abortTransaction(Long dbId, Long transactionId, String reason,
10581078
}
10591079
}
10601080

1081+
AbortTxnResponse abortTxnResponse = null;
1082+
try {
1083+
abortTxnResponse = abortTransactionImpl(dbId, transactionId, reason, null, null);
1084+
} finally {
1085+
handleAfterAbort(abortTxnResponse, txnCommitAttachment, transactionId);
1086+
}
1087+
}
1088+
1089+
private AbortTxnResponse abortTransactionImpl(Long dbId, Long transactionId, String reason,
1090+
TxnCommitAttachment txnCommitAttachment, List<Table> tableList) throws UserException {
1091+
LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, transactionId);
1092+
10611093
AbortTxnRequest.Builder builder = AbortTxnRequest.newBuilder();
10621094
builder.setDbId(dbId);
10631095
builder.setTxnId(transactionId);
@@ -1089,27 +1121,43 @@ public void abortTransaction(Long dbId, Long transactionId, String reason,
10891121
Preconditions.checkNotNull(abortTxnResponse.getStatus());
10901122
} catch (RpcException e) {
10911123
LOG.warn("abortTxn failed, transactionId:{}, Exception", transactionId, e);
1092-
// For routine load, it is necessary to release the write lock when abort transaction fails,
1093-
// otherwise it will cause the lock added in beforeAborted to not be released.
1094-
if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
1095-
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
1096-
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
1097-
}
10981124
throw new UserException("abortTxn failed, errMsg:" + e.getMessage());
10991125
}
11001126
afterAbortTxnResp(abortTxnResponse, String.valueOf(transactionId), txnCommitAttachment);
1127+
return abortTxnResponse;
1128+
}
1129+
1130+
private void handleAfterAbort(AbortTxnResponse abortTxnResponse, TxnCommitAttachment txnCommitAttachment,
1131+
long transactionId) throws UserException {
1132+
TransactionState txnState = new TransactionState();
1133+
boolean txnOperated = false;
1134+
long callbackId = 0L;
1135+
TxnStateChangeCallback cb = null;
1136+
String abortReason = "";
1137+
1138+
if (abortTxnResponse != null) {
1139+
txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
1140+
txnOperated = abortTxnResponse.getStatus().getCode() == MetaServiceCode.OK;
1141+
callbackId = txnState.getCallbackId();
1142+
abortReason = txnState.getReason();
1143+
}
1144+
if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
1145+
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
1146+
callbackId = rlTaskTxnCommitAttachment.getJobId();
1147+
}
1148+
1149+
cb = callbackFactory.getCallback(callbackId);
1150+
if (cb != null) {
1151+
LOG.info("run txn callback, txnId:{} callbackId:{}, txnState:{}",
1152+
transactionId, callbackId, txnState);
1153+
cb.afterAborted(txnState, txnOperated, abortReason);
1154+
}
11011155
}
11021156

11031157
private void afterAbortTxnResp(AbortTxnResponse abortTxnResponse, String txnIdOrLabel,
11041158
TxnCommitAttachment txnCommitAttachment) throws UserException {
11051159
if (abortTxnResponse.getStatus().getCode() != MetaServiceCode.OK) {
11061160
LOG.warn("abortTxn failed, transaction:{}, response:{}", txnIdOrLabel, abortTxnResponse);
1107-
// For routine load, it is necessary to release the write lock when abort transaction fails,
1108-
// otherwise it will cause the lock added in beforeAborted to not be released.
1109-
if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
1110-
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
1111-
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
1112-
}
11131161
switch (abortTxnResponse.getStatus().getCode()) {
11141162
case TXN_ID_NOT_FOUND:
11151163
case TXN_LABEL_NOT_FOUND:
@@ -1125,13 +1173,6 @@ private void afterAbortTxnResp(AbortTxnResponse abortTxnResponse, String txnIdOr
11251173
}
11261174
}
11271175

1128-
TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
1129-
TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());
1130-
if (cb != null) {
1131-
LOG.info("run txn callback, txnId:{} callbackId:{}", txnState.getTransactionId(),
1132-
txnState.getCallbackId());
1133-
cb.afterAborted(txnState, true, txnState.getReason());
1134-
}
11351176
if (MetricRepo.isInit) {
11361177
MetricRepo.COUNTER_TXN_FAILED.increase(1L);
11371178
}

0 commit comments

Comments
 (0)