diff --git a/.travis.yml b/.travis.yml index 1a5ad164f6..6e595b4e93 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,7 +13,7 @@ script: gradle/buildViaTravis.sh # Code coverage after_success: - - bash <(curl -s https://codecov.io/bash) + - bash <(curl -s --retry 10 https://codecov.io/bash) - bash gradle/push_javadoc.sh # cache between builds diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 622bab5bb0..785c7e7d53 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -10414,6 +10414,11 @@ public final Disposable forEachWhile(final Predicate onNext, final Co * {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency * value to be greater or equal to the expected number of groups, possibly using * {@code Integer.MAX_VALUE} if the number of expected groups is unknown. + *

+ * Note also that ignoring groups or subscribing later (i.e., on another thread) will result in + * so-called group abandonment where a group will only contain one element and the group will be + * re-created over and over as new upstream items trigger a new group. The behavior is + * a tradeoff between no-dataloss, upstream cancellation and excessive group creation. * *

*
Backpressure:
@@ -10462,6 +10467,12 @@ public final Flowable> groupBy(Function + * Note also that ignoring groups or subscribing later (i.e., on another thread) will result in + * so-called group abandonment where a group will only contain one element and the group will be + * re-created over and over as new upstream items trigger a new group. The behavior is + * a tradeoff between no-dataloss, upstream cancellation and excessive group creation. + * *
*
Backpressure:
*
Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} @@ -10512,6 +10523,11 @@ public final Flowable> groupBy(Function + * Note also that ignoring groups or subscribing later (i.e., on another thread) will result in + * so-called group abandonment where a group will only contain one element and the group will be + * re-created over and over as new upstream items trigger a new group. The behavior is + * a tradeoff between no-dataloss, upstream cancellation and excessive group creation. * *
*
Backpressure:
@@ -10565,6 +10581,11 @@ public final Flowable> groupBy(Function + * Note also that ignoring groups or subscribing later (i.e., on another thread) will result in + * so-called group abandonment where a group will only contain one element and the group will be + * re-created over and over as new upstream items trigger a new group. The behavior is + * a tradeoff between no-dataloss, upstream cancellation and excessive group creation. * *
*
Backpressure:
@@ -10621,6 +10642,11 @@ public final Flowable> groupBy(Function + * Note also that ignoring groups or subscribing later (i.e., on another thread) will result in + * so-called group abandonment where a group will only contain one element and the group will be + * re-created over and over as new upstream items trigger a new group. The behavior is + * a tradeoff between no-dataloss, upstream cancellation and excessive group creation. * *
*
Backpressure:
@@ -10726,6 +10752,11 @@ public final Flowable> groupBy(Function + * Note also that ignoring groups or subscribing later (i.e., on another thread) will result in + * so-called group abandonment where a group will only contain one element and the group will be + * re-created over and over as new upstream items trigger a new group. The behavior is + * a tradeoff between no-dataloss, upstream cancellation and excessive group creation. * *
*
Backpressure:
diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index a56aa97695..d5b2895122 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -9067,6 +9067,12 @@ public final Disposable forEachWhile(final Predicate onNext, Consumer * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those * {@code GroupedObservableSource}s that do not concern you. Instead, you can signal to them that they may * discard their buffers by applying an operator like {@link #ignoreElements} to them. + *

+ * Note also that ignoring groups or subscribing later (i.e., on another thread) will result in + * so-called group abandonment where a group will only contain one element and the group will be + * re-created over and over as new upstream items trigger a new group. The behavior is + * a tradeoff between no-dataloss, upstream cancellation and excessive group creation. + * *

*
Scheduler:
*
{@code groupBy} does not operate by default on a particular {@link Scheduler}.
@@ -9101,6 +9107,12 @@ public final Observable> groupBy(Function + * Note also that ignoring groups or subscribing later (i.e., on another thread) will result in + * so-called group abandonment where a group will only contain one element and the group will be + * re-created over and over as new upstream items trigger a new group. The behavior is + * a tradeoff between no-dataloss, upstream cancellation and excessive group creation. + * *
*
Scheduler:
*
{@code groupBy} does not operate by default on a particular {@link Scheduler}.
@@ -9138,6 +9150,12 @@ public final Observable> groupBy(Function + * Note also that ignoring groups or subscribing later (i.e., on another thread) will result in + * so-called group abandonment where a group will only contain one element and the group will be + * re-created over and over as new upstream items trigger a new group. The behavior is + * a tradeoff between no-dataloss, upstream cancellation and excessive group creation. + * *
*
Scheduler:
*
{@code groupBy} does not operate by default on a particular {@link Scheduler}.
@@ -9176,6 +9194,12 @@ public final Observable> groupBy(Function + * Note also that ignoring groups or subscribing later (i.e., on another thread) will result in + * so-called group abandonment where a group will only contain one element and the group will be + * re-created over and over as new upstream items trigger a new group. The behavior is + * a tradeoff between no-dataloss, upstream cancellation and excessive group creation. + * *
*
Scheduler:
*
{@code groupBy} does not operate by default on a particular {@link Scheduler}.
@@ -9217,6 +9241,12 @@ public final Observable> groupBy(Function + * Note also that ignoring groups or subscribing later (i.e., on another thread) will result in + * so-called group abandonment where a group will only contain one element and the group will be + * re-created over and over as new upstream items trigger a new group. The behavior is + * a tradeoff between no-dataloss, upstream cancellation and excessive group creation. + * *
*
Scheduler:
*
{@code groupBy} does not operate by default on a particular {@link Scheduler}.
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java index 41643e0528..0711c74ff4 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java @@ -179,6 +179,13 @@ public void onNext(T t) { if (newGroup) { q.offer(group); drain(); + + if (group.state.tryAbandon()) { + cancel(key); + group.onComplete(); + + upstream.request(1); + } } } @@ -489,12 +496,17 @@ static final class State extends BasicIntQueueSubscription implements P final AtomicReference> actual = new AtomicReference>(); - final AtomicBoolean once = new AtomicBoolean(); - boolean outputFused; int produced; + final AtomicInteger once = new AtomicInteger(); + + static final int FRESH = 0; + static final int HAS_SUBSCRIBER = 1; + static final int ABANDONED = 2; + static final int ABANDONED_HAS_SUBSCRIBER = ABANDONED | HAS_SUBSCRIBER; + State(int bufferSize, GroupBySubscriber parent, K key, boolean delayError) { this.queue = new SpscLinkedArrayQueue(bufferSize); this.parent = parent; @@ -513,19 +525,30 @@ public void request(long n) { @Override public void cancel() { if (cancelled.compareAndSet(false, true)) { - parent.cancel(key); + cancelParent(); } } @Override - public void subscribe(Subscriber s) { - if (once.compareAndSet(false, true)) { - s.onSubscribe(this); - actual.lazySet(s); - drain(); - } else { - EmptySubscription.error(new IllegalStateException("Only one Subscriber allowed!"), s); + public void subscribe(Subscriber subscriber) { + for (;;) { + int s = once.get(); + if ((s & HAS_SUBSCRIBER) != 0) { + break; + } + int u = s | HAS_SUBSCRIBER; + if (once.compareAndSet(s, u)) { + subscriber.onSubscribe(this); + actual.lazySet(subscriber); + if (cancelled.get()) { + actual.lazySet(null); + } else { + drain(); + } + return; + } } + EmptySubscription.error(new IllegalStateException("Only one Subscriber allowed!"), subscriber); } public void onNext(T t) { @@ -544,6 +567,16 @@ public void onComplete() { drain(); } + void cancelParent() { + if ((once.get() & ABANDONED) == 0) { + parent.cancel(key); + } + } + + boolean tryAbandon() { + return once.get() == FRESH && once.compareAndSet(FRESH, ABANDONED); + } + void drain() { if (getAndIncrement() != 0) { return; @@ -640,7 +673,9 @@ void drainNormal() { if (r != Long.MAX_VALUE) { requested.addAndGet(-e); } - parent.upstream.request(e); + if ((once.get() & ABANDONED) == 0) { + parent.upstream.request(e); + } } } @@ -708,7 +743,9 @@ public T poll() { int p = produced; if (p != 0) { produced = 0; - parent.upstream.request(p); + if ((once.get() & ABANDONED) == 0) { + parent.upstream.request(p); + } } return null; } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupBy.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupBy.java index 77eb93a870..5ac733eb8d 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupBy.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupBy.java @@ -110,6 +110,11 @@ public void onNext(T t) { getAndIncrement(); downstream.onNext(group); + + if (group.state.tryAbandon()) { + cancel(key); + group.onComplete(); + } } V v; @@ -151,7 +156,7 @@ public void onComplete() { @Override public void dispose() { - // cancelling the main source means we don't want any more groups + // canceling the main source means we don't want any more groups // but running groups still require new values if (cancelled.compareAndSet(false, true)) { if (decrementAndGet() == 0) { @@ -220,10 +225,15 @@ static final class State extends AtomicInteger implements Disposable, Obse final AtomicBoolean cancelled = new AtomicBoolean(); - final AtomicBoolean once = new AtomicBoolean(); - final AtomicReference> actual = new AtomicReference>(); + final AtomicInteger once = new AtomicInteger(); + + static final int FRESH = 0; + static final int HAS_SUBSCRIBER = 1; + static final int ABANDONED = 2; + static final int ABANDONED_HAS_SUBSCRIBER = ABANDONED | HAS_SUBSCRIBER; + State(int bufferSize, GroupByObserver parent, K key, boolean delayError) { this.queue = new SpscLinkedArrayQueue(bufferSize); this.parent = parent; @@ -236,7 +246,7 @@ public void dispose() { if (cancelled.compareAndSet(false, true)) { if (getAndIncrement() == 0) { actual.lazySet(null); - parent.cancel(key); + cancelParent(); } } } @@ -248,17 +258,24 @@ public boolean isDisposed() { @Override public void subscribe(Observer observer) { - if (once.compareAndSet(false, true)) { - observer.onSubscribe(this); - actual.lazySet(observer); - if (cancelled.get()) { - actual.lazySet(null); - } else { - drain(); + for (;;) { + int s = once.get(); + if ((s & HAS_SUBSCRIBER) != 0) { + break; + } + int u = s | HAS_SUBSCRIBER; + if (once.compareAndSet(s, u)) { + observer.onSubscribe(this); + actual.lazySet(observer); + if (cancelled.get()) { + actual.lazySet(null); + } else { + drain(); + } + return; } - } else { - EmptyDisposable.error(new IllegalStateException("Only one Observer allowed!"), observer); } + EmptyDisposable.error(new IllegalStateException("Only one Observer allowed!"), observer); } public void onNext(T t) { @@ -315,11 +332,21 @@ void drain() { } } + void cancelParent() { + if ((once.get() & ABANDONED) == 0) { + parent.cancel(key); + } + } + + boolean tryAbandon() { + return once.get() == FRESH && once.compareAndSet(FRESH, ABANDONED); + } + boolean checkTerminated(boolean d, boolean empty, Observer a, boolean delayError) { if (cancelled.get()) { queue.clear(); - parent.cancel(key); actual.lazySet(null); + cancelParent(); return true; } diff --git a/src/test/java/io/reactivex/rxjava3/flowable/FlowableGroupByTests.java b/src/test/java/io/reactivex/rxjava3/flowable/FlowableGroupByTests.java index 287668475e..fb4b89d947 100644 --- a/src/test/java/io/reactivex/rxjava3/flowable/FlowableGroupByTests.java +++ b/src/test/java/io/reactivex/rxjava3/flowable/FlowableGroupByTests.java @@ -102,7 +102,13 @@ public Flowable apply(GroupedFlowable v) { } }).subscribe(ts); - ts.assertValues(0, 5, 10, 15, 1, 6, 11, 16, 2, 7, 12, 17, 3, 8, 13, 18, 4, 9, 14, 19); + // Behavior change: this now counts as group abandonment because concatMap + // doesn't subscribe to the 2nd+ emitted groups immediately + ts.assertValues( + 0, 5, 10, 15, // First group is okay + // any other group gets abandoned so we get 16 one-element group + 1, 2, 3, 4, 6, 7, 8, 9, 11, 12, 13, 14, 16, 17, 18, 19 + ); ts.assertComplete(); ts.assertNoErrors(); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java index 8bcc07c7de..7feaab715c 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java @@ -157,7 +157,7 @@ private static Map> toMap(Flowable final ConcurrentHashMap> result = new ConcurrentHashMap>(); - flowable.blockingForEach(new Consumer>() { + flowable.doOnNext(new Consumer>() { @Override public void accept(final GroupedFlowable f) { @@ -171,7 +171,7 @@ public void accept(V v) { }); } - }); + }).blockingSubscribe(); return result; } @@ -541,7 +541,9 @@ public void onNext(GroupedFlowable s) { if (!latch.await(500, TimeUnit.MILLISECONDS)) { fail("timed out - never got completion"); } - assertEquals(2, eventCounter.get()); + // Behavior change: groups not subscribed immediately will be automatically abandoned + // so this leads to group recreation + assertEquals(100, eventCounter.get()); } @Test @@ -1558,7 +1560,9 @@ public void accept(GroupedFlowable g) { @Test public void oneGroupInnerRequestsTwiceBuffer() { - TestSubscriber ts1 = new TestSubscriber(0L); + // FIXME: delayed requesting in groupBy results in group abandonment + TestSubscriber ts1 = new TestSubscriber(1L); + final TestSubscriber ts2 = new TestSubscriber(0L); Flowable.range(1, Flowable.bufferSize() * 2) @@ -1576,16 +1580,6 @@ public void accept(GroupedFlowable g) { }) .subscribe(ts1); - ts1.assertNoValues(); - ts1.assertNoErrors(); - ts1.assertNotComplete(); - - ts2.assertNoValues(); - ts2.assertNoErrors(); - ts2.assertNotComplete(); - - ts1.request(1); - ts1.assertValueCount(1); ts1.assertNoErrors(); ts1.assertNotComplete(); @@ -2207,4 +2201,78 @@ public void accept(Object object) { }}; return evictingMapFactory; } + + @Test + public void cancelOverFlatmapRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final TestSubscriber ts = new TestSubscriber(); + + final PublishProcessor pp = PublishProcessor.create(); + + pp.groupBy(new Function() { + @Override + public Integer apply(Integer v) throws Throwable { + return v % 10; + } + }, Functions.identity(), false, 2048) + .flatMap(new Function, GroupedFlowable>() { + @Override + public GroupedFlowable apply(GroupedFlowable v) + throws Throwable { + return v; + } + }) + .subscribe(ts); + + Runnable r1 = new Runnable() { + @Override + public void run() { + for (int j = 0; j < 1000; j++) { + pp.onNext(j); + } + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + + assertFalse("Round " + i, pp.hasSubscribers()); + } + } + + @Test + public void abandonedGroupsNoDataloss() { + final List> groups = new ArrayList>(); + + Flowable.range(1, 1000) + .groupBy(new Function() { + @Override + public Integer apply(Integer v) throws Throwable { + return v % 10; + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(GroupedFlowable v) throws Throwable { + groups.add(v); + } + }) + .test() + .assertValueCount(1000) + .assertComplete() + .assertNoErrors(); + + Flowable.concat(groups) + .test() + .assertValueCount(1000) + .assertNoErrors() + .assertComplete(); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupByTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupByTest.java index a7ac22a95a..99698d18c2 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupByTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupByTest.java @@ -150,7 +150,7 @@ private static Map> toMap(Observable> result = new ConcurrentHashMap>(); - observable.blockingForEach(new Consumer>() { + observable.doOnNext(new Consumer>() { @Override public void accept(final GroupedObservable o) { @@ -164,7 +164,7 @@ public void accept(V v) { }); } - }); + }).blockingSubscribe(); return result; } @@ -534,7 +534,9 @@ public void onNext(GroupedObservable s) { if (!latch.await(500, TimeUnit.MILLISECONDS)) { fail("timed out - never got completion"); } - assertEquals(2, eventCounter.get()); + // Behavior change: groups not subscribed immediately will be automatically abandoned + // so this leads to group recreation + assertEquals(100, eventCounter.get()); } @Test @@ -1539,4 +1541,78 @@ public void delayErrorSimpleComplete() { .test() .assertResult(1); } + + @Test + public void cancelOverFlatmapRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + + final TestObserver to = new TestObserver(); + + final PublishSubject ps = PublishSubject.create(); + + ps.groupBy(new Function() { + @Override + public Integer apply(Integer v) throws Throwable { + return v % 10; + } + }) + .flatMap(new Function, ObservableSource>() { + @Override + public ObservableSource apply(GroupedObservable v) + throws Throwable { + return v; + } + }) + .subscribe(to); + + Runnable r1 = new Runnable() { + @Override + public void run() { + for (int j = 0; j < 1000; j++) { + ps.onNext(j); + } + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + to.dispose(); + } + }; + + TestHelper.race(r1, r2); + + assertFalse("Round " + i, ps.hasObservers()); + } + } + + @Test + public void abandonedGroupsNoDataloss() { + final List> groups = new ArrayList>(); + + Observable.range(1, 1000) + .groupBy(new Function() { + @Override + public Integer apply(Integer v) throws Throwable { + return v % 10; + } + }) + .doOnNext(new Consumer>() { + @Override + public void accept(GroupedObservable v) throws Throwable { + groups.add(v); + } + }) + .test() + .assertValueCount(1000) + .assertComplete() + .assertNoErrors(); + + Observable.concat(groups) + .test() + .assertValueCount(1000) + .assertNoErrors() + .assertComplete(); + } }