From 7cdd508e5f608d5394d47c3639c69e2088a4ce6c Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 21 Jun 2019 16:31:42 +0200 Subject: [PATCH] 3.x: Make using() resource disposal order consistent with eager-mode --- src/main/java/io/reactivex/Completable.java | 7 +- src/main/java/io/reactivex/Flowable.java | 6 +- src/main/java/io/reactivex/Maybe.java | 6 +- src/main/java/io/reactivex/Observable.java | 6 +- src/main/java/io/reactivex/Single.java | 6 +- .../completable/CompletableUsing.java | 18 +++-- .../operators/flowable/FlowableUsing.java | 17 +++-- .../internal/operators/maybe/MaybeUsing.java | 20 ++++-- .../operators/observable/ObservableUsing.java | 17 +++-- .../operators/single/SingleUsing.java | 18 +++-- .../completable/CompletableUsingTest.java | 63 ++++++++++++++++++ .../operators/flowable/FlowableUsingTest.java | 59 +++++++++++++++++ .../operators/maybe/MaybeUsingTest.java | 65 +++++++++++++++++++ .../observable/ObservableUsingTest.java | 59 +++++++++++++++++ .../operators/single/SingleUsingTest.java | 64 ++++++++++++++++++ 15 files changed, 392 insertions(+), 39 deletions(-) diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 994054253c..f83f229622 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -1026,8 +1026,11 @@ public static Completable using(Supplier resourceSupplier, * @param completableFunction the function that given a resource returns a non-null * Completable instance that will be subscribed to * @param disposer the consumer that disposes the resource created by the resource supplier - * @param eager if true, the resource is disposed before the terminal event is emitted, if false, the - * resource is disposed after the terminal event has been emitted + * @param eager + * If {@code true} then resource disposal will happen either on a {@code dispose()} call before the upstream is disposed + * or just before the emission of a terminal event ({@code onComplete} or {@code onError}). + * If {@code false} the resource disposal will happen either on a {@code dispose()} call after the upstream is disposed + * or just after the emission of a terminal event ({@code onComplete} or {@code onError}). * @return the new Completable instance */ @CheckReturnValue diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index fdc421fc4d..619c0b2bc3 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -4603,8 +4603,10 @@ public static Flowable using(Supplier resourceSupplier, * @param resourceDisposer * the function that will dispose of the resource * @param eager - * if {@code true} then disposal will happen either on cancellation or just before emission of - * a terminal event ({@code onComplete} or {@code onError}). + * If {@code true} then resource disposal will happen either on a {@code cancel()} call before the upstream is disposed + * or just before the emission of a terminal event ({@code onComplete} or {@code onError}). + * If {@code false} the resource disposal will happen either on a {@code cancel()} call after the upstream is disposed + * or just after the emission of a terminal event ({@code onComplete} or {@code onError}). * @return the Publisher whose lifetime controls the lifetime of the dependent resource object * @see ReactiveX operators documentation: Using * @since 2.0 diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index bf8affc838..b0af16cb19 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -1766,8 +1766,10 @@ public static Maybe using(Supplier resourceSupplier, * @param resourceDisposer * the function that will dispose of the resource * @param eager - * if {@code true} then disposal will happen either on a dispose() call or just before emission of - * a terminal event ({@code onComplete} or {@code onError}). + * If {@code true} then resource disposal will happen either on a {@code dispose()} call before the upstream is disposed + * or just before the emission of a terminal event ({@code onSuccess}, {@code onComplete} or {@code onError}). + * If {@code false} the resource disposal will happen either on a {@code dispose()} call after the upstream is disposed + * or just after the emission of a terminal event ({@code onSuccess}, {@code onComplete} or {@code onError}). * @return the Maybe whose lifetime controls the lifetime of the dependent resource object * @see ReactiveX operators documentation: Using */ diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index a090a511b8..a1572326e4 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -4089,8 +4089,10 @@ public static Observable using(Supplier resourceSupplier, * @param disposer * the function that will dispose of the resource * @param eager - * if {@code true} then disposal will happen either on a dispose() call or just before emission of - * a terminal event ({@code onComplete} or {@code onError}). + * If {@code true} then resource disposal will happen either on a {@code dispose()} call before the upstream is disposed + * or just before the emission of a terminal event ({@code onComplete} or {@code onError}). + * If {@code false} the resource disposal will happen either on a {@code dispose()} call after the upstream is disposed + * or just after the emission of a terminal event ({@code onComplete} or {@code onError}). * @return the ObservableSource whose lifetime controls the lifetime of the dependent resource object * @see ReactiveX operators documentation: Using * @since 2.0 diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index d2932cbc32..9a62678423 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1477,8 +1477,10 @@ public static Single using(Supplier resourceSupplier, * that particular resource when the generated SingleSource terminates * (successfully or with an error) or gets disposed. * @param eager - * if true, the disposer is called before the terminal event is signalled - * if false, the disposer is called after the terminal event is delivered to downstream + * If {@code true} then resource disposal will happen either on a {@code dispose()} call before the upstream is disposed + * or just before the emission of a terminal event ({@code onSuccess} or {@code onError}). + * If {@code false} the resource disposal will happen either on a {@code dispose()} call after the upstream is disposed + * or just after the emission of a terminal event ({@code onSuccess} or {@code onError}). * @return the new Single instance * @since 2.0 */ diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java index f6fa30ef7f..07b2b3e653 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableUsing.java @@ -106,13 +106,19 @@ static final class UsingObserver @Override public void dispose() { - upstream.dispose(); - upstream = DisposableHelper.DISPOSED; - disposeResourceAfter(); + if (eager) { + disposeResource(); + upstream.dispose(); + upstream = DisposableHelper.DISPOSED; + } else { + upstream.dispose(); + upstream = DisposableHelper.DISPOSED; + disposeResource(); + } } @SuppressWarnings("unchecked") - void disposeResourceAfter() { + void disposeResource() { Object resource = getAndSet(this); if (resource != this) { try { @@ -159,7 +165,7 @@ public void onError(Throwable e) { downstream.onError(e); if (!eager) { - disposeResourceAfter(); + disposeResource(); } } @@ -185,7 +191,7 @@ public void onComplete() { downstream.onComplete(); if (!eager) { - disposeResourceAfter(); + disposeResource(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableUsing.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableUsing.java index 26cc516de2..b25490fd24 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableUsing.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableUsing.java @@ -126,7 +126,7 @@ public void onError(Throwable t) { } else { downstream.onError(t); upstream.cancel(); - disposeAfter(); + disposeResource(); } } @@ -148,7 +148,7 @@ public void onComplete() { } else { downstream.onComplete(); upstream.cancel(); - disposeAfter(); + disposeResource(); } } @@ -159,11 +159,18 @@ public void request(long n) { @Override public void cancel() { - disposeAfter(); - upstream.cancel(); + if (eager) { + disposeResource(); + upstream.cancel(); + upstream = SubscriptionHelper.CANCELLED; + } else { + upstream.cancel(); + upstream = SubscriptionHelper.CANCELLED; + disposeResource(); + } } - void disposeAfter() { + void disposeResource() { if (compareAndSet(false, true)) { try { disposer.accept(resource); diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeUsing.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeUsing.java index 5021eb9996..d2e499c9ed 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeUsing.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeUsing.java @@ -117,13 +117,19 @@ static final class UsingObserver @Override public void dispose() { - upstream.dispose(); - upstream = DisposableHelper.DISPOSED; - disposeResourceAfter(); + if (eager) { + disposeResource(); + upstream.dispose(); + upstream = DisposableHelper.DISPOSED; + } else { + upstream.dispose(); + upstream = DisposableHelper.DISPOSED; + disposeResource(); + } } @SuppressWarnings("unchecked") - void disposeResourceAfter() { + void disposeResource() { Object resource = getAndSet(this); if (resource != this) { try { @@ -171,7 +177,7 @@ public void onSuccess(T value) { downstream.onSuccess(value); if (!eager) { - disposeResourceAfter(); + disposeResource(); } } @@ -196,7 +202,7 @@ public void onError(Throwable e) { downstream.onError(e); if (!eager) { - disposeResourceAfter(); + disposeResource(); } } @@ -222,7 +228,7 @@ public void onComplete() { downstream.onComplete(); if (!eager) { - disposeResourceAfter(); + disposeResource(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableUsing.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableUsing.java index 0571f4293c..dfc981de1e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableUsing.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableUsing.java @@ -120,7 +120,7 @@ public void onError(Throwable t) { } else { downstream.onError(t); upstream.dispose(); - disposeAfter(); + disposeResource(); } } @@ -142,14 +142,21 @@ public void onComplete() { } else { downstream.onComplete(); upstream.dispose(); - disposeAfter(); + disposeResource(); } } @Override public void dispose() { - disposeAfter(); - upstream.dispose(); + if (eager) { + disposeResource(); + upstream.dispose(); + upstream = DisposableHelper.DISPOSED; + } else { + upstream.dispose(); + upstream = DisposableHelper.DISPOSED; + disposeResource(); + } } @Override @@ -157,7 +164,7 @@ public boolean isDisposed() { return get(); } - void disposeAfter() { + void disposeResource() { if (compareAndSet(false, true)) { try { disposer.accept(resource); diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleUsing.java b/src/main/java/io/reactivex/internal/operators/single/SingleUsing.java index 8973d4f59c..55fa142a26 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleUsing.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleUsing.java @@ -106,9 +106,15 @@ static final class UsingSingleObserver extends @Override public void dispose() { - upstream.dispose(); - upstream = DisposableHelper.DISPOSED; - disposeAfter(); + if (eager) { + disposeResource(); + upstream.dispose(); + upstream = DisposableHelper.DISPOSED; + } else { + upstream.dispose(); + upstream = DisposableHelper.DISPOSED; + disposeResource(); + } } @Override @@ -148,7 +154,7 @@ public void onSuccess(T value) { downstream.onSuccess(value); if (!eager) { - disposeAfter(); + disposeResource(); } } @@ -174,12 +180,12 @@ public void onError(Throwable e) { downstream.onError(e); if (!eager) { - disposeAfter(); + disposeResource(); } } @SuppressWarnings("unchecked") - void disposeAfter() { + void disposeResource() { Object u = getAndSet(this); if (u != this) { try { diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableUsingTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableUsingTest.java index 5a66d52eaa..3d5a580e30 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableUsingTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableUsingTest.java @@ -570,4 +570,67 @@ public void run() { } } + @Test + public void eagerDisposeResourceThenDisposeUpstream() { + final StringBuilder sb = new StringBuilder(); + + TestObserver to = Completable.using(Functions.justSupplier(1), + new Function() { + @Override + public Completable apply(Integer t) throws Throwable { + return Completable.never() + .doOnDispose(new Action() { + @Override + public void run() throws Throwable { + sb.append("Dispose"); + } + }) + ; + } + }, new Consumer() { + @Override + public void accept(Integer t) throws Throwable { + sb.append("Resource"); + } + }, true) + .test() + ; + to.assertEmpty(); + + to.dispose(); + + assertEquals("ResourceDispose", sb.toString()); + } + + @Test + public void nonEagerDisposeUpstreamThenDisposeResource() { + final StringBuilder sb = new StringBuilder(); + + TestObserver to = Completable.using(Functions.justSupplier(1), + new Function() { + @Override + public Completable apply(Integer t) throws Throwable { + return Completable.never() + .doOnDispose(new Action() { + @Override + public void run() throws Throwable { + sb.append("Dispose"); + } + }) + ; + } + }, new Consumer() { + @Override + public void accept(Integer t) throws Throwable { + sb.append("Resource"); + } + }, false) + .test() + ; + to.assertEmpty(); + + to.dispose(); + + assertEquals("DisposeResource", sb.toString()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableUsingTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableUsingTest.java index 2a7523ace0..823d713141 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableUsingTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableUsingTest.java @@ -677,4 +677,63 @@ protected void subscribeActual(Subscriber subscriber) { }), Functions.emptyConsumer(), true) .subscribe(ts); } + + @Test + public void eagerDisposeResourceThenDisposeUpstream() { + final StringBuilder sb = new StringBuilder(); + + Flowable.using(Functions.justSupplier(1), + new Function>() { + @Override + public Flowable apply(Integer t) throws Throwable { + return Flowable.range(1, 2) + .doOnCancel(new Action() { + @Override + public void run() throws Throwable { + sb.append("Cancel"); + } + }) + ; + } + }, new Consumer() { + @Override + public void accept(Integer t) throws Throwable { + sb.append("Resource"); + } + }, true) + .take(1) + .test() + .assertResult(1); + + assertEquals("ResourceCancel", sb.toString()); + } + + @Test + public void nonEagerDisposeUpstreamThenDisposeResource() { + final StringBuilder sb = new StringBuilder(); + + Flowable.using(Functions.justSupplier(1), + new Function>() { + @Override + public Flowable apply(Integer t) throws Throwable { + return Flowable.range(1, 2) + .doOnCancel(new Action() { + @Override + public void run() throws Throwable { + sb.append("Cancel"); + } + }); + } + }, new Consumer() { + @Override + public void accept(Integer t) throws Throwable { + sb.append("Resource"); + } + }, false) + .take(1) + .test() + .assertResult(1); + + assertEquals("CancelResource", sb.toString()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeUsingTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeUsingTest.java index bfa2e89703..0c94e16ce1 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeUsingTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeUsingTest.java @@ -23,6 +23,7 @@ import io.reactivex.disposables.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.PublishSubject; @@ -563,4 +564,68 @@ public void run() { TestHelper.race(r1, r2); } } + + @Test + public void eagerDisposeResourceThenDisposeUpstream() { + final StringBuilder sb = new StringBuilder(); + + TestObserver to = Maybe.using(Functions.justSupplier(1), + new Function>() { + @Override + public Maybe apply(Integer t) throws Throwable { + return Maybe.never() + .doOnDispose(new Action() { + @Override + public void run() throws Throwable { + sb.append("Dispose"); + } + }) + ; + } + }, new Consumer() { + @Override + public void accept(Integer t) throws Throwable { + sb.append("Resource"); + } + }, true) + .test() + ; + to.assertEmpty(); + + to.dispose(); + + assertEquals("ResourceDispose", sb.toString()); + } + + @Test + public void nonEagerDisposeUpstreamThenDisposeResource() { + final StringBuilder sb = new StringBuilder(); + + TestObserver to = Maybe.using(Functions.justSupplier(1), + new Function>() { + @Override + public Maybe apply(Integer t) throws Throwable { + return Maybe.never() + .doOnDispose(new Action() { + @Override + public void run() throws Throwable { + sb.append("Dispose"); + } + }) + ; + } + }, new Consumer() { + @Override + public void accept(Integer t) throws Throwable { + sb.append("Resource"); + } + }, false) + .test() + ; + to.assertEmpty(); + + to.dispose(); + + assertEquals("DisposeResource", sb.toString()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java index 38e2710692..4ef2828844 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java @@ -606,4 +606,63 @@ protected void subscribeActual(Observer observer) { }), Functions.emptyConsumer(), true) .subscribe(to); } + + @Test + public void eagerDisposeResourceThenDisposeUpstream() { + final StringBuilder sb = new StringBuilder(); + + Observable.using(Functions.justSupplier(1), + new Function>() { + @Override + public Observable apply(Integer t) throws Throwable { + return Observable.range(1, 2) + .doOnDispose(new Action() { + @Override + public void run() throws Throwable { + sb.append("Dispose"); + } + }) + ; + } + }, new Consumer() { + @Override + public void accept(Integer t) throws Throwable { + sb.append("Resource"); + } + }, true) + .take(1) + .test() + .assertResult(1); + + assertEquals("ResourceDispose", sb.toString()); + } + + @Test + public void nonEagerDisposeUpstreamThenDisposeResource() { + final StringBuilder sb = new StringBuilder(); + + Observable.using(Functions.justSupplier(1), + new Function>() { + @Override + public Observable apply(Integer t) throws Throwable { + return Observable.range(1, 2) + .doOnDispose(new Action() { + @Override + public void run() throws Throwable { + sb.append("Dispose"); + } + }); + } + }, new Consumer() { + @Override + public void accept(Integer t) throws Throwable { + sb.append("Resource"); + } + }, false) + .take(1) + .test() + .assertResult(1); + + assertEquals("DisposeResource", sb.toString()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleUsingTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleUsingTest.java index e3c994ad33..d90b0377c8 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleUsingTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleUsingTest.java @@ -327,4 +327,68 @@ public void run() { assertTrue(d.isDisposed()); } } + + @Test + public void eagerDisposeResourceThenDisposeUpstream() { + final StringBuilder sb = new StringBuilder(); + + TestObserver to = Single.using(Functions.justSupplier(1), + new Function>() { + @Override + public Single apply(Integer t) throws Throwable { + return Single.never() + .doOnDispose(new Action() { + @Override + public void run() throws Throwable { + sb.append("Dispose"); + } + }) + ; + } + }, new Consumer() { + @Override + public void accept(Integer t) throws Throwable { + sb.append("Resource"); + } + }, true) + .test() + ; + to.assertEmpty(); + + to.dispose(); + + assertEquals("ResourceDispose", sb.toString()); + } + + @Test + public void nonEagerDisposeUpstreamThenDisposeResource() { + final StringBuilder sb = new StringBuilder(); + + TestObserver to = Single.using(Functions.justSupplier(1), + new Function>() { + @Override + public Single apply(Integer t) throws Throwable { + return Single.never() + .doOnDispose(new Action() { + @Override + public void run() throws Throwable { + sb.append("Dispose"); + } + }) + ; + } + }, new Consumer() { + @Override + public void accept(Integer t) throws Throwable { + sb.append("Resource"); + } + }, false) + .test() + ; + to.assertEmpty(); + + to.dispose(); + + assertEquals("DisposeResource", sb.toString()); + } }