diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index e55b808633..04f98a2aa2 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -19,7 +19,7 @@ import io.reactivex.rxjava3.annotations.*; import io.reactivex.rxjava3.disposables.Disposable; -import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.flowables.*; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.*; @@ -10421,13 +10421,16 @@ public final Disposable forEachWhile(final Predicate onNext, final Co * *
*
Backpressure:
- *
Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} - * is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on - * downstream consumption). Note that both the returned and its inner {@code Publisher}s use - * unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that may - * lead to {@code OutOfMemoryError}.
+ *
The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else + * this operator will signal {@link MissingBackpressureException}. To avoid this exception, make + * sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured. + * The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer + * nature of this operator, each group must be consumed so the whole operator can make progress and not hang.
*
Scheduler:
*
{@code groupBy} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and + * all active inner {@code GroupedFlowable}s will signal the same exception.
*
* * @param keySelector @@ -10438,9 +10441,11 @@ public final Disposable forEachWhile(final Predicate onNext, final Co * unique key value and each of which emits those items from the source Publisher that share that * key value * @see ReactiveX operators documentation: GroupBy + * @see #groupBy(Function, boolean) + * @see #groupBy(Function, Function) */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.FULL) + @BackpressureSupport(BackpressureKind.ERROR) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable> groupBy(Function keySelector) { return groupBy(keySelector, Functions.identity(), false, bufferSize()); @@ -10474,13 +10479,16 @@ public final Flowable> groupBy(Function *
Backpressure:
- *
Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} - * is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on - * downstream consumption). Note that both the returned and its inner {@code Publisher}s use - * unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that may - * lead to {@code OutOfMemoryError}.
+ *
The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else + * this operator will signal {@link MissingBackpressureException}. To avoid this exception, make + * sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured. + * The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer + * nature of this operator, each group must be consumed so the whole operator can make progress and not hang.
*
Scheduler:
*
{@code groupBy} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and + * all active inner {@code GroupedFlowable}s will signal the same exception.
* * * @param keySelector @@ -10496,7 +10504,7 @@ public final Flowable> groupBy(FunctionReactiveX operators documentation: GroupBy */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.FULL) + @BackpressureSupport(BackpressureKind.ERROR) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable> groupBy(Function keySelector, boolean delayError) { return groupBy(keySelector, Functions.identity(), delayError, bufferSize()); @@ -10530,13 +10538,16 @@ public final Flowable> groupBy(Function *
Backpressure:
- *
Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} - * is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on - * downstream consumption). Note that both the returned and its inner {@code Publisher}s use - * unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that may - * lead to {@code OutOfMemoryError}.
+ *
The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else + * this operator will signal {@link MissingBackpressureException}. To avoid this exception, make + * sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured. + * The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer + * nature of this operator, each group must be consumed so the whole operator can make progress and not hang.
*
Scheduler:
*
{@code groupBy} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and + * all active inner {@code GroupedFlowable}s will signal the same exception.
* * * @param keySelector @@ -10551,9 +10562,12 @@ public final Flowable> groupBy(FunctionReactiveX operators documentation: GroupBy + * @see #groupBy(Function, Function, boolean) + * @see #groupBy(Function, Function, boolean, int) + * @see #groupBy(Function, Function, boolean, int, Function) */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.FULL) + @BackpressureSupport(BackpressureKind.ERROR) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable> groupBy(Function keySelector, Function valueSelector) { @@ -10588,13 +10602,16 @@ public final Flowable> groupBy(Function *
Backpressure:
- *
Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} - * is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on - * downstream consumption). Note that both the returned and its inner {@code Publisher}s use - * unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that may - * lead to {@code OutOfMemoryError}.
+ *
The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else + * this operator will signal {@link MissingBackpressureException}. To avoid this exception, make + * sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured. + * The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer + * nature of this operator, each group must be consumed so the whole operator can make progress and not hang.
*
Scheduler:
*
{@code groupBy} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and + * all active inner {@code GroupedFlowable}s will signal the same exception.
* * * @param keySelector @@ -10612,9 +10629,10 @@ public final Flowable> groupBy(FunctionReactiveX operators documentation: GroupBy + * @see #groupBy(Function, Function, boolean, int) */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.FULL) + @BackpressureSupport(BackpressureKind.ERROR) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable> groupBy(Function keySelector, Function valueSelector, boolean delayError) { @@ -10649,13 +10667,16 @@ public final Flowable> groupBy(Function *
Backpressure:
- *
Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} - * is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on - * downstream consumption). Note that both the returned and its inner {@code Publisher}s use - * unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that may - * lead to {@code OutOfMemoryError}.
+ *
The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else + * this operator will signal {@link MissingBackpressureException}. To avoid this exception, make + * sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured. + * The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer + * nature of this operator, each group must be consumed so the whole operator can make progress and not hang.
*
Scheduler:
*
{@code groupBy} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and + * all active inner {@code GroupedFlowable}s will signal the same exception.
* * * @param keySelector @@ -10678,7 +10699,7 @@ public final Flowable> groupBy(Function Flowable> groupBy(Function keySelector, Function valueSelector, @@ -10759,13 +10780,16 @@ public final Flowable> groupBy(Function *
Backpressure:
- *
Both the returned and its inner {@code GroupedFlowable}s honor backpressure and the source {@code Publisher} - * is consumed in a bounded mode (i.e., requested a fixed amount upfront and replenished based on - * downstream consumption). Note that both the returned and its inner {@code GroupedFlowable}s use - * unbounded internal buffers and if the source {@code Publisher} doesn't honor backpressure, that may - * lead to {@code OutOfMemoryError}.
+ *
The consumer of the returned {@code Flowable} has to be ready to receive new {@code GroupedFlowable}s or else + * this operator will signal {@link MissingBackpressureException}. To avoid this exception, make + * sure a combining operator (such as {@code flatMap}) has adequate amount of buffering/prefetch configured. + * The inner {@code GroupedFlowable}s honor backpressure but due to the single-source multiple consumer + * nature of this operator, each group must be consumed so the whole operator can make progress and not hang.
*
Scheduler:
*
{@code groupBy} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the upstream signals or the callback(s) throw an exception, the returned {@code Flowable} and + * all active inner {@code GroupedFlowable}s will signal the same exception.
* *

History: 2.1.10 - beta * @param keySelector @@ -10796,7 +10820,7 @@ public final Flowable> groupBy(Function Flowable> groupBy(Function keySelector, Function valueSelector, diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java index 6b76558a5e..85c7d08289 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java @@ -21,7 +21,7 @@ import io.reactivex.rxjava3.annotations.Nullable; import io.reactivex.rxjava3.core.*; -import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.flowables.GroupedFlowable; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.queue.SpscLinkedArrayQueue; @@ -74,8 +74,8 @@ protected void subscribeActual(Subscriber> s) { } public static final class GroupBySubscriber - extends BasicIntQueueSubscription> - implements FlowableSubscriber { + extends AtomicLong + implements FlowableSubscriber, Subscription { private static final long serialVersionUID = -3688291656102519502L; @@ -83,9 +83,9 @@ public static final class GroupBySubscriber final Function keySelector; final Function valueSelector; final int bufferSize; + final int limit; final boolean delayError; final Map> groups; - final SpscLinkedArrayQueue> queue; final Queue> evictedGroups; static final Object NULL_KEY = new Object(); @@ -94,15 +94,13 @@ public static final class GroupBySubscriber final AtomicBoolean cancelled = new AtomicBoolean(); - final AtomicLong requested = new AtomicLong(); + long emittedGroups; final AtomicInteger groupCount = new AtomicInteger(1); - Throwable error; - volatile boolean finished; - boolean done; + final AtomicLong groupConsumed = new AtomicLong(); - boolean outputFused; + boolean done; public GroupBySubscriber(Subscriber> actual, Function keySelector, Function valueSelector, int bufferSize, boolean delayError, @@ -111,10 +109,10 @@ public GroupBySubscriber(Subscriber> actual, Funct this.keySelector = keySelector; this.valueSelector = valueSelector; this.bufferSize = bufferSize; + this.limit = bufferSize - (bufferSize >> 2); this.delayError = delayError; this.groups = groups; this.evictedGroups = evictedGroups; - this.queue = new SpscLinkedArrayQueue>(bufferSize); } @Override @@ -132,8 +130,6 @@ public void onNext(T t) { return; } - final SpscLinkedArrayQueue> q = this.queue; - K key; try { key = keySelector.apply(t); @@ -169,8 +165,14 @@ public void onNext(T t) { Exceptions.throwIfFatal(ex); upstream.cancel(); if (newGroup) { - q.offer(group); - drain(); + if (emittedGroups != get()) { + downstream.onNext(group); + } else { + MissingBackpressureException mbe = new MissingBackpressureException(groupHangWarning(emittedGroups)); + mbe.initCause(ex); + onError(mbe); + return; + } } onError(ex); return; @@ -181,18 +183,26 @@ public void onNext(T t) { completeEvictions(); if (newGroup) { - q.offer(group); - drain(); - - if (group.state.tryAbandon()) { - cancel(key); - group.onComplete(); - - upstream.request(1); + if (emittedGroups != get()) { + emittedGroups++; + downstream.onNext(group); + if (group.state.tryAbandon()) { + cancel(key); + group.onComplete(); + + requestGroup(1); + } + } else { + upstream.cancel(); + onError(new MissingBackpressureException(groupHangWarning(emittedGroups))); } } } + static String groupHangWarning(long n) { + return "Unable to emit a new group (#" + n + ") due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed."; + } + @Override public void onError(Throwable t) { if (done) { @@ -207,9 +217,7 @@ public void onError(Throwable t) { if (evictedGroups != null) { evictedGroups.clear(); } - error = t; - finished = true; - drain(); + downstream.onError(t); } @Override @@ -223,16 +231,14 @@ public void onComplete() { evictedGroups.clear(); } done = true; - finished = true; - drain(); + downstream.onComplete(); } } @Override public void request(long n) { if (SubscriptionHelper.validate(n)) { - BackpressureHelper.add(requested, n); - drain(); + BackpressureHelper.add(this, n); } } @@ -267,172 +273,38 @@ public void cancel(K key) { groups.remove(mapKey); if (groupCount.decrementAndGet() == 0) { upstream.cancel(); - - if (!outputFused && getAndIncrement() == 0) { - queue.clear(); - } - } - } - - void drain() { - if (getAndIncrement() != 0) { - return; - } - - if (outputFused) { - drainFused(); - } else { - drainNormal(); - } - } - - void drainFused() { - int missed = 1; - - final SpscLinkedArrayQueue> q = this.queue; - final Subscriber> a = this.downstream; - - for (;;) { - if (cancelled.get()) { - q.clear(); - return; - } - - boolean d = finished; - - if (d && !delayError) { - Throwable ex = error; - if (ex != null) { - q.clear(); - a.onError(ex); - return; - } - } - - a.onNext(null); - - if (d) { - Throwable ex = error; - if (ex != null) { - a.onError(ex); - } else { - a.onComplete(); - } - return; - } - - missed = addAndGet(-missed); - if (missed == 0) { - return; - } } } - void drainNormal() { - int missed = 1; - - final SpscLinkedArrayQueue> q = this.queue; - final Subscriber> a = this.downstream; - + void requestGroup(long n) { + // lots of atomics, save local + AtomicLong groupConsumed = this.groupConsumed; + int limit = this.limit; + // Concurrent groups can request at once, a CAS loop is needed for (;;) { + long currentConsumed = groupConsumed.get(); + long newConsumed = BackpressureHelper.addCap(currentConsumed, n); + // Accumulate the consumed amounts and atomically update the total + if (groupConsumed.compareAndSet(currentConsumed, newConsumed)) { + // if successful, let's see if the prefetch limit has been surpassed + for (;;) { + if (newConsumed < limit) { + // no further actions to be taken + return; + } - long r = requested.get(); - long e = 0L; - - while (e != r) { - boolean d = finished; - - GroupedFlowable t = q.poll(); - - boolean empty = t == null; - - if (checkTerminated(d, empty, a, q)) { - return; - } - - if (empty) { - break; - } - - a.onNext(t); - - e++; - } - - if (e == r && checkTerminated(finished, q.isEmpty(), a, q)) { - return; - } - - if (e != 0L) { - if (r != Long.MAX_VALUE) { - requested.addAndGet(-e); - } - upstream.request(e); - } - - missed = addAndGet(-missed); - if (missed == 0) { - break; - } - } - } - - boolean checkTerminated(boolean d, boolean empty, Subscriber a, SpscLinkedArrayQueue q) { - if (cancelled.get()) { - q.clear(); - return true; - } - - if (delayError) { - if (d && empty) { - Throwable ex = error; - if (ex != null) { - a.onError(ex); - } else { - a.onComplete(); - } - return true; - } - } else { - if (d) { - Throwable ex = error; - if (ex != null) { - q.clear(); - a.onError(ex); - return true; - } else if (empty) { - a.onComplete(); - return true; + // Yes, remove one limit from total consumed + long newConsumedAfterLimit = newConsumed - limit; + // Only one thread should subtract + if (groupConsumed.compareAndSet(newConsumed, newConsumedAfterLimit)) { + // Then request up to limit + upstream.request(limit); + } + // We don't quit but loop to see if we are still above the prefetch limit + newConsumed = groupConsumed.get(); } } } - - return false; - } - - @Override - public int requestFusion(int mode) { - if ((mode & ASYNC) != 0) { - outputFused = true; - return ASYNC; - } - return NONE; - } - - @Nullable - @Override - public GroupedFlowable poll() { - return queue.poll(); - } - - @Override - public void clear() { - queue.clear(); - } - - @Override - public boolean isEmpty() { - return queue.isEmpty(); } } @@ -501,7 +373,6 @@ static final class State extends BasicIntQueueSubscription implements P final AtomicReference> actual = new AtomicReference>(); boolean outputFused; - int produced; final AtomicInteger once = new AtomicInteger(); @@ -655,7 +526,7 @@ void drainNormal() { T v = q.poll(); boolean empty = v == null; - if (checkTerminated(d, empty, a, delayError)) { + if (checkTerminated(d, empty, a, delayError, e)) { return; } @@ -668,17 +539,14 @@ void drainNormal() { e++; } - if (e == r && checkTerminated(done, q.isEmpty(), a, delayError)) { + if (e == r && checkTerminated(done, q.isEmpty(), a, delayError, e)) { return; } if (e != 0L) { - if (r != Long.MAX_VALUE) { - requested.addAndGet(-e); - } - if ((once.get() & ABANDONED) == 0) { - parent.upstream.request(e); - } + BackpressureHelper.produced(requested, e); + // replenish based on this batch run + requestParent(e); } } @@ -692,9 +560,23 @@ void drainNormal() { } } - boolean checkTerminated(boolean d, boolean empty, Subscriber a, boolean delayError) { + void requestParent(long e) { + if ((once.get() & ABANDONED) == 0) { + parent.requestGroup(e); + } + } + + boolean checkTerminated(boolean d, boolean empty, Subscriber a, boolean delayError, long emitted) { if (cancelled.get()) { - queue.clear(); + // if this group is canceled, all accumulated emissions and + // remaining items in the queue should be requested + // so that other groups can proceed + while (queue.poll() != null) { + emitted++; + } + if (emitted != 0L) { + requestParent(emitted); + } return true; } @@ -735,6 +617,14 @@ public int requestFusion(int mode) { return NONE; } + void tryReplenish() { + int p = produced; + if (p != 0) { + produced = 0; + requestParent(p); + } + } + @Nullable @Override public T poll() { @@ -753,22 +643,18 @@ public boolean isEmpty() { tryReplenish(); return true; } + tryReplenish(); return false; } - void tryReplenish() { - int p = produced; - if (p != 0) { - produced = 0; - if ((once.get() & ABANDONED) == 0) { - parent.upstream.request(p); - } - } - } - @Override public void clear() { - queue.clear(); + SpscLinkedArrayQueue q = queue; + // queue.clear() would drop submitted items and not replenish, possibly hanging other groups + while (q.poll() != null) { + produced++; + } + tryReplenish(); } } } diff --git a/src/test/java/io/reactivex/rxjava3/flowable/FlowableGroupByTests.java b/src/test/java/io/reactivex/rxjava3/flowable/FlowableGroupByTests.java index fb4b89d947..b7e4edcab4 100644 --- a/src/test/java/io/reactivex/rxjava3/flowable/FlowableGroupByTests.java +++ b/src/test/java/io/reactivex/rxjava3/flowable/FlowableGroupByTests.java @@ -100,7 +100,8 @@ public Integer apply(Integer i) { public Flowable apply(GroupedFlowable v) { return v; } - }).subscribe(ts); + }, 20) // need to prefetch as many groups as groupBy produces to avoid MBE + .subscribe(ts); // Behavior change: this now counts as group abandonment because concatMap // doesn't subscribe to the 2nd+ emitted groups immediately diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java index f5bf461dda..5dae57500f 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java @@ -34,9 +34,9 @@ import io.reactivex.rxjava3.flowables.GroupedFlowable; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.Functions; -import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.internal.fuseable.QueueFuseable; +import io.reactivex.rxjava3.internal.schedulers.ImmediateThinScheduler; import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; -import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.processors.PublishProcessor; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subjects.PublishSubject; @@ -1322,29 +1322,30 @@ public void accept(Integer v) { System.out.println("testgroupByBackpressure2 >> " + v); } }) - .groupBy(IS_EVEN2).flatMap(new Function, Flowable>() { - - @Override - public Flowable apply(final GroupedFlowable g) { - return g.take(2).observeOn(Schedulers.computation()).map(new Function() { - - @Override - public String apply(Integer l) { - if (g.getKey()) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - } - return l + " is even."; - } else { - return l + " is odd."; - } - } - - }); - } + .groupBy(IS_EVEN2) + .flatMap(new Function, Flowable>() { + @Override + public Flowable apply(final GroupedFlowable g) { + return g.take(2) + .observeOn(Schedulers.computation()) + .map(new Function() { + @Override + public String apply(Integer l) { + if (g.getKey()) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + } + return l + " is even."; + } else { + return l + " is odd."; + } + } + }); + } + }, 4000) // a lot of groups are created due to take(2) + .subscribe(ts); - }).subscribe(ts); ts.awaitDone(5, TimeUnit.SECONDS); ts.assertNoErrors(); } @@ -1513,9 +1514,30 @@ public void onNext(Integer t) { * or emit to a completely different group. In this test, the merge requests N which * must be produced by the range, however it will create a bunch of groups before the actual * group receives a value. + * + * 12/03/2019: this test produces abandoned groups and as such keeps producing new groups + * that have to be ready to be received by observeOn and merge. */ @Test public void backpressureObserveOnOuter() { + int n = 500; + for (int j = 0; j < 1000; j++) { + Flowable.merge( + Flowable.range(0, n) + .groupBy(new Function() { + @Override + public Object apply(Integer i) { + return i % (Flowable.bufferSize() + 2); + } + }) + .observeOn(Schedulers.computation(), false, n) + , n) + .blockingLast(); + } + } + + @Test(expected = MissingBackpressureException.class) + public void backpressureObserveOnOuterMissingBackpressure() { for (int j = 0; j < 1000; j++) { Flowable.merge( Flowable.range(0, 500) @@ -1537,8 +1559,9 @@ public Object apply(Integer i) { public void backpressureInnerDoesntOverflowOuter() { TestSubscriber> ts = new TestSubscriber>(0L); - Flowable.fromArray(1, 2) - .groupBy(new Function() { + PublishProcessor pp = PublishProcessor.create(); + + pp.groupBy(new Function() { @Override public Integer apply(Integer v) { return v; @@ -1554,11 +1577,37 @@ public void accept(GroupedFlowable g) { ; ts.request(1); + pp.onNext(1); + ts.assertNotComplete(); ts.assertNoErrors(); ts.assertValueCount(1); } + @Test + public void backpressureInnerDoesntOverflowOuterMissingBackpressure() { + TestSubscriber> ts = new TestSubscriber>(1); + + Flowable.fromArray(1, 2) + .groupBy(new Function() { + @Override + public Integer apply(Integer v) { + return v; + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(GroupedFlowable g) { + g.subscribe(); + } + }) // this will request Long.MAX_VALUE + .subscribe(ts) + ; + ts.assertValueCount(1) + .assertError(MissingBackpressureException.class) + .assertNotComplete(); + } + @Test public void oneGroupInnerRequestsTwiceBuffer() { // FIXME: delayed requesting in groupBy results in group abandonment @@ -1628,7 +1677,6 @@ public void accept(GroupedFlowable g) { .assertComplete(); ts2 - .assertFusionMode(QueueFuseable.ASYNC) .assertValueCount(1) .assertNoErrors() .assertComplete(); @@ -1790,30 +1838,6 @@ public Publisher apply(GroupedFlowable g) throws Excep .assertResult(1); } - @Test - public void errorFused() { - TestSubscriberEx ts = new TestSubscriberEx().setInitialFusionMode(QueueFuseable.ANY); - - Flowable.error(new TestException()) - .groupBy(Functions.justFunction(1)) - .subscribe(ts); - - ts.assertFusionMode(QueueFuseable.ASYNC) - .assertFailure(TestException.class); - } - - @Test - public void errorFusedDelayed() { - TestSubscriberEx ts = new TestSubscriberEx().setInitialFusionMode(QueueFuseable.ANY); - - Flowable.error(new TestException()) - .groupBy(Functions.justFunction(1), true) - .subscribe(ts); - - ts.assertFusionMode(QueueFuseable.ASYNC) - .assertFailure(TestException.class); - } - @Test public void groupError() { Flowable.just(1).concatWith(Flowable.error(new TestException())) @@ -2334,86 +2358,6 @@ public void accept(GroupedFlowable g) throws Throwable { ts2.assertFailure(TestException.class, 1); } - @Test - public void fusedNoConcurrentCleanDueToCancel() { - for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) { - List errors = TestHelper.trackPluginErrors(); - try { - final PublishProcessor pp = PublishProcessor.create(); - - final AtomicReference>> qs = - new AtomicReference>>(); - - final TestSubscriber ts2 = new TestSubscriber(); - - pp.groupBy(Functions.identity(), Functions.identity(), false, 4) - .subscribe(new FlowableSubscriber>() { - - boolean once; - - @Override - public void onNext(GroupedFlowable g) { - if (!once) { - try { - GroupedFlowable t = qs.get().poll(); - if (t != null) { - once = true; - t.subscribe(ts2); - } - } catch (Throwable ignored) { - // not relevant here - } - } - } - - @Override - public void onError(Throwable t) { - } - - @Override - public void onComplete() { - } - - @Override - public void onSubscribe(Subscription s) { - @SuppressWarnings("unchecked") - QueueSubscription> q = (QueueSubscription>)s; - qs.set(q); - q.requestFusion(QueueFuseable.ANY); - q.request(1); - } - }) - ; - - Runnable r1 = new Runnable() { - @Override - public void run() { - qs.get().cancel(); - qs.get().clear(); - } - }; - Runnable r2 = new Runnable() { - @Override - public void run() { - ts2.cancel(); - } - }; - - for (int i = 0; i < 100; i++) { - pp.onNext(i); - } - - TestHelper.race(r1, r2); - - if (!errors.isEmpty()) { - throw new CompositeException(errors); - } - } finally { - RxJavaPlugins.reset(); - } - } - } - @Test public void fusedParallelGroupProcessing() { Flowable.range(0, 500000) @@ -2443,4 +2387,61 @@ public Publisher apply(GroupedFlowable g) { .assertComplete() .assertNoErrors(); } + + @Test + @SuppressWarnings("unchecked") + public void valueSelectorCrashAndMissingBackpressure() { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriberEx> ts = pp.groupBy(Functions.justFunction(1), new Function() { + @Override + public Integer apply(Integer t) throws Throwable { + throw new TestException(); + } + }) + .subscribeWith(new TestSubscriberEx>(0L)); + + assertTrue(pp.offer(1)); + + ts.assertFailure(MissingBackpressureException.class); + + assertTrue("" + ts.errors().get(0).getCause(), ts.errors().get(0).getCause() instanceof TestException); + } + + @Test + public void fusedGroupClearedOnCancel() { + Flowable.just(1) + .groupBy(Functions.identity()) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(GroupedFlowable g) throws Throwable { + return g.observeOn(ImmediateThinScheduler.INSTANCE).take(1); + } + }) + .test() + .assertResult(1); + } + + @Test + public void fusedGroupClearedOnCancelDelayed() { + Flowable.range(1, 100) + .groupBy(Functions.justFunction(1)) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(GroupedFlowable g) throws Throwable { + return g.observeOn(Schedulers.io()) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Throwable { + Thread.sleep(100); + } + }) + .take(1); + } + }) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertNoErrors() + .assertComplete(); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableRetryTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableRetryTest.java index 926b37be84..e2db70633a 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableRetryTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableRetryTest.java @@ -864,7 +864,7 @@ public String apply(String t1) { public Flowable apply(GroupedFlowable t1) { return t1.take(1); } - }) + }, NUM_MSG) // Must request as many groups as groupBy produces to avoid MBE .subscribe(new TestSubscriber(subscriber)); InOrder inOrder = inOrder(subscriber);