Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce sync subscribers on EventRouter #1782

Merged
merged 1 commit into from
Aug 9, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@
public class EventRouterImpl implements EventRouter {

private final List<EventSubscriber> subscribers = new ArrayList<>();
private final List<EventSubscriber> syncSubscribers = new ArrayList<>();
private final Monitor monitor;
private final ExecutorService executor;

@@ -38,13 +39,20 @@ public EventRouterImpl(Monitor monitor, ExecutorService executor) {
this.executor = executor;
}

@Override
public void registerSync(EventSubscriber subscriber) {
this.syncSubscribers.add(subscriber);
}

@Override
public void register(EventSubscriber subscriber) {
subscribers.add(subscriber);
}

@Override
public void publish(Event event) {
syncSubscribers.forEach(subscriber -> subscriber.on(event));

subscribers.stream()
.map(subscriber -> runAsync(() -> subscriber.on(event), executor).thenApply(v -> subscriber))
.forEach(future -> future.whenComplete((subscriber, throwable) -> {
Original file line number Diff line number Diff line change
@@ -24,12 +24,14 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

class EventRouterImplTest {

@@ -39,14 +41,17 @@ class EventRouterImplTest {

@Test
void shouldPublishToAllSubscribers() {
var syncSubscriber = mock(EventSubscriber.class);
var subscriberA = mock(EventSubscriber.class);
var subscriberB = mock(EventSubscriber.class);
eventRouter.registerSync(syncSubscriber);
eventRouter.register(subscriberA);
eventRouter.register(subscriberB);

eventRouter.publish(TestEvent.Builder.newInstance().at(clock.millis()).build());

await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
verify(syncSubscriber).on(isA(TestEvent.class));
verify(subscriberA).on(isA(TestEvent.class));
verify(subscriberB).on(isA(TestEvent.class));
});
@@ -68,7 +73,22 @@ void shouldNotInterruptPublishingWhenSubscriberThrowsException() {
});
}

@Test
void shouldInterruptPublishingWhenSyncSubscriberThrowsException() {
var subscriberA = mock(EventSubscriber.class);
var subscriberB = mock(EventSubscriber.class);
doThrow(new RuntimeException("unexpected exception")).when(subscriberA).on(any());
eventRouter.registerSync(subscriberA);
eventRouter.register(subscriberB);

assertThatThrownBy(() -> eventRouter.publish(TestEvent.Builder.newInstance().at(clock.millis()).build()))
.isInstanceOf(RuntimeException.class)
.hasMessage("unexpected exception");
verifyNoInteractions(subscriberB);
}

private static class TestEvent extends Event<TestEvent.Payload> {

public static class Builder extends Event.Builder<TestEvent, Payload, Builder> {

public static Builder newInstance() {
13 changes: 12 additions & 1 deletion docs/developer/events.md
Original file line number Diff line number Diff line change
@@ -6,6 +6,16 @@ emitted from the core of the EDC and also emit custom events.
## Subscribe to events
The entry point for event listening is the `EventRouter` component, on which an `EventSubscriber` can be registered.

Actually, there are two ways to register an `EventSubscriber`:
- **async**: every event will be sent to the subscriber in an asynchronous way. Features:
- fast, as the main thread won't be blocked during dispatchment.
- not-reliable, as an eventual subscriber dispatch failure won't get handled.
- to be used for notifications and for send-and-forget event dispatchment.
- **sync**: every event will be sent to the subscriber in a synchronous way. Features:
- slow, as the subscriber will block the main thread until the event is dispatched
- reliable, an eventual exception will be thrown to the caller, and it could make a transactional context fail
- to be used for event persistence and to satisfy the "at-least-one" rule.

Extension example:
```java
public class ExampleEventSubscriptionExtension implements ServiceExtension {
@@ -14,7 +24,8 @@ public class ExampleEventSubscriptionExtension implements ServiceExtension {

@Override
public void initialize(ServiceExtensionContext context) {
eventRouter.register(new ExampleEventSubscriber());
eventRouter.register(new ExampleEventSubscriber()); // asynchronous dispatch
eventRouter.registerSync(new ExampleEventSubscriber()); // synchronous dispatch
}
}
```
Original file line number Diff line number Diff line change
@@ -21,7 +21,16 @@
public interface EventRouter {

/**
* Register a new subscriber to the events
* Register a new synchronous subscriber.
* The synchronous subscribers are supposed to being notified before the standard subscribers in a synchronous way.
* If a synchronous subscriber thrown an exception, this will stop the publish operation.
*
* @param subscriber that will receive every published event
*/
void registerSync(EventSubscriber subscriber);

/**
* Register a new asynchronous subscriber to the events
*
* @param subscriber that will receive every published event
*/