Skip to content

Commit 7424903

Browse files
committed
Await ReadyForQuery before emitting errors from transactional control methods.
commitTransaction, rollbackTransaction and other methods now await completion of the exchange before emitting error signals to properly synchronize completion. Previously, error signals were emitted before updating the transaction state which could lead to invalid cleanup states if e.g. the commit failed. [resolves #541] Signed-off-by: Mark Paluch <[email protected]>
1 parent 9f4d2a6 commit 7424903

File tree

3 files changed

+66
-7
lines changed

3 files changed

+66
-7
lines changed

src/main/java/io/r2dbc/postgresql/ExceptionFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ static ExceptionFactory withSql(String sql) {
6060
* @return the {@link R2dbcException}.
6161
* @see ErrorResponse
6262
*/
63-
private static R2dbcException createException(ErrorResponse response, String sql) {
63+
static R2dbcException createException(ErrorResponse response, String sql) {
6464

6565
ErrorDetails errorDetails = new ErrorDetails(response.getFields());
6666

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+44-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.r2dbc.postgresql;
1818

19+
import io.r2dbc.postgresql.api.ErrorDetails;
1920
import io.r2dbc.postgresql.api.Notification;
2021
import io.r2dbc.postgresql.api.PostgresqlResult;
2122
import io.r2dbc.postgresql.api.PostgresqlStatement;
@@ -25,11 +26,15 @@
2526
import io.r2dbc.postgresql.client.SimpleQueryMessageFlow;
2627
import io.r2dbc.postgresql.client.TransactionStatus;
2728
import io.r2dbc.postgresql.codec.Codecs;
29+
import io.r2dbc.postgresql.message.backend.BackendMessage;
30+
import io.r2dbc.postgresql.message.backend.CommandComplete;
31+
import io.r2dbc.postgresql.message.backend.ErrorResponse;
2832
import io.r2dbc.postgresql.message.backend.NotificationResponse;
2933
import io.r2dbc.postgresql.util.Assert;
3034
import io.r2dbc.postgresql.util.Operators;
3135
import io.r2dbc.spi.Connection;
3236
import io.r2dbc.spi.IsolationLevel;
37+
import io.r2dbc.spi.R2dbcException;
3338
import io.r2dbc.spi.ValidationDepth;
3439
import org.reactivestreams.Publisher;
3540
import org.reactivestreams.Subscriber;
@@ -112,9 +117,33 @@ public Mono<Void> close() {
112117

113118
@Override
114119
public Mono<Void> commitTransaction() {
120+
121+
AtomicReference<R2dbcException> ref = new AtomicReference<>();
115122
return useTransactionStatus(transactionStatus -> {
116123
if (IDLE != transactionStatus) {
117-
return exchange("COMMIT");
124+
return Flux.from(exchange("COMMIT"))
125+
.filter(CommandComplete.class::isInstance)
126+
.cast(CommandComplete.class)
127+
.<BackendMessage>handle((message, sink) -> {
128+
129+
// Certain backend versions (e.g. 12.2, 11.7, 10.12, 9.6.17, 9.5.21, etc)
130+
// silently rollback the transaction in the response to COMMIT statement
131+
// in case the transaction has failed.
132+
// See discussion in pgsql-hackers: https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org
133+
134+
if ("ROLLBACK".equalsIgnoreCase(message.getCommand())) {
135+
ErrorDetails details = ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction " +
136+
"failure is not known (check server logs?)");
137+
ref.set(new ExceptionFactory.PostgresqlRollbackException(details));
138+
return;
139+
}
140+
141+
sink.next(message);
142+
}).doOnComplete(() -> {
143+
if (ref.get() != null) {
144+
throw ref.get();
145+
}
146+
});
118147
} else {
119148
this.logger.debug(this.connectionContext.getMessage("Skipping commit transaction because status is {}"), transactionStatus);
120149
return Mono.empty();
@@ -358,9 +387,21 @@ private <T> Mono<T> withTransactionStatus(Function<TransactionStatus, T> f) {
358387

359388
@SuppressWarnings("unchecked")
360389
private <T> Publisher<T> exchange(String sql) {
361-
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
390+
AtomicReference<R2dbcException> ref = new AtomicReference<>();
362391
return (Publisher<T>) SimpleQueryMessageFlow.exchange(this.client, sql)
363-
.handle(exceptionFactory::handleErrorResponse);
392+
.handle((backendMessage, synchronousSink) -> {
393+
394+
if (backendMessage instanceof ErrorResponse) {
395+
ref.set(ExceptionFactory.createException((ErrorResponse) backendMessage, sql));
396+
} else {
397+
synchronousSink.next(backendMessage);
398+
}
399+
})
400+
.doOnComplete(() -> {
401+
if (ref.get() != null) {
402+
throw ref.get();
403+
}
404+
});
364405
}
365406

366407
/**

src/test/java/io/r2dbc/postgresql/PostgresqlConnectionErrorsIntegrationTests.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818

1919
import io.r2dbc.postgresql.api.PostgresqlConnection;
2020
import io.r2dbc.postgresql.api.PostgresqlResult;
21+
import io.r2dbc.postgresql.client.Client;
22+
import io.r2dbc.postgresql.client.TransactionStatus;
2123
import io.r2dbc.spi.R2dbcBadGrammarException;
22-
import org.awaitility.Awaitility;
24+
import io.r2dbc.spi.R2dbcException;
2325
import org.junit.jupiter.api.Test;
2426
import reactor.test.StepVerifier;
2527

28+
import java.lang.reflect.Field;
29+
2630
import static org.assertj.core.api.Assertions.assertThat;
2731

2832
/**
@@ -36,12 +40,26 @@ void commitShouldRecoverFromFailedTransaction() {
3640
this.connection.beginTransaction().as(StepVerifier::create).verifyComplete();
3741
this.connection.createStatement("error").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcBadGrammarException.class);
3842

39-
this.connection.commitTransaction().as(StepVerifier::create).verifyComplete();
43+
this.connection.commitTransaction().as(StepVerifier::create).verifyErrorSatisfies(throwable -> {
44+
assertThat(throwable).isInstanceOf(R2dbcException.class);
45+
46+
Client client = extractClient();
47+
assertThat(client.getTransactionStatus()).isEqualTo(TransactionStatus.IDLE);
48+
});
4049

41-
Awaitility.await().until(() -> this.connection.isAutoCommit());
4250
assertThat(this.connection.isAutoCommit()).isTrue();
4351
}
4452

53+
private Client extractClient() {
54+
try {
55+
Field field = io.r2dbc.postgresql.PostgresqlConnection.class.getDeclaredField("client");
56+
field.setAccessible(true);
57+
return (Client) field.get(this.connection);
58+
} catch (ReflectiveOperationException e) {
59+
throw new RuntimeException(e);
60+
}
61+
}
62+
4563
@Test
4664
void rollbackShouldRecoverFromFailedTransaction() {
4765

0 commit comments

Comments
 (0)