diff --git a/docs/Operator-Matrix.md b/docs/Operator-Matrix.md index 10427533cc..a44eed94ec 100644 --- a/docs/Operator-Matrix.md +++ b/docs/Operator-Matrix.md @@ -71,7 +71,7 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat `doOnEach`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([45](#notes-45))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([45](#notes-45))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([37](#notes-37))| `doOnError`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `doOnEvent`|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([46](#notes-46))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([46](#notes-46))|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| -`doOnLifecycle`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)| +`doOnLifecycle`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `doOnNext`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([47](#notes-47))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([47](#notes-47))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([37](#notes-37))| `doOnRequest`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))| `doOnSubscribe`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| @@ -144,7 +144,7 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat `mergeWith`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `never`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `observeOn`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| -`ofType`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([82](#notes-82))| +`ofType`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([82](#notes-82))| `onBackpressureBuffer`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))| `onBackpressureDrop`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))| `onBackpressureLatest`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([48](#notes-48))| @@ -219,7 +219,7 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat `to`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `toCompletionStage`|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([98](#notes-98))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([98](#notes-98))|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `toFlowable`|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([99](#notes-99))|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| -`toFuture`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)| +`toFuture`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `toList`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([13](#notes-13))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([14](#notes-14))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([15](#notes-15))| `toMap`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([13](#notes-13))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([14](#notes-14))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([15](#notes-15))| `toMaybe`|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([100](#notes-100))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([100](#notes-100))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([99](#notes-99))|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| @@ -237,7 +237,7 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat `zip`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([108](#notes-108))| `zipArray`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([109](#notes-109))| `zipWith`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([110](#notes-110))| -**237 operators** | **215** | **209** | **111** | **95** | **76** | +**237 operators** | **215** | **209** | **113** | **97** | **78** | #### Notes 1 Use [`contains()`](#contains).
@@ -372,19 +372,13 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat 17. Single.concatMapMaybe() 18. Maybe.concatMapSingle() 19. Single.concatMapSingle() -20. Maybe.doOnLifecycle() -21. Single.doOnLifecycle() -22. Completable.doOnLifecycle() -23. Single.mergeArray() -24. Single.mergeArrayDelayError() -25. Single.ofType() -26. Completable.onErrorReturn() -27. Completable.onErrorReturnItem() -28. Maybe.safeSubscribe() -29. Single.safeSubscribe() -30. Completable.safeSubscribe() -31. Completable.sequenceEqual() -32. Maybe.startWith() -33. Single.startWith() -34. Maybe.toFuture() -35. Completable.toFuture() +20. Single.mergeArray() +21. Single.mergeArrayDelayError() +22. Completable.onErrorReturn() +23. Completable.onErrorReturnItem() +24. Maybe.safeSubscribe() +25. Single.safeSubscribe() +26. Completable.safeSubscribe() +27. Completable.sequenceEqual() +28. Maybe.startWith() +29. Single.startWith() diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java index 1c6214e1da..9febbd1204 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Completable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java @@ -18,6 +18,7 @@ import org.reactivestreams.*; import io.reactivex.rxjava3.annotations.*; +import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.functions.*; @@ -1751,6 +1752,34 @@ public final Completable doOnEvent(@NonNull Consumer<@Nullable ? super Throwable return RxJavaPlugins.onAssembly(new CompletableDoOnEvent(this, onEvent)); } + /** + * Calls the appropriate {@code onXXX} method (shared between all {@link CompletableObserver}s) for the lifecycle events of + * the sequence (subscription, disposal). + *

+ * + *

+ *
Scheduler:
+ *
{@code doOnLifecycle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onSubscribe + * a {@link Consumer} called with the {@link Disposable} sent via {@link CompletableObserver#onSubscribe(Disposable)} + * @param onDispose + * called when the downstream disposes the {@code Disposable} via {@code dispose()} + * @return the new {@code Completable} instance + * @throws NullPointerException if {@code onSubscribe} or {@code onDispose} is {@code null} + * @see ReactiveX operators documentation: Do + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Completable doOnLifecycle(@NonNull Consumer onSubscribe, @NonNull Action onDispose) { + return doOnLifecycle(onSubscribe, Functions.emptyConsumer(), + Functions.EMPTY_ACTION, Functions.EMPTY_ACTION, + Functions.EMPTY_ACTION, onDispose); + } + /** * Returns a {@code Completable} instance that calls the various callbacks upon the specific * lifecycle events. diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index b2a6465eae..b3ed8213d4 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -3388,6 +3388,34 @@ public final Maybe doOnEvent(@NonNull BiConsumer<@Nullable ? super T, @Nullab return RxJavaPlugins.onAssembly(new MaybeDoOnEvent<>(this, onEvent)); } + /** + * Calls the appropriate {@code onXXX} method (shared between all {@link MaybeObserver}s) for the lifecycle events of + * the sequence (subscription, disposal). + *

+ * + *

+ *
Scheduler:
+ *
{@code doOnLifecycle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onSubscribe + * a {@link Consumer} called with the {@link Disposable} sent via {@link MaybeObserver#onSubscribe(Disposable)} + * @param onDispose + * called when the downstream disposes the {@code Disposable} via {@code dispose()} + * @return the new {@code Maybe} instance + * @throws NullPointerException if {@code onSubscribe} or {@code onDispose} is {@code null} + * @see ReactiveX operators documentation: Do + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Maybe doOnLifecycle(@NonNull Consumer onSubscribe, @NonNull Action onDispose) { + Objects.requireNonNull(onSubscribe, "onSubscribe is null"); + Objects.requireNonNull(onDispose, "onDispose is null"); + return RxJavaPlugins.onAssembly(new MaybeDoOnLifecycle<>(this, onSubscribe, onDispose)); + } + /** * Calls the shared {@link Consumer} with the {@link Disposable} sent through the {@code onSubscribe} for each * {@link MaybeObserver} that subscribes to the current {@code Maybe}. diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index 12400b1fd0..81b0aaaffe 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -2684,6 +2684,34 @@ public final Single doFinally(@NonNull Action onFinally) { return RxJavaPlugins.onAssembly(new SingleDoFinally<>(this, onFinally)); } + /** + * Calls the appropriate {@code onXXX} method (shared between all {@link SingleObserver}s) for the lifecycle events of + * the sequence (subscription, disposal). + *

+ * + *

+ *
Scheduler:
+ *
{@code doOnLifecycle} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param onSubscribe + * a {@link Consumer} called with the {@link Disposable} sent via {@link SingleObserver#onSubscribe(Disposable)} + * @param onDispose + * called when the downstream disposes the {@code Disposable} via {@code dispose()} + * @return the new {@code Single} instance + * @throws NullPointerException if {@code onSubscribe} or {@code onDispose} is {@code null} + * @see ReactiveX operators documentation: Do + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Single doOnLifecycle(@NonNull Consumer onSubscribe, @NonNull Action onDispose) { + Objects.requireNonNull(onSubscribe, "onSubscribe is null"); + Objects.requireNonNull(onDispose, "onDispose is null"); + return RxJavaPlugins.onAssembly(new SingleDoOnLifecycle<>(this, onSubscribe, onDispose)); + } + /** * Calls the shared consumer with the {@link Disposable} sent through the {@code onSubscribe} for each * {@link SingleObserver} that subscribes to the current {@code Single}. @@ -3455,6 +3483,7 @@ public final Flowable mergeWith(@NonNull SingleSource other) { * @return the new {@link Maybe} instance * @throws NullPointerException if {@code clazz} is {@code null} * @see ReactiveX operators documentation: Filter + * @since 3.0.0 */ @CheckReturnValue @NonNull diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDoOnLifecycle.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDoOnLifecycle.java new file mode 100644 index 0000000000..3355c79014 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDoOnLifecycle.java @@ -0,0 +1,125 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.operators.maybe; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.disposables.*; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Invokes callbacks upon {@code onSubscribe} from upstream and + * {@code dispose} from downstream. + * + * @param the element type of the flow + * @since 3.0.0 + */ +public final class MaybeDoOnLifecycle extends AbstractMaybeWithUpstream { + + final Consumer onSubscribe; + + final Action onDispose; + + public MaybeDoOnLifecycle(Maybe upstream, Consumer onSubscribe, + Action onDispose) { + super(upstream); + this.onSubscribe = onSubscribe; + this.onDispose = onDispose; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + source.subscribe(new MaybeLifecycleObserver<>(observer, onSubscribe, onDispose)); + } + + static final class MaybeLifecycleObserver implements MaybeObserver, Disposable { + + final MaybeObserver downstream; + + final Consumer onSubscribe; + + final Action onDispose; + + Disposable upstream; + + MaybeLifecycleObserver(MaybeObserver downstream, Consumer onSubscribe, Action onDispose) { + this.downstream = downstream; + this.onSubscribe = onSubscribe; + this.onDispose = onDispose; + } + + @Override + public void onSubscribe(@NonNull Disposable d) { + // this way, multiple calls to onSubscribe can show up in tests that use doOnSubscribe to validate behavior + try { + onSubscribe.accept(d); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + d.dispose(); + this.upstream = DisposableHelper.DISPOSED; + EmptyDisposable.error(e, downstream); + return; + } + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + downstream.onSubscribe(this); + } + } + + @Override + public void onSuccess(@NonNull T t) { + if (upstream != DisposableHelper.DISPOSED) { + upstream = DisposableHelper.DISPOSED; + downstream.onSuccess(t); + } + } + + @Override + public void onError(@NonNull Throwable e) { + if (upstream != DisposableHelper.DISPOSED) { + upstream = DisposableHelper.DISPOSED; + downstream.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void onComplete() { + if (upstream != DisposableHelper.DISPOSED) { + upstream = DisposableHelper.DISPOSED; + downstream.onComplete(); + } + } + + @Override + public void dispose() { + try { + onDispose.run(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + RxJavaPlugins.onError(e); + } + upstream.dispose(); + upstream = DisposableHelper.DISPOSED; + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleDoOnLifecycle.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleDoOnLifecycle.java new file mode 100644 index 0000000000..288e8970cd --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleDoOnLifecycle.java @@ -0,0 +1,119 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.operators.single; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.disposables.*; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Invokes callbacks upon {@code onSubscribe} from upstream and + * {@code dispose} from downstream. + * + * @param the element type of the flow + * @since 3.0.0 + */ +public final class SingleDoOnLifecycle extends Single { + + final Single source; + + final Consumer onSubscribe; + + final Action onDispose; + + public SingleDoOnLifecycle(Single upstream, Consumer onSubscribe, + Action onDispose) { + this.source = upstream; + this.onSubscribe = onSubscribe; + this.onDispose = onDispose; + } + + @Override + protected void subscribeActual(SingleObserver observer) { + source.subscribe(new SingleLifecycleObserver<>(observer, onSubscribe, onDispose)); + } + + static final class SingleLifecycleObserver implements SingleObserver, Disposable { + + final SingleObserver downstream; + + final Consumer onSubscribe; + + final Action onDispose; + + Disposable upstream; + + SingleLifecycleObserver(SingleObserver downstream, Consumer onSubscribe, Action onDispose) { + this.downstream = downstream; + this.onSubscribe = onSubscribe; + this.onDispose = onDispose; + } + + @Override + public void onSubscribe(@NonNull Disposable d) { + // this way, multiple calls to onSubscribe can show up in tests that use doOnSubscribe to validate behavior + try { + onSubscribe.accept(d); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + d.dispose(); + this.upstream = DisposableHelper.DISPOSED; + EmptyDisposable.error(e, downstream); + return; + } + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + downstream.onSubscribe(this); + } + } + + @Override + public void onSuccess(@NonNull T t) { + if (upstream != DisposableHelper.DISPOSED) { + upstream = DisposableHelper.DISPOSED; + downstream.onSuccess(t); + } + } + + @Override + public void onError(@NonNull Throwable e) { + if (upstream != DisposableHelper.DISPOSED) { + upstream = DisposableHelper.DISPOSED; + downstream.onError(e); + } else { + RxJavaPlugins.onError(e); + } + } + + @Override + public void dispose() { + try { + onDispose.run(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + RxJavaPlugins.onError(e); + } + upstream.dispose(); + upstream = DisposableHelper.DISPOSED; + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableDoOnLifecycleTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableDoOnLifecycleTest.java new file mode 100644 index 0000000000..3e85e1e027 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableDoOnLifecycleTest.java @@ -0,0 +1,153 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.completable; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.subjects.CompletableSubject; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class CompletableDoOnLifecycleTest extends RxJavaTest { + + @Test + public void empty() throws Throwable { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + Completable.complete() + .doOnLifecycle(onSubscribe, onDispose) + .test() + .assertResult(); + + verify(onSubscribe).accept(any()); + verify(onDispose, never()).run(); + } + + @Test + public void error() throws Throwable { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + Completable.error(new TestException()) + .doOnLifecycle(onSubscribe, onDispose) + .test() + .assertFailure(TestException.class); + + verify(onSubscribe).accept(any()); + verify(onDispose, never()).run(); + } + + @Test + public void onSubscribeCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + doThrow(new TestException("First")).when(onSubscribe).accept(any()); + + Disposable bs = Disposable.empty(); + + new Completable() { + @Override + protected void subscribeActual(CompletableObserver observer) { + observer.onSubscribe(bs); + observer.onError(new TestException("Second")); + observer.onComplete(); + } + } + .doOnLifecycle(onSubscribe, onDispose) + .to(TestHelper.testConsumer()) + .assertFailureAndMessage(TestException.class, "First"); + + assertTrue(bs.isDisposed()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second"); + + verify(onSubscribe).accept(any()); + verify(onDispose, never()).run(); + }); + } + + @Test + public void onDisposeCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + doThrow(new TestException("First")).when(onDispose).run(); + + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = cs + .doOnLifecycle(onSubscribe, onDispose) + .test(); + + assertTrue(cs.hasObservers()); + + to.dispose(); + + assertFalse(cs.hasObservers()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "First"); + + verify(onSubscribe).accept(any()); + verify(onDispose).run(); + }); + } + + @Test + public void dispose() throws Throwable { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = cs + .doOnLifecycle(onSubscribe, onDispose) + .test(); + + assertTrue(cs.hasObservers()); + + to.dispose(); + + assertFalse(cs.hasObservers()); + + verify(onSubscribe).accept(any()); + verify(onDispose).run(); + } + + @Test + public void isDisposed() { + TestHelper.checkDisposed(CompletableSubject.create().doOnLifecycle(d -> { }, () -> { })); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeCompletable(m -> m.doOnLifecycle(d -> { }, () -> { })); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDoOnLifecycleTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDoOnLifecycleTest.java new file mode 100644 index 0000000000..b8921b7eb9 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDoOnLifecycleTest.java @@ -0,0 +1,168 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.maybe; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.subjects.MaybeSubject; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class MaybeDoOnLifecycleTest extends RxJavaTest { + + @Test + public void success() throws Throwable { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + Maybe.just(1) + .doOnLifecycle(onSubscribe, onDispose) + .test() + .assertResult(1); + + verify(onSubscribe).accept(any()); + verify(onDispose, never()).run(); + } + + @Test + public void empty() throws Throwable { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + Maybe.empty() + .doOnLifecycle(onSubscribe, onDispose) + .test() + .assertResult(); + + verify(onSubscribe).accept(any()); + verify(onDispose, never()).run(); + } + + @Test + public void error() throws Throwable { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + Maybe.error(new TestException()) + .doOnLifecycle(onSubscribe, onDispose) + .test() + .assertFailure(TestException.class); + + verify(onSubscribe).accept(any()); + verify(onDispose, never()).run(); + } + + @Test + public void onSubscribeCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + doThrow(new TestException("First")).when(onSubscribe).accept(any()); + + Disposable bs = Disposable.empty(); + + new Maybe() { + @Override + protected void subscribeActual(MaybeObserver observer) { + observer.onSubscribe(bs); + observer.onError(new TestException("Second")); + observer.onComplete(); + observer.onSuccess(1); + } + } + .doOnLifecycle(onSubscribe, onDispose) + .to(TestHelper.testConsumer()) + .assertFailureAndMessage(TestException.class, "First"); + + assertTrue(bs.isDisposed()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second"); + + verify(onSubscribe).accept(any()); + verify(onDispose, never()).run(); + }); + } + + @Test + public void onDisposeCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + doThrow(new TestException("First")).when(onDispose).run(); + + MaybeSubject ms = MaybeSubject.create(); + + TestObserver to = ms + .doOnLifecycle(onSubscribe, onDispose) + .test(); + + assertTrue(ms.hasObservers()); + + to.dispose(); + + assertFalse(ms.hasObservers()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "First"); + + verify(onSubscribe).accept(any()); + verify(onDispose).run(); + }); + } + + @Test + public void dispose() throws Throwable { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + MaybeSubject ms = MaybeSubject.create(); + + TestObserver to = ms + .doOnLifecycle(onSubscribe, onDispose) + .test(); + + assertTrue(ms.hasObservers()); + + to.dispose(); + + assertFalse(ms.hasObservers()); + + verify(onSubscribe).accept(any()); + verify(onDispose).run(); + } + + @Test + public void isDisposed() { + TestHelper.checkDisposed(MaybeSubject.create().doOnLifecycle(d -> { }, () -> { })); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(m -> m.doOnLifecycle(d -> { }, () -> { })); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleDoOnLifecycleTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleDoOnLifecycleTest.java new file mode 100644 index 0000000000..9208ab2be8 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleDoOnLifecycleTest.java @@ -0,0 +1,152 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.subjects.SingleSubject; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class SingleDoOnLifecycleTest extends RxJavaTest { + + @Test + public void success() throws Throwable { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + Single.just(1) + .doOnLifecycle(onSubscribe, onDispose) + .test() + .assertResult(1); + + verify(onSubscribe).accept(any()); + verify(onDispose, never()).run(); + } + + @Test + public void error() throws Throwable { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + Single.error(new TestException()) + .doOnLifecycle(onSubscribe, onDispose) + .test() + .assertFailure(TestException.class); + + verify(onSubscribe).accept(any()); + verify(onDispose, never()).run(); + } + + @Test + public void onSubscribeCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + doThrow(new TestException("First")).when(onSubscribe).accept(any()); + + Disposable bs = Disposable.empty(); + + new Single() { + @Override + protected void subscribeActual(SingleObserver observer) { + observer.onSubscribe(bs); + observer.onError(new TestException("Second")); + observer.onSuccess(1); + } + } + .doOnLifecycle(onSubscribe, onDispose) + .to(TestHelper.testConsumer()) + .assertFailureAndMessage(TestException.class, "First"); + + assertTrue(bs.isDisposed()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second"); + + verify(onSubscribe).accept(any()); + verify(onDispose, never()).run(); + }); + } + + @Test + public void onDisposeCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + doThrow(new TestException("First")).when(onDispose).run(); + + SingleSubject ss = SingleSubject.create(); + + TestObserver to = ss + .doOnLifecycle(onSubscribe, onDispose) + .test(); + + assertTrue(ss.hasObservers()); + + to.dispose(); + + assertFalse(ss.hasObservers()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "First"); + + verify(onSubscribe).accept(any()); + verify(onDispose).run(); + }); + } + + @Test + public void dispose() throws Throwable { + @SuppressWarnings("unchecked") + Consumer onSubscribe = mock(Consumer.class); + Action onDispose = mock(Action.class); + + SingleSubject ss = SingleSubject.create(); + + TestObserver to = ss + .doOnLifecycle(onSubscribe, onDispose) + .test(); + + assertTrue(ss.hasObservers()); + + to.dispose(); + + assertFalse(ss.hasObservers()); + + verify(onSubscribe).accept(any()); + verify(onDispose).run(); + } + + @Test + public void isDisposed() { + TestHelper.checkDisposed(SingleSubject.create().doOnLifecycle(d -> { }, () -> { })); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(m -> m.doOnLifecycle(d -> { }, () -> { })); + } +}