Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issues/3340 object array buffer implementation #3688

Closed

Conversation

MikkelHJuul
Copy link

This PR replace the internal buffer-implementation of the SizeBoundReplayBuffer to be an Atomic reference to an Object-array in stead of an atomic linked list.

I didn't manage to produce sane performance metrics (but my jmh also ran weird). I did however try implementing a dummy test where an atomic linked list was read vs. an atomic object array, that showed the array to be better performance (4 times better).

The implementation is obviously more heavy on the GC (but it's easily collectible garbage).

There is a thing to say on the Sinks.Many Normal/Unsafe interfaces, and this is obviously a lot more safe in concurrent usage.

SinksManyReplayLatest stress test looks like:
Results across all configurations:

  RESULT     SAMPLES     FREQ       EXPECT  DESCRIPTION
       1          12   <0,01%  Interesting  Signals are lost before replay
       2      45.432    0,34%  Interesting  Signals are lost before replay
       3      87.442    0,66%  Interesting  Signals are lost before replay
       4      81.540    0,62%  Interesting  Signals are lost before replay
       5      84.897    0,64%  Interesting  Signals are lost before replay
       6  12.875.461   97,73%   Acceptable  all signals go through

while for the SizeBoundReplayBuffer looks like this:

  RESULT     SAMPLES     FREQ       EXPECT  DESCRIPTION
       1      38.318    0,28%  Interesting  Signals are lost before replay
       2      43.811    0,32%  Interesting  Signals are lost before replay
       3      36.974    0,27%  Interesting  Signals are lost before replay
       4      43.125    0,32%  Interesting  Signals are lost before replay
       5      81.231    0,60%  Interesting  Signals are lost before replay
       6  13.371.645   98,21%   Acceptable  all signals go through

This was after further fixes to the add method, like so:

		@Override
		public void add(T value) {
			final Node<T> tail = this.tail;
			final Node<T> n = new Node<>(tail.index + 1, value);
			tail.set(n);
			this.tail = n;
			int s = size;
			if (s == limit) {
				head.set(Optional.ofNullable(head.get()).map(AtomicReference::get).orElse(n)); // changes here
			}
			else {
				size = s + 1;
			}
		}

This is very surprising since the first line has a racy non-atomic read and there are multiple racing atomic operations. I think having the head be final helped a bunch.

@MikkelHJuul MikkelHJuul requested a review from a team as a code owner January 9, 2024 18:40
@OlegDokuka
Copy link
Contributor

Hi, @MikkelHJuul!

Thank you for your hard effort putting all potential improvements in multiple PRs. We have reviewed all of them and estimated the impact on the existing behaviour. To understand the impact please consider the following sample:

public static void main(String[] args) {
        BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                subscription.request(1);
            }
        };

        Sinks.Many<String> sink = Sinks.many()
                                       .replay()
                                       .latest();

       sink.asFlux().log("SUB1").subscribe(subscriber);

        System.out.println("sending first");

        sink.tryEmitNext("1");

        System.out.println("sending more");

        sink.tryEmitNext("2");
        sink.tryEmitNext("3");
        sink.tryEmitNext("4");
        sink.tryEmitNext("5");
        
        System.out.println("another subscriber");
        sink.asFlux().log("SUB2").subscribe();
        
        System.out.println("sub 1 more demand");
        subscriber.request(4);

once the sample is executed, the output is the following:

12:39:36.831 [main] INFO  SUB1 - | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
12:39:36.840 [main] INFO  SUB1 - | request(1)
sending first
12:39:36.841 [main] INFO  SUB1 - | onNext(1)
sending more
another subscriber
12:39:36.844 [main] INFO  SUB2 - | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
12:39:36.844 [main] INFO  SUB2 - | request(unbounded)
12:39:36.844 [main] INFO  SUB2 - | onNext(5)
sub 1 more demand
12:39:36.844 [main] INFO  SUB1 - | request(4)
12:39:36.844 [main] INFO  SUB1 - | onNext(2)
12:39:36.844 [main] INFO  SUB1 - | onNext(3)
12:39:36.844 [main] INFO  SUB1 - | onNext(4)
12:39:36.844 [main] INFO  SUB1 - | onNext(5)

as it could be noticed, all 4 elements that was not observed by subscriber 1 due to lack of demand are delivered (this is because of linked list data structure used right now)

with your suggestion we have the following logs output:

12:39:36.831 [main] INFO  SUB1 - | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
12:39:36.840 [main] INFO  SUB1 - | request(1)
sending first
12:39:36.841 [main] INFO  SUB1 - | onNext(1)
sending more
another subscriber
12:39:36.844 [main] INFO  SUB2 - | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
12:39:36.844 [main] INFO  SUB2 - | request(unbounded)
12:39:36.844 [main] INFO  SUB2 - | onNext(5)
sub 1 more demand
12:39:36.844 [main] INFO  SUB1 - | request(4)
12:39:36.844 [main] INFO  SUB1 - | onNext(5)

as it could be noticed only the last element is delivered while the others are dropped.
The most recent behaviour could be valid and expected for some but for the others it could be a breaking change.

For now, all of the PRs introduces behaviour change which is not acceptable for this operator (it stays in the codebase from the very first release of the 3.x line with this behaviour). To mitigate that breaking change we need to add an extra builder step in the MulticastReplaySpec which would let one to decide transparently which behaviour is expected. Consider the following configurations code sample:

       BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                subscription.request(1);
            }
        };
        Sinks.Many<String> sink = Sinks.many()
                                       .replay()
                                       .allOrNothing() // we use the Array based replay buffer / single element replay buffer
                                       .latest();
                                       
        sink.asFlux().log("SUB1").subscribe(subscriber);

        System.out.println("sending first");

        sink.tryEmitNext("1");

        System.out.println("sending more");

        sink.tryEmitNext("2"); / / returned EmitResult is FAIL_OVERFLOW
        sink.tryEmitNext("3");  / / returned EmitResult is FAIL_OVERFLOW
        sink.tryEmitNext("4");  / / returned EmitResult is FAIL_OVERFLOW
        sink.tryEmitNext("5");  / / returned EmitResult is FAIL_OVERFLOW                      
        
        subscriber.request(4); // nothing happens after that since no items are stored                             

or

       BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                subscription.request(1);
            }
        };
       Sinks.Many<String> sink = Sinks.many()
                                       .replay()
                                       .bestEffort() // we use the Array based replay buffer / single element replay buffer
                                       .latest();
                                       
        sink.asFlux().log("SUB1").subscribe(subscriber);

        System.out.println("sending first");

        sink.tryEmitNext("1");

        System.out.println("sending more");

        sink.tryEmitNext("2");  / / returned EmitResult is OK
        sink.tryEmitNext("3");  / / returned EmitResult is OK
        sink.tryEmitNext("4");  / / returned EmitResult is OK
        sink.tryEmitNext("5");  / / returned EmitResult is OK 

       
        subscriber.request(4); // element 5 is delivered                   
                                       

and

       BaseSubscriber<String> subscriber = new BaseSubscriber<String>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                subscription.request(1);
            }
        };
       Sinks.Many<String> sink = Sinks.many()
                                       .replay() // we use the old linked list based replay buffer
                                      // nothing selected so we need to preserve old behaviour 
                                       .latest();
                                       
        sink.asFlux().log("SUB1").subscribe(subscriber);

        System.out.println("sending first");

        sink.tryEmitNext("1");

        System.out.println("sending more");

        sink.tryEmitNext("2");  / / returned EmitResult is OK
        sink.tryEmitNext("3");  / / returned EmitResult is OK
        sink.tryEmitNext("4");  / / returned EmitResult is OK
        sink.tryEmitNext("5");  / / returned EmitResult is OK 

       
        subscriber.request(4); // element 2, 3, 4, 5 are delivered                   
                                       

Would this solution work for you?
If so, we can close the original issue and open a new one with the specification expressed above.
Also, are you interested in opening new PR and add required spec changes (in such a case we don't need a separate issue)?

Thanks,
Oleh

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants