Skip to content

Commit 14928cb

Browse files
author
Francesco Komauli
committed
Stream for subscribing to Postgres connection notices
[resolves pgjdbc#570] Postgres may send notice messages through a connection, which may contain log information or metadata related to the submitted commands. Notices travel on the same backend messaging subsystem used by notifications. The implementation processes NoticeResponse in a way similar to NotificationResponse, and exposes a coherent API.
1 parent 87a058e commit 14928cb

9 files changed

+294
-10
lines changed

src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java

+84-10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.r2dbc.postgresql.api.CopyInBuilder;
2020
import io.r2dbc.postgresql.api.ErrorDetails;
21+
import io.r2dbc.postgresql.api.Notice;
2122
import io.r2dbc.postgresql.api.Notification;
2223
import io.r2dbc.postgresql.api.PostgresTransactionDefinition;
2324
import io.r2dbc.postgresql.api.PostgresqlResult;
@@ -31,6 +32,8 @@
3132
import io.r2dbc.postgresql.message.backend.BackendMessage;
3233
import io.r2dbc.postgresql.message.backend.CommandComplete;
3334
import io.r2dbc.postgresql.message.backend.ErrorResponse;
35+
import io.r2dbc.postgresql.message.backend.Field;
36+
import io.r2dbc.postgresql.message.backend.NoticeResponse;
3437
import io.r2dbc.postgresql.message.backend.NotificationResponse;
3538
import io.r2dbc.postgresql.util.Assert;
3639
import io.r2dbc.postgresql.util.Operators;
@@ -53,11 +56,14 @@
5356
import reactor.util.annotation.Nullable;
5457

5558
import java.time.Duration;
59+
import java.util.EnumMap;
60+
import java.util.Map;
5661
import java.util.concurrent.atomic.AtomicReference;
5762
import java.util.function.Function;
5863

5964
import static io.r2dbc.postgresql.client.TransactionStatus.IDLE;
6065
import static io.r2dbc.postgresql.client.TransactionStatus.OPEN;
66+
import org.reactivestreams.Subscriber;
6167

6268
/**
6369
* An implementation of {@link Connection} for connecting to a PostgreSQL database.
@@ -78,6 +84,8 @@ final class PostgresqlConnection implements io.r2dbc.postgresql.api.PostgresqlCo
7884

7985
private final AtomicReference<NotificationAdapter> notificationAdapter = new AtomicReference<>();
8086

87+
private final AtomicReference<NoticeAdapter> noticeAdapter = new AtomicReference<>();
88+
8189
private volatile IsolationLevel isolationLevel;
8290

8391
private volatile IsolationLevel previousIsolationLevel;
@@ -181,6 +189,12 @@ public Mono<Void> close() {
181189
if (notificationAdapter != null && this.notificationAdapter.compareAndSet(notificationAdapter, null)) {
182190
notificationAdapter.dispose();
183191
}
192+
193+
NoticeAdapter noticeAdapter = this.noticeAdapter.get();
194+
195+
if (noticeAdapter != null && this.noticeAdapter.compareAndSet(noticeAdapter, null)) {
196+
noticeAdapter.dispose();
197+
}
184198
}).then(Mono.empty());
185199
}
186200

@@ -282,6 +296,24 @@ public Flux<Notification> getNotifications() {
282296
return notifications.getEvents();
283297
}
284298

299+
@Override
300+
public Flux<Notice> getNotices() {
301+
NoticeAdapter notices = this.noticeAdapter.get();
302+
303+
if (notices == null) {
304+
305+
notices = new NoticeAdapter();
306+
307+
if (this.noticeAdapter.compareAndSet(null, notices)) {
308+
notices.register(this.client);
309+
} else {
310+
notices = this.noticeAdapter.get();
311+
}
312+
}
313+
314+
return notices.getEvents();
315+
}
316+
285317
@Override
286318
public PostgresqlConnectionMetadata getMetadata() {
287319
return new PostgresqlConnectionMetadata(this.client.getVersion());
@@ -486,11 +518,14 @@ private void cleanupIsolationLevel() {
486518
}
487519

488520
/**
489-
* Adapter to publish {@link Notification}s.
521+
* Generic adapter that maps {@link BackendMessage}s received by subscribing
522+
* to a {@link Client}
523+
* @param <T> the exposed message type
524+
* @param <M> the source message type
490525
*/
491-
static class NotificationAdapter {
526+
static abstract class BackendMessageAdapter<T, M extends BackendMessage> {
492527

493-
private final Sinks.Many<Notification> sink = Sinks.many().multicast().directBestEffort();
528+
private final Sinks.Many<T> sink = Sinks.many().multicast().directBestEffort();
494529

495530
@Nullable
496531
private volatile Disposable subscription = null;
@@ -501,40 +536,79 @@ void dispose() {
501536
subscription.dispose();
502537
}
503538
}
539+
540+
abstract T mapMessage(M message);
541+
542+
abstract void registerSubscriber(Client client, Subscriber<M> subscriber);
504543

505544
void register(Client client) {
506545

507-
BaseSubscriber<NotificationResponse> subscriber = new BaseSubscriber<NotificationResponse>() {
546+
BaseSubscriber<M> subscriber = new BaseSubscriber<M>() {
508547

509548
@Override
510549
protected void hookOnSubscribe(Subscription subscription) {
511550
subscription.request(Long.MAX_VALUE);
512551
}
513552

514553
@Override
515-
public void hookOnNext(NotificationResponse notificationResponse) {
516-
NotificationAdapter.this.sink.emitNext(new NotificationResponseWrapper(notificationResponse), Sinks.EmitFailureHandler.FAIL_FAST);
554+
public void hookOnNext(M notificationResponse) {
555+
BackendMessageAdapter.this.sink.emitNext(mapMessage(notificationResponse), Sinks.EmitFailureHandler.FAIL_FAST);
517556
}
518557

519558
@Override
520559
public void hookOnError(Throwable throwable) {
521-
NotificationAdapter.this.sink.emitError(throwable, Sinks.EmitFailureHandler.FAIL_FAST);
560+
BackendMessageAdapter.this.sink.emitError(throwable, Sinks.EmitFailureHandler.FAIL_FAST);
522561
}
523562

524563
@Override
525564
public void hookOnComplete() {
526-
NotificationAdapter.this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
565+
BackendMessageAdapter.this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
527566
}
528567
};
529568

530569
this.subscription = subscriber;
531-
client.addNotificationListener(subscriber);
570+
registerSubscriber(client, subscriber);
532571
}
533572

534-
Flux<Notification> getEvents() {
573+
Flux<T> getEvents() {
535574
return this.sink.asFlux();
536575
}
576+
}
577+
578+
/**
579+
* Adapter to publish {@link Notification}s.
580+
*/
581+
static class NotificationAdapter extends BackendMessageAdapter<Notification, NotificationResponse> {
582+
583+
@Override
584+
Notification mapMessage(NotificationResponse message) {
585+
return new NotificationResponseWrapper(message);
586+
}
587+
588+
@Override
589+
void registerSubscriber(Client client, Subscriber<NotificationResponse> subscriber) {
590+
client.addNotificationListener(subscriber);
591+
}
592+
}
537593

594+
/**
595+
* Adapter to publish {@link Notice}s.
596+
*/
597+
static class NoticeAdapter extends BackendMessageAdapter<Notice, NoticeResponse> {
598+
599+
@Override
600+
Notice mapMessage(NoticeResponse message) {
601+
final Notice notice = new Notice(new EnumMap<>(Field.FieldType.class));
602+
for (Field field : message.getFields()) {
603+
notice.fields.put(field.getType(), field.getValue());
604+
}
605+
return notice;
606+
}
607+
608+
@Override
609+
void registerSubscriber(Client client, Subscriber<NoticeResponse> subscriber) {
610+
client.addNoticeListener(subscriber);
611+
}
538612
}
539613

540614
enum EmptyTransactionDefinition implements TransactionDefinition {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.r2dbc.postgresql.api;
2+
3+
import io.r2dbc.postgresql.message.backend.Field;
4+
5+
import java.util.Map;
6+
7+
/**
8+
* Postgres notice.
9+
*/
10+
public class Notice {
11+
12+
/**
13+
* Notice messages by {@link Field.FieldType}.
14+
*/
15+
public final Map<Field.FieldType, String> fields;
16+
17+
/**
18+
* @param fields notice messages by {@link Field.FieldType}
19+
*/
20+
public Notice(Map<Field.FieldType, String> fields) {
21+
this.fields = fields;
22+
}
23+
}

src/main/java/io/r2dbc/postgresql/api/PostgresqlConnection.java

+9
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,15 @@ default Mono<Long> copyIn(String sql, Publisher<ByteBuf> stdin) {
110110
*/
111111
Flux<Notification> getNotifications();
112112

113+
/**
114+
* Return a {@link Flux} of {@link Notice} received from the connection. The stream is a hot stream producing messages as they are received. Notices received by this
115+
* connection are published as they are received. When the client gets {@link #close() closed}, the subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport
116+
* connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
117+
*
118+
* @return a hot {@link Flux} of {@link Notice Notices}
119+
*/
120+
Flux<Notice> getNotices();
121+
113122
/**
114123
* Cancel currently running query by sending {@link CancelRequest} to a server.
115124
*

src/main/java/io/r2dbc/postgresql/client/Client.java

+23
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.netty.buffer.ByteBufAllocator;
2020
import io.r2dbc.postgresql.message.backend.BackendMessage;
21+
import io.r2dbc.postgresql.message.backend.NoticeResponse;
2122
import io.r2dbc.postgresql.message.backend.NotificationResponse;
2223
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
2324
import io.r2dbc.postgresql.message.frontend.CancelRequest;
@@ -60,6 +61,28 @@ public interface Client {
6061
*/
6162
void addNotificationListener(Subscriber<NotificationResponse> consumer);
6263

64+
/**
65+
* Add a consumer of notices. Notices received by this connection are sent to the {@link Consumer notice consumer}. Note that connection errors and events such as
66+
* disconnects are not visible to the {@link Consumer notice consumer}.
67+
*
68+
* @param consumer the consumer of notices
69+
* @return a new {@link Disposable} that can be used to cancel the underlying subscription
70+
* @throws IllegalArgumentException if {@code consumer} is {@code null}
71+
* @since 1.1.0
72+
*/
73+
Disposable addNoticeListener(Consumer<NoticeResponse> consumer);
74+
75+
/**
76+
* Add a consumer of notices. Notices received by this connection are sent to the {@link Subscriber notice listener}. When the client gets {@link #close() closed}, the
77+
* subscription {@link Subscriber#onComplete() completes normally}. Otherwise (transport connection disconnected unintentionally) with an {@link R2dbcNonTransientResourceException error}.
78+
*
79+
* @param consumer the consumer of notices
80+
* @return a new {@link Disposable} that can be used to cancel the underlying subscription
81+
* @throws IllegalArgumentException if {@code consumer} is {@code null}
82+
* @since 1.1.0
83+
*/
84+
void addNoticeListener(Subscriber<NoticeResponse> consumer);
85+
6386
/**
6487
* Release any resources held by the {@link Client}.
6588
*

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+15
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public final class ReactorNettyClient implements Client {
112112
private final Sinks.Many<Publisher<FrontendMessage>> requestSink = Sinks.many().unicast().onBackpressureBuffer();
113113

114114
private final Sinks.Many<NotificationResponse> notificationProcessor = Sinks.many().multicast().directBestEffort();
115+
private final Sinks.Many<NoticeResponse> noticeProcessor = Sinks.many().multicast().directBestEffort();
115116

116117
private final AtomicBoolean isClosed = new AtomicBoolean(false);
117118

@@ -186,6 +187,7 @@ public Mono<Void> close() {
186187
return Mono.defer(() -> {
187188

188189
this.notificationProcessor.tryEmitComplete();
190+
this.noticeProcessor.tryEmitComplete();
189191

190192
drainError(EXPECTED);
191193

@@ -269,6 +271,8 @@ private boolean consumeMessage(BackendMessage message) {
269271

270272
if (message.getClass() == NoticeResponse.class) {
271273

274+
this.noticeProcessor.tryEmitNext((NoticeResponse) message);
275+
272276
this.settings.getNoticeLogLevel().log(logger, () -> this.context.getMessage(String.format("Notice: %s", toString(((NoticeResponse) message).getFields()))));
273277
return true;
274278
}
@@ -441,6 +445,16 @@ public void addNotificationListener(Subscriber<NotificationResponse> consumer) {
441445
this.notificationProcessor.asFlux().subscribe(consumer);
442446
}
443447

448+
@Override
449+
public Disposable addNoticeListener(Consumer<NoticeResponse> consumer) {
450+
return this.noticeProcessor.asFlux().subscribe(consumer);
451+
}
452+
453+
@Override
454+
public void addNoticeListener(Subscriber<NoticeResponse> consumer) {
455+
this.noticeProcessor.asFlux().subscribe(consumer);
456+
}
457+
444458
@Override
445459
public ByteBufAllocator getByteBufAllocator() {
446460
return this.byteBufAllocator;
@@ -530,6 +544,7 @@ private void drainError(Supplier<? extends Throwable> supplier) {
530544
this.messageSubscriber.close(supplier);
531545

532546
this.notificationProcessor.tryEmitError(supplier.get());
547+
this.noticeProcessor.tryEmitError(supplier.get());
533548
}
534549

535550
private final class EnsureSubscribersCompleteChannelHandler extends ChannelDuplexHandler {

0 commit comments

Comments
 (0)