Skip to content

Commit b8502da

Browse files
author
Francesco Komauli
committed
Stream for subscribing to Postgres connection notices
[resolves pgjdbc#570] Postgres may send notice messages through a connection, which may contain log information or metadata related to the submitted commands. Notices travel on the same backend messaging subsystem used by notifications. The implementation processes NoticeResponse in a way similar to NotificationResponse, and exposes a coherent API.
1 parent 964b075 commit b8502da

9 files changed

+295
-0
lines changed

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+86
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.r2dbc.postgresql.api.CopyInBuilder;
2020
import io.r2dbc.postgresql.api.ErrorDetails;
21+
import io.r2dbc.postgresql.api.Notice;
2122
import io.r2dbc.postgresql.api.Notification;
2223
import io.r2dbc.postgresql.api.PostgresTransactionDefinition;
2324
import io.r2dbc.postgresql.api.PostgresqlResult;
@@ -31,6 +32,8 @@
3132
import io.r2dbc.postgresql.message.backend.BackendMessage;
3233
import io.r2dbc.postgresql.message.backend.CommandComplete;
3334
import io.r2dbc.postgresql.message.backend.ErrorResponse;
35+
import io.r2dbc.postgresql.message.backend.Field;
36+
import io.r2dbc.postgresql.message.backend.NoticeResponse;
3437
import io.r2dbc.postgresql.message.backend.NotificationResponse;
3538
import io.r2dbc.postgresql.util.Assert;
3639
import io.r2dbc.postgresql.util.Operators;
@@ -53,6 +56,8 @@
5356
import reactor.util.annotation.Nullable;
5457

5558
import java.time.Duration;
59+
import java.util.EnumMap;
60+
import java.util.Map;
5661
import java.util.concurrent.atomic.AtomicReference;
5762
import java.util.function.Function;
5863

@@ -78,6 +83,8 @@ final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlCo
7883

7984
private final AtomicReference<NotificationAdapter> notificationAdapter = new AtomicReference<>();
8085

86+
private final AtomicReference<NoticeAdapter> noticeAdapter = new AtomicReference<>();
87+
8188
private volatile IsolationLevel isolationLevel;
8289

8390
private volatile IsolationLevel previousIsolationLevel;
@@ -181,6 +188,12 @@ public Mono<Void> close() {
181188
if (notificationAdapter != null && this.notificationAdapter.compareAndSet(notificationAdapter, null)) {
182189
notificationAdapter.dispose();
183190
}
191+
192+
NoticeAdapter noticeAdapter = this.noticeAdapter.get();
193+
194+
if (noticeAdapter != null && this.noticeAdapter.compareAndSet(noticeAdapter, null)) {
195+
noticeAdapter.dispose();
196+
}
184197
}).then(Mono.empty());
185198
}
186199

@@ -282,6 +295,24 @@ public Flux<Notification> getNotifications() {
282295
return notifications.getEvents();
283296
}
284297

298+
@Override
299+
public Flux<Notice> getNotices() {
300+
NoticeAdapter notices = this.noticeAdapter.get();
301+
302+
if (notices == null) {
303+
304+
notices = new NoticeAdapter();
305+
306+
if (this.noticeAdapter.compareAndSet(null, notices)) {
307+
notices.register(this.client);
308+
} else {
309+
notices = this.noticeAdapter.get();
310+
}
311+
}
312+
313+
return notices.getEvents();
314+
}
315+
285316
@Override
286317
public PostgresqlConnectionMetadata getMetadata() {
287318
return new PostgresqlConnectionMetadata(this.client.getVersion());
@@ -537,6 +568,61 @@ Flux<Notification> getEvents() {
537568

538569
}
539570

571+
/**
572+
* Adapter to publish {@link Notice}s.
573+
*/
574+
static class NoticeAdapter {
575+
576+
private final Sinks.Many<Notice> sink = Sinks.many().multicast().directBestEffort();
577+
578+
@Nullable
579+
private volatile Disposable subscription = null;
580+
581+
void dispose() {
582+
Disposable subscription = this.subscription;
583+
if (subscription != null && !subscription.isDisposed()) {
584+
subscription.dispose();
585+
}
586+
}
587+
588+
void register(Client client) {
589+
590+
BaseSubscriber<NoticeResponse> subscriber = new BaseSubscriber<NoticeResponse>() {
591+
592+
@Override
593+
protected void hookOnSubscribe(Subscription subscription) {
594+
subscription.request(Long.MAX_VALUE);
595+
}
596+
597+
@Override
598+
public void hookOnNext(NoticeResponse noticeResponse) {
599+
final Map<Field.FieldType, String> fieldsMap = new EnumMap<>(Field.FieldType.class);
600+
for (Field field : noticeResponse.getFields()) {
601+
fieldsMap.put(field.getType(), field.getValue());
602+
}
603+
NoticeAdapter.this.sink.emitNext(new Notice(fieldsMap), Sinks.EmitFailureHandler.FAIL_FAST);
604+
}
605+
606+
@Override
607+
public void hookOnError(Throwable throwable) {
608+
NoticeAdapter.this.sink.emitError(throwable, Sinks.EmitFailureHandler.FAIL_FAST);
609+
}
610+
611+
@Override
612+
public void hookOnComplete() {
613+
NoticeAdapter.this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
614+
}
615+
};
616+
617+
this.subscription = subscriber;
618+
client.addNoticeListener(subscriber);
619+
}
620+
621+
Flux<Notice> getEvents() {
622+
return this.sink.asFlux();
623+
}
624+
}
625+
540626
enum EmptyTransactionDefinition implements TransactionDefinition {
541627

542628
INSTANCE;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.r2dbc.postgresql.api;
2+
3+
import io.r2dbc.postgresql.message.backend.Field;
4+
5+
import java.util.Map;
6+
7+
/**
8+
* Postgres notice.
9+
*/
10+
public class Notice {
11+
12+
/**
13+
* Notice messages by {@link Field.FieldType}.
14+
*/
15+
public final Map<Field.FieldType, String> fields;
16+
17+
/**
18+
* @param fields notice messages by {@link Field.FieldType}
19+
*/
20+
public Notice(Map<Field.FieldType, String> fields) {
21+
this.fields = fields;
22+
}
23+
}

src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java

+9
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,15 @@ default Mono<Long> copyIn(String sql, Publisher<ByteBuf> stdin) {
110110
*/
111111
Flux<Notification> getNotifications();
112112

113+
/**
114+
* Return a {@link Flux} of {@link Notice} received from the connection. The stream is a hot stream producing messages as they are received. Notices received by this
115+
* connection are published as they are received. When the client gets {@link #close() closed}, the subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport
116+
* connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
117+
*
118+
* @return a hot {@link Flux} of {@link Notice Notices}
119+
*/
120+
Flux<Notice> getNotices();
121+
113122
/**
114123
* Cancel currently running query by sending {@link CancelRequest} to a server.
115124
*

src/main/java/io/r2dbc/postgresql/client/Client.java

+22
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.netty.buffer.ByteBufAllocator;
2020
import io.r2dbc.postgresql.message.backend.BackendMessage;
21+
import io.r2dbc.postgresql.message.backend.NoticeResponse;
2122
import io.r2dbc.postgresql.message.backend.NotificationResponse;
2223
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
2324
import io.r2dbc.postgresql.message.frontend.CancelRequest;
@@ -60,6 +61,27 @@ public interface Client {
6061
*/
6162
void addNotificationListener(Subscriber<NotificationResponse> consumer);
6263

64+
/**
65+
* Add a consumer of notices. Notices received by this connection are sent to the {@link Consumer notice consumer}. Note that connection errors and events such as
66+
* disconnects are not visible to the {@link Consumer notice consumer}.
67+
*
68+
* @param consumer the consumer of notices
69+
* @return a new {@link Disposable} that can be used to cancel the underlying subscription
70+
* @throws IllegalArgumentException if {@code consumer} is {@code null}
71+
*/
72+
Disposable addNoticeListener(Consumer<NoticeResponse> consumer);
73+
74+
/**
75+
* Add a consumer of notices. Notices received by this connection are sent to the {@link Subscriber notice listener}. When the client gets {@link #close() closed}, the
76+
* subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
77+
*
78+
* @param consumer the consumer of notices
79+
* @return a new {@link Disposable} that can be used to cancel the underlying subscription
80+
* @throws IllegalArgumentException if {@code consumer} is {@code null}
81+
* @since 1.0.0
82+
*/
83+
void addNoticeListener(Subscriber<NoticeResponse> consumer);
84+
6385
/**
6486
* Release any resources held by the {@link Client}.
6587
*

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+15
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public final class ReactorNettyClient implements Client {
112112
private final Sinks.Many<Publisher<FrontendMessage>> requestSink = Sinks.many().unicast().onBackpressureBuffer();
113113

114114
private final Sinks.Many<NotificationResponse> notificationProcessor = Sinks.many().multicast().directBestEffort();
115+
private final Sinks.Many<NoticeResponse> noticeProcessor = Sinks.many().multicast().directBestEffort();
115116

116117
private final AtomicBoolean isClosed = new AtomicBoolean(false);
117118

@@ -186,6 +187,7 @@ public Mono<Void> close() {
186187
return Mono.defer(() -> {
187188

188189
this.notificationProcessor.tryEmitComplete();
190+
this.noticeProcessor.tryEmitComplete();
189191

190192
drainError(EXPECTED);
191193

@@ -269,6 +271,8 @@ private boolean consumeMessage(BackendMessage message) {
269271

270272
if (message.getClass() == NoticeResponse.class) {
271273

274+
this.noticeProcessor.tryEmitNext((NoticeResponse) message);
275+
272276
this.settings.getNoticeLogLevel().log(logger, () -> this.context.getMessage(String.format("Notice: %s", toString(((NoticeResponse) message).getFields()))));
273277
return true;
274278
}
@@ -441,6 +445,16 @@ public void addNotificationListener(Subscriber<NotificationResponse> consumer) {
441445
this.notificationProcessor.asFlux().subscribe(consumer);
442446
}
443447

448+
@Override
449+
public Disposable addNoticeListener(Consumer<NoticeResponse> consumer) {
450+
return this.noticeProcessor.asFlux().subscribe(consumer);
451+
}
452+
453+
@Override
454+
public void addNoticeListener(Subscriber<NoticeResponse> consumer) {
455+
this.noticeProcessor.asFlux().subscribe(consumer);
456+
}
457+
444458
@Override
445459
public ByteBufAllocator getByteBufAllocator() {
446460
return this.byteBufAllocator;
@@ -530,6 +544,7 @@ private void drainError(Supplier<? extends Throwable> supplier) {
530544
this.messageSubscriber.close(supplier);
531545

532546
this.notificationProcessor.tryEmitError(supplier.get());
547+
this.noticeProcessor.tryEmitError(supplier.get());
533548
}
534549

535550
private final class EnsureSubscribersCompleteChannelHandler extends ChannelDuplexHandler {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import io.netty.channel.Channel;
20+
import io.r2dbc.postgresql.api.Notice;
21+
import io.r2dbc.postgresql.api.PostgresqlConnection;
22+
import io.r2dbc.postgresql.api.PostgresqlResult;
23+
import io.r2dbc.postgresql.message.backend.Field;
24+
import io.r2dbc.postgresql.util.ConnectionIntrospector;
25+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
26+
import java.time.Duration;
27+
import org.junit.jupiter.api.Test;
28+
import reactor.core.Disposable;
29+
import reactor.test.StepVerifier;
30+
31+
import java.util.concurrent.BlockingQueue;
32+
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.LinkedBlockingQueue;
34+
import java.util.concurrent.TimeUnit;
35+
36+
import static org.assertj.core.api.Assertions.assertThat;
37+
38+
/**
39+
* Integration tests for {@link Notice} through {@link PostgresqlConnection#getNotices()}.
40+
*/
41+
final class PostgresNoticeIntegrationTests extends AbstractIntegrationTests {
42+
43+
private static final String RAISE_INFO_FUNCTION =
44+
"CREATE OR REPLACE FUNCTION raise_info(text)"
45+
+ " RETURNS void AS $$"
46+
+ " BEGIN"
47+
+ " RAISE INFO '%', $1;"
48+
+ " END;"
49+
+ " $$ LANGUAGE plpgsql;";
50+
51+
@Test
52+
void shouldReceivePubSubNotices() throws Exception {
53+
54+
BlockingQueue<Notice> notices = new LinkedBlockingQueue<>();
55+
56+
this.connectionFactory.create().flatMap(it -> {
57+
it.getNotices().doOnNext(notices::add).subscribe();
58+
return it.createStatement(RAISE_INFO_FUNCTION).execute().then()
59+
.then(it.createStatement("SELECT raise_info('Test Message')").execute().then());
60+
}).block(Duration.ofSeconds(10));
61+
62+
Notice notice = notices.poll(10, TimeUnit.SECONDS);
63+
64+
assertThat(notice).isNotNull();
65+
assertThat(notice.fields.containsKey(Field.FieldType.MESSAGE)).isTrue();
66+
assertThat(notice.fields.get(Field.FieldType.MESSAGE)).isEqualTo("Test Message");
67+
}
68+
69+
@Test
70+
void listenShouldCompleteOnConnectionClose() {
71+
72+
PostgresqlConnection connection = this.connectionFactory.create().block();
73+
74+
connection.getNotices().as(StepVerifier::create).expectSubscription()
75+
.then(() -> connection.close().subscribe())
76+
.verifyComplete();
77+
}
78+
79+
@Test
80+
void listenShouldFailOnConnectionDisconnected() {
81+
82+
PostgresqlConnection connection = this.connectionFactory.create().block();
83+
84+
connection.getNotices().as(StepVerifier::create).expectSubscription()
85+
.then(() -> {
86+
Channel channel = ConnectionIntrospector.of(connection).getChannel();
87+
channel.close();
88+
})
89+
.verifyError(R2dbcNonTransientResourceException.class);
90+
}
91+
}

src/test/java/io/r2dbc/postgresql/api/MockPostgresqlConnection.java

+5
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ public Flux<Notification> getNotifications() {
7777
return Flux.empty();
7878
}
7979

80+
@Override
81+
public Flux<Notice> getNotices() {
82+
return Flux.empty();
83+
}
84+
8085
@Override
8186
public Mono<Void> cancelRequest() {
8287
return Mono.empty();

0 commit comments

Comments
 (0)