@@ -505,7 +505,17 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio
505
505
}
506
506
507
507
final CommitTxnRequest commitTxnRequest = builder .build ();
508
- commitTxn (commitTxnRequest , transactionId , is2PC , dbId , tableList );
508
+ try {
509
+ commitTxn (commitTxnRequest , transactionId , is2PC , dbId , tableList );
510
+ } catch (UserException e ) {
511
+ // For routine load, it is necessary to release the write lock when commit transaction fails,
512
+ // otherwise it will cause the lock added in beforeCommitted to not be released.
513
+ if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment ) {
514
+ RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment ) txnCommitAttachment ;
515
+ Env .getCurrentEnv ().getRoutineLoadManager ().getJob (rlTaskTxnCommitAttachment .getJobId ()).writeUnlock ();
516
+ }
517
+ throw e ;
518
+ }
509
519
}
510
520
511
521
private void commitTxn (CommitTxnRequest commitTxnRequest , long transactionId , boolean is2PC , long dbId ,
@@ -1037,6 +1047,12 @@ public void abortTransaction(Long dbId, Long transactionId, String reason,
1037
1047
Preconditions .checkNotNull (abortTxnResponse .getStatus ());
1038
1048
} catch (RpcException e ) {
1039
1049
LOG .warn ("abortTxn failed, transactionId:{}, Exception" , transactionId , e );
1050
+ // For routine load, it is necessary to release the write lock when abort transaction fails,
1051
+ // otherwise it will cause the lock added in beforeAborted to not be released.
1052
+ if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment ) {
1053
+ RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment ) txnCommitAttachment ;
1054
+ Env .getCurrentEnv ().getRoutineLoadManager ().getJob (rlTaskTxnCommitAttachment .getJobId ()).writeUnlock ();
1055
+ }
1040
1056
throw new UserException ("abortTxn failed, errMsg:" + e .getMessage ());
1041
1057
}
1042
1058
0 commit comments