Skip to content

Commit 4eb9b67

Browse files
committed
Add support for extended transaction definitions
[resolves #104] Signed-off-by: Mark Paluch <[email protected]>
1 parent 869d93f commit 4eb9b67

File tree

3 files changed

+27
-2
lines changed

3 files changed

+27
-2
lines changed

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
<mockito.version>3.5.15</mockito.version>
4141
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
4242
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
43-
<r2dbc-spi.version>0.8.1.RELEASE</r2dbc-spi.version>
43+
<r2dbc-spi.version>0.9.0.BUILD-SNAPSHOT</r2dbc-spi.version>
4444
<reactor.version>Dysprosium-SR12</reactor.version>
4545
</properties>
4646

src/main/java/io/r2dbc/pool/PooledConnection.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.r2dbc.spi.ConnectionMetadata;
2222
import io.r2dbc.spi.IsolationLevel;
2323
import io.r2dbc.spi.Statement;
24+
import io.r2dbc.spi.TransactionDefinition;
2425
import io.r2dbc.spi.ValidationDepth;
2526
import io.r2dbc.spi.Wrapped;
2627
import org.reactivestreams.Publisher;
@@ -69,7 +70,13 @@ final class PooledConnection implements Connection, Wrapped<Connection> {
6970
@Override
7071
public Mono<Void> beginTransaction() {
7172
assertNotClosed();
72-
return Mono.from(this.connection.beginTransaction()).doOnSubscribe(ignore -> this.inTransaction = true);
73+
return Mono.from(this.connection.beginTransaction()).doOnSubscribe(ignore -> this.inTransaction = true).doOnError(e -> this.inTransaction = false);
74+
}
75+
76+
@Override
77+
public Mono<Void> beginTransaction(TransactionDefinition definition) {
78+
assertNotClosed();
79+
return Mono.from(this.connection.beginTransaction(definition)).doOnSubscribe(ignore -> this.inTransaction = true).doOnError(e -> this.inTransaction = false);
7380
}
7481

7582
@Override

src/test/java/io/r2dbc/pool/PooledConnectionUnitTests.java

+18
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.r2dbc.pool;
1818

1919
import io.r2dbc.spi.Connection;
20+
import io.r2dbc.spi.TransactionDefinition;
2021
import io.r2dbc.spi.ValidationDepth;
2122
import org.junit.jupiter.api.BeforeEach;
2223
import org.junit.jupiter.api.Test;
@@ -28,6 +29,7 @@
2829
import java.util.concurrent.atomic.AtomicInteger;
2930

3031
import static org.assertj.core.api.Assertions.assertThat;
32+
import static org.mockito.ArgumentMatchers.any;
3133
import static org.mockito.Mockito.mock;
3234
import static org.mockito.Mockito.never;
3335
import static org.mockito.Mockito.reset;
@@ -52,6 +54,7 @@ void setUp() {
5254
when(pooledRefMock.poolable()).thenReturn(connectionMock);
5355
when(pooledRefMock.release()).thenReturn(Mono.empty());
5456
when(connectionMock.beginTransaction()).thenReturn(Mono.empty());
57+
when(connectionMock.beginTransaction(any())).thenReturn(Mono.empty());
5558
when(connectionMock.close()).thenReturn(Mono.empty());
5659
when(connectionMock.validate(ValidationDepth.LOCAL)).thenReturn(Mono.empty());
5760
}
@@ -71,6 +74,21 @@ void shouldRollbackUnfinishedTransaction() {
7174
assertThat(wasCalled).isTrue();
7275
}
7376

77+
@Test
78+
void shouldRollbackUnfinishedExtendedTransaction() {
79+
80+
AtomicBoolean wasCalled = new AtomicBoolean();
81+
when(connectionMock.rollbackTransaction()).thenReturn(Mono.<Void>empty().doOnSuccess(o -> wasCalled.set(true)));
82+
83+
PooledConnection connection = new PooledConnection(pooledRefMock);
84+
connection.beginTransaction(mock(TransactionDefinition.class)).as(StepVerifier::create).verifyComplete();
85+
86+
connection.close().as(StepVerifier::create).verifyComplete();
87+
88+
verify(connectionMock).rollbackTransaction();
89+
assertThat(wasCalled).isTrue();
90+
}
91+
7492
@Test
7593
void shouldPristineTransactionLeavesTransactionalStateAsIs() {
7694

0 commit comments

Comments
 (0)