Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add doOnLifecycle to Maybe, Single & Completable #6877

Merged
merged 1 commit into from
Jan 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 14 additions & 20 deletions docs/Operator-Matrix.md

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
Expand Down Expand Up @@ -1751,6 +1752,34 @@ public final Completable doOnEvent(@NonNull Consumer<@Nullable ? super Throwable
return RxJavaPlugins.onAssembly(new CompletableDoOnEvent(this, onEvent));
}

/**
* Calls the appropriate {@code onXXX} method (shared between all {@link CompletableObserver}s) for the lifecycle events of
* the sequence (subscription, disposal).
* <p>
* <img width="640" height="257" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.doOnLifecycle.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnLifecycle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onSubscribe
* a {@link Consumer} called with the {@link Disposable} sent via {@link CompletableObserver#onSubscribe(Disposable)}
* @param onDispose
* called when the downstream disposes the {@code Disposable} via {@code dispose()}
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code onSubscribe} or {@code onDispose} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Completable doOnLifecycle(@NonNull Consumer<? super Disposable> onSubscribe, @NonNull Action onDispose) {
return doOnLifecycle(onSubscribe, Functions.emptyConsumer(),
Functions.EMPTY_ACTION, Functions.EMPTY_ACTION,
Functions.EMPTY_ACTION, onDispose);
}

/**
* Returns a {@code Completable} instance that calls the various callbacks upon the specific
* lifecycle events.
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -3388,6 +3388,34 @@ public final Maybe<T> doOnEvent(@NonNull BiConsumer<@Nullable ? super T, @Nullab
return RxJavaPlugins.onAssembly(new MaybeDoOnEvent<>(this, onEvent));
}

/**
* Calls the appropriate {@code onXXX} method (shared between all {@link MaybeObserver}s) for the lifecycle events of
* the sequence (subscription, disposal).
* <p>
* <img width="640" height="183" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.doOnLifecycle.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnLifecycle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onSubscribe
* a {@link Consumer} called with the {@link Disposable} sent via {@link MaybeObserver#onSubscribe(Disposable)}
* @param onDispose
* called when the downstream disposes the {@code Disposable} via {@code dispose()}
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code onSubscribe} or {@code onDispose} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Maybe<T> doOnLifecycle(@NonNull Consumer<? super Disposable> onSubscribe, @NonNull Action onDispose) {
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
Objects.requireNonNull(onDispose, "onDispose is null");
return RxJavaPlugins.onAssembly(new MaybeDoOnLifecycle<>(this, onSubscribe, onDispose));
}

/**
* Calls the shared {@link Consumer} with the {@link Disposable} sent through the {@code onSubscribe} for each
* {@link MaybeObserver} that subscribes to the current {@code Maybe}.
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2684,6 +2684,34 @@ public final Single<T> doFinally(@NonNull Action onFinally) {
return RxJavaPlugins.onAssembly(new SingleDoFinally<>(this, onFinally));
}

/**
* Calls the appropriate {@code onXXX} method (shared between all {@link SingleObserver}s) for the lifecycle events of
* the sequence (subscription, disposal).
* <p>
* <img width="640" height="232" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.doOnLifecycle.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnLifecycle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onSubscribe
* a {@link Consumer} called with the {@link Disposable} sent via {@link SingleObserver#onSubscribe(Disposable)}
* @param onDispose
* called when the downstream disposes the {@code Disposable} via {@code dispose()}
* @return the new {@code Single} instance
* @throws NullPointerException if {@code onSubscribe} or {@code onDispose} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Single<T> doOnLifecycle(@NonNull Consumer<? super Disposable> onSubscribe, @NonNull Action onDispose) {
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
Objects.requireNonNull(onDispose, "onDispose is null");
return RxJavaPlugins.onAssembly(new SingleDoOnLifecycle<>(this, onSubscribe, onDispose));
}

/**
* Calls the shared consumer with the {@link Disposable} sent through the {@code onSubscribe} for each
* {@link SingleObserver} that subscribes to the current {@code Single}.
Expand Down Expand Up @@ -3455,6 +3483,7 @@ public final Flowable<T> mergeWith(@NonNull SingleSource<? extends T> other) {
* @return the new {@link Maybe} instance
* @throws NullPointerException if {@code clazz} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/filter.html">ReactiveX operators documentation: Filter</a>
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.maybe;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* Invokes callbacks upon {@code onSubscribe} from upstream and
* {@code dispose} from downstream.
*
* @param <T> the element type of the flow
* @since 3.0.0
*/
public final class MaybeDoOnLifecycle<T> extends AbstractMaybeWithUpstream<T, T> {

final Consumer<? super Disposable> onSubscribe;

final Action onDispose;

public MaybeDoOnLifecycle(Maybe<T> upstream, Consumer<? super Disposable> onSubscribe,
Action onDispose) {
super(upstream);
this.onSubscribe = onSubscribe;
this.onDispose = onDispose;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
source.subscribe(new MaybeLifecycleObserver<>(observer, onSubscribe, onDispose));
}

static final class MaybeLifecycleObserver<T> implements MaybeObserver<T>, Disposable {

final MaybeObserver<? super T> downstream;

final Consumer<? super Disposable> onSubscribe;

final Action onDispose;

Disposable upstream;

MaybeLifecycleObserver(MaybeObserver<? super T> downstream, Consumer<? super Disposable> onSubscribe, Action onDispose) {
this.downstream = downstream;
this.onSubscribe = onSubscribe;
this.onDispose = onDispose;
}

@Override
public void onSubscribe(@NonNull Disposable d) {
// this way, multiple calls to onSubscribe can show up in tests that use doOnSubscribe to validate behavior
try {
onSubscribe.accept(d);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
d.dispose();
this.upstream = DisposableHelper.DISPOSED;
EmptyDisposable.error(e, downstream);
return;
}
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}

@Override
public void onSuccess(@NonNull T t) {
if (upstream != DisposableHelper.DISPOSED) {
upstream = DisposableHelper.DISPOSED;
downstream.onSuccess(t);
}
}

@Override
public void onError(@NonNull Throwable e) {
if (upstream != DisposableHelper.DISPOSED) {
upstream = DisposableHelper.DISPOSED;
downstream.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}

@Override
public void onComplete() {
if (upstream != DisposableHelper.DISPOSED) {
upstream = DisposableHelper.DISPOSED;
downstream.onComplete();
}
}

@Override
public void dispose() {
try {
onDispose.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
}

@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.single;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* Invokes callbacks upon {@code onSubscribe} from upstream and
* {@code dispose} from downstream.
*
* @param <T> the element type of the flow
* @since 3.0.0
*/
public final class SingleDoOnLifecycle<T> extends Single<T> {

final Single<T> source;

final Consumer<? super Disposable> onSubscribe;

final Action onDispose;

public SingleDoOnLifecycle(Single<T> upstream, Consumer<? super Disposable> onSubscribe,
Action onDispose) {
this.source = upstream;
this.onSubscribe = onSubscribe;
this.onDispose = onDispose;
}

@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
source.subscribe(new SingleLifecycleObserver<>(observer, onSubscribe, onDispose));
}

static final class SingleLifecycleObserver<T> implements SingleObserver<T>, Disposable {

final SingleObserver<? super T> downstream;

final Consumer<? super Disposable> onSubscribe;

final Action onDispose;

Disposable upstream;

SingleLifecycleObserver(SingleObserver<? super T> downstream, Consumer<? super Disposable> onSubscribe, Action onDispose) {
this.downstream = downstream;
this.onSubscribe = onSubscribe;
this.onDispose = onDispose;
}

@Override
public void onSubscribe(@NonNull Disposable d) {
// this way, multiple calls to onSubscribe can show up in tests that use doOnSubscribe to validate behavior
try {
onSubscribe.accept(d);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
d.dispose();
this.upstream = DisposableHelper.DISPOSED;
EmptyDisposable.error(e, downstream);
return;
}
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}

@Override
public void onSuccess(@NonNull T t) {
if (upstream != DisposableHelper.DISPOSED) {
upstream = DisposableHelper.DISPOSED;
downstream.onSuccess(t);
}
}

@Override
public void onError(@NonNull Throwable e) {
if (upstream != DisposableHelper.DISPOSED) {
upstream = DisposableHelper.DISPOSED;
downstream.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}

@Override
public void dispose() {
try {
onDispose.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
}

@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
}
}
Loading