Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Make using() resource disposal order consistent with eager-mode #6534

Merged
merged 1 commit into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1026,8 +1026,11 @@ public static <R> Completable using(Supplier<R> resourceSupplier,
* @param completableFunction the function that given a resource returns a non-null
* Completable instance that will be subscribed to
* @param disposer the consumer that disposes the resource created by the resource supplier
* @param eager if true, the resource is disposed before the terminal event is emitted, if false, the
* resource is disposed after the terminal event has been emitted
* @param eager
* If {@code true} then resource disposal will happen either on a {@code dispose()} call before the upstream is disposed
* or just before the emission of a terminal event ({@code onComplete} or {@code onError}).
* If {@code false} the resource disposal will happen either on a {@code dispose()} call after the upstream is disposed
* or just after the emission of a terminal event ({@code onComplete} or {@code onError}).
* @return the new Completable instance
*/
@CheckReturnValue
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4603,8 +4603,10 @@ public static <T, D> Flowable<T> using(Supplier<? extends D> resourceSupplier,
* @param resourceDisposer
* the function that will dispose of the resource
* @param eager
* if {@code true} then disposal will happen either on cancellation or just before emission of
* a terminal event ({@code onComplete} or {@code onError}).
* If {@code true} then resource disposal will happen either on a {@code cancel()} call before the upstream is disposed
* or just before the emission of a terminal event ({@code onComplete} or {@code onError}).
* If {@code false} the resource disposal will happen either on a {@code cancel()} call after the upstream is disposed
* or just after the emission of a terminal event ({@code onComplete} or {@code onError}).
* @return the Publisher whose lifetime controls the lifetime of the dependent resource object
* @see <a href="http://reactivex.io/documentation/operators/using.html">ReactiveX operators documentation: Using</a>
* @since 2.0
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -1766,8 +1766,10 @@ public static <T, D> Maybe<T> using(Supplier<? extends D> resourceSupplier,
* @param resourceDisposer
* the function that will dispose of the resource
* @param eager
* if {@code true} then disposal will happen either on a dispose() call or just before emission of
* a terminal event ({@code onComplete} or {@code onError}).
* If {@code true} then resource disposal will happen either on a {@code dispose()} call before the upstream is disposed
* or just before the emission of a terminal event ({@code onSuccess}, {@code onComplete} or {@code onError}).
* If {@code false} the resource disposal will happen either on a {@code dispose()} call after the upstream is disposed
* or just after the emission of a terminal event ({@code onSuccess}, {@code onComplete} or {@code onError}).
* @return the Maybe whose lifetime controls the lifetime of the dependent resource object
* @see <a href="http://reactivex.io/documentation/operators/using.html">ReactiveX operators documentation: Using</a>
*/
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4089,8 +4089,10 @@ public static <T, D> Observable<T> using(Supplier<? extends D> resourceSupplier,
* @param disposer
* the function that will dispose of the resource
* @param eager
* if {@code true} then disposal will happen either on a dispose() call or just before emission of
* a terminal event ({@code onComplete} or {@code onError}).
* If {@code true} then resource disposal will happen either on a {@code dispose()} call before the upstream is disposed
* or just before the emission of a terminal event ({@code onComplete} or {@code onError}).
* If {@code false} the resource disposal will happen either on a {@code dispose()} call after the upstream is disposed
* or just after the emission of a terminal event ({@code onComplete} or {@code onError}).
* @return the ObservableSource whose lifetime controls the lifetime of the dependent resource object
* @see <a href="http://reactivex.io/documentation/operators/using.html">ReactiveX operators documentation: Using</a>
* @since 2.0
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1477,8 +1477,10 @@ public static <T, U> Single<T> using(Supplier<U> resourceSupplier,
* that particular resource when the generated SingleSource terminates
* (successfully or with an error) or gets disposed.
* @param eager
* if true, the disposer is called before the terminal event is signalled
* if false, the disposer is called after the terminal event is delivered to downstream
* If {@code true} then resource disposal will happen either on a {@code dispose()} call before the upstream is disposed
* or just before the emission of a terminal event ({@code onSuccess} or {@code onError}).
* If {@code false} the resource disposal will happen either on a {@code dispose()} call after the upstream is disposed
* or just after the emission of a terminal event ({@code onSuccess} or {@code onError}).
* @return the new Single instance
* @since 2.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,19 @@ static final class UsingObserver<R>

@Override
public void dispose() {
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
disposeResourceAfter();
if (eager) {
disposeResource();
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
} else {
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
disposeResource();
}
}

@SuppressWarnings("unchecked")
void disposeResourceAfter() {
void disposeResource() {
Object resource = getAndSet(this);
if (resource != this) {
try {
Expand Down Expand Up @@ -159,7 +165,7 @@ public void onError(Throwable e) {
downstream.onError(e);

if (!eager) {
disposeResourceAfter();
disposeResource();
}
}

Expand All @@ -185,7 +191,7 @@ public void onComplete() {
downstream.onComplete();

if (!eager) {
disposeResourceAfter();
disposeResource();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void onError(Throwable t) {
} else {
downstream.onError(t);
upstream.cancel();
disposeAfter();
disposeResource();
}
}

Expand All @@ -148,7 +148,7 @@ public void onComplete() {
} else {
downstream.onComplete();
upstream.cancel();
disposeAfter();
disposeResource();
}
}

Expand All @@ -159,11 +159,18 @@ public void request(long n) {

@Override
public void cancel() {
disposeAfter();
upstream.cancel();
if (eager) {
disposeResource();
upstream.cancel();
upstream = SubscriptionHelper.CANCELLED;
} else {
upstream.cancel();
upstream = SubscriptionHelper.CANCELLED;
disposeResource();
}
}

void disposeAfter() {
void disposeResource() {
if (compareAndSet(false, true)) {
try {
disposer.accept(resource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,19 @@ static final class UsingObserver<T, D>

@Override
public void dispose() {
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
disposeResourceAfter();
if (eager) {
disposeResource();
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
} else {
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
disposeResource();
}
}

@SuppressWarnings("unchecked")
void disposeResourceAfter() {
void disposeResource() {
Object resource = getAndSet(this);
if (resource != this) {
try {
Expand Down Expand Up @@ -171,7 +177,7 @@ public void onSuccess(T value) {
downstream.onSuccess(value);

if (!eager) {
disposeResourceAfter();
disposeResource();
}
}

Expand All @@ -196,7 +202,7 @@ public void onError(Throwable e) {
downstream.onError(e);

if (!eager) {
disposeResourceAfter();
disposeResource();
}
}

Expand All @@ -222,7 +228,7 @@ public void onComplete() {
downstream.onComplete();

if (!eager) {
disposeResourceAfter();
disposeResource();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void onError(Throwable t) {
} else {
downstream.onError(t);
upstream.dispose();
disposeAfter();
disposeResource();
}
}

Expand All @@ -142,22 +142,29 @@ public void onComplete() {
} else {
downstream.onComplete();
upstream.dispose();
disposeAfter();
disposeResource();
}
}

@Override
public void dispose() {
disposeAfter();
upstream.dispose();
if (eager) {
disposeResource();
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
} else {
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
disposeResource();
}
}

@Override
public boolean isDisposed() {
return get();
}

void disposeAfter() {
void disposeResource() {
if (compareAndSet(false, true)) {
try {
disposer.accept(resource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,15 @@ static final class UsingSingleObserver<T, U> extends

@Override
public void dispose() {
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
disposeAfter();
if (eager) {
disposeResource();
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
} else {
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
disposeResource();
}
}

@Override
Expand Down Expand Up @@ -148,7 +154,7 @@ public void onSuccess(T value) {
downstream.onSuccess(value);

if (!eager) {
disposeAfter();
disposeResource();
}
}

Expand All @@ -174,12 +180,12 @@ public void onError(Throwable e) {
downstream.onError(e);

if (!eager) {
disposeAfter();
disposeResource();
}
}

@SuppressWarnings("unchecked")
void disposeAfter() {
void disposeResource() {
Object u = getAndSet(this);
if (u != this) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,4 +570,67 @@ public void run() {
}
}

@Test
public void eagerDisposeResourceThenDisposeUpstream() {
final StringBuilder sb = new StringBuilder();

TestObserver<Void> to = Completable.using(Functions.justSupplier(1),
new Function<Integer, Completable>() {
@Override
public Completable apply(Integer t) throws Throwable {
return Completable.never()
.doOnDispose(new Action() {
@Override
public void run() throws Throwable {
sb.append("Dispose");
}
})
;
}
}, new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Throwable {
sb.append("Resource");
}
}, true)
.test()
;
to.assertEmpty();

to.dispose();

assertEquals("ResourceDispose", sb.toString());
}

@Test
public void nonEagerDisposeUpstreamThenDisposeResource() {
final StringBuilder sb = new StringBuilder();

TestObserver<Void> to = Completable.using(Functions.justSupplier(1),
new Function<Integer, Completable>() {
@Override
public Completable apply(Integer t) throws Throwable {
return Completable.never()
.doOnDispose(new Action() {
@Override
public void run() throws Throwable {
sb.append("Dispose");
}
})
;
}
}, new Consumer<Integer>() {
@Override
public void accept(Integer t) throws Throwable {
sb.append("Resource");
}
}, false)
.test()
;
to.assertEmpty();

to.dispose();

assertEquals("DisposeResource", sb.toString());
}
}
Loading