diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java index 81028f726e..779d04d1d2 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Completable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java @@ -1016,6 +1016,71 @@ private static NullPointerException toNpe(Throwable ex) { return npe; } + /** + * Switches between {@link CompletableSource}s emitted by the source {@link Publisher} whenever + * a new {@code CompletableSource} is emitted, disposing the previously running {@code CompletableSource}, + * exposing the setup as a {@code Completable} sequence. + *

+ * + *

+ *
Backpressure:
+ *
The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).
+ *
Scheduler:
+ *
{@code switchOnNext} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
The returned sequence fails with the first error signaled by the {@code sources} {@code Publisher} + * or the currently running {@code CompletableSource}, disposing the rest. Late errors are + * forwarded to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ *
+ * @param sources the {@code Publisher} sequence of inner {@code CompletableSource}s to switch between + * @return the new {@code Completable} instance + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + * @see #switchOnNextDelayError(Publisher) + * @see ReactiveX operators documentation: Switch + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + public static Completable switchOnNext(@NonNull Publisher<@NonNull ? extends CompletableSource> sources) { + Objects.requireNonNull(sources, "sources is null"); + return RxJavaPlugins.onAssembly(new FlowableSwitchMapCompletablePublisher<>(sources, Functions.identity(), false)); + } + + /** + * Switches between {@link CompletableSource}s emitted by the source {@link Publisher} whenever + * a new {@code CompletableSource} is emitted, disposing the previously running {@code CompletableSource}, + * exposing the setup as a {@code Completable} sequence and delaying all errors from + * all of them until all terminate. + *

+ * + *

+ *
Backpressure:
+ *
The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).
+ *
Scheduler:
+ *
{@code switchOnNextDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
The returned {@code Completable} collects all errors emitted by either the {@code sources} + * {@code Publisher} or any inner {@code CompletableSource} and emits them as a {@link CompositeException} + * when all sources terminate. If only one source ever failed, its error is emitted as-is at the end.
+ *
+ * @param sources the {@code Publisher} sequence of inner {@code CompletableSource}s to switch between + * @return the new {@code Completable} instance + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + * @see #switchOnNext(Publisher) + * @see ReactiveX operators documentation: Switch + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + public static Completable switchOnNextDelayError(@NonNull Publisher<@NonNull ? extends CompletableSource> sources) { + Objects.requireNonNull(sources, "sources is null"); + return RxJavaPlugins.onAssembly(new FlowableSwitchMapCompletablePublisher<>(sources, Functions.identity(), true)); + } + /** * Returns a {@code Completable} instance which manages a resource along * with a custom {@link CompletableSource} instance while the subscription is active. @@ -2328,6 +2393,7 @@ public final Completable retry(@NonNull Predicate predicate) * @param stop the function that should return {@code true} to stop retrying * @return the new {@code Completable} instance * @throws NullPointerException if {@code stop} is {@code null} + * @since 3.0.0 */ @CheckReturnValue @NonNull diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index 98065a40d5..048b5ec713 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -1728,6 +1728,75 @@ public static Single sequenceEqual(@NonNull MaybeSource(source1, source2, isEqual)); } + /** + * Switches between {@link MaybeSource}s emitted by the source {@link Publisher} whenever + * a new {@code MaybeSource} is emitted, disposing the previously running {@code MaybeSource}, + * exposing the success items as a {@link Flowable} sequence. + *

+ * + *

+ *
Backpressure:
+ *
The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}). + * The returned {@code Flowable} respects the backpressure from the downstream.
+ *
Scheduler:
+ *
{@code switchOnNext} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
The returned sequence fails with the first error signaled by the {@code sources} {@code Publisher} + * or the currently running {@code MaybeSource}, disposing the rest. Late errors are + * forwarded to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ *
+ * @param the element type of the {@code MaybeSource}s + * @param sources the {@code Publisher} sequence of inner {@code MaybeSource}s to switch between + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + * @see #switchOnNextDelayError(Publisher) + * @see ReactiveX operators documentation: Switch + */ + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Flowable switchOnNext(@NonNull Publisher<@NonNull ? extends MaybeSource> sources) { + Objects.requireNonNull(sources, "sources is null"); + return RxJavaPlugins.onAssembly(new FlowableSwitchMapMaybePublisher<>(sources, Functions.identity(), false)); + } + + /** + * Switches between {@link MaybeSource}s emitted by the source {@link Publisher} whenever + * a new {@code MaybeSource} is emitted, disposing the previously running {@code MaybeSource}, + * exposing the success items as a {@link Flowable} sequence and delaying all errors from + * all of them until all terminate. + *

+ * + *

+ *
Backpressure:
+ *
The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}). + * The returned {@code Flowable} respects the backpressure from the downstream.
+ *
Scheduler:
+ *
{@code switchOnNextDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
The returned {@code Flowable} collects all errors emitted by either the {@code sources} + * {@code Publisher} or any inner {@code MaybeSource} and emits them as a {@link CompositeException} + * when all sources terminate. If only one source ever failed, its error is emitted as-is at the end.
+ *
+ * @param the element type of the {@code MaybeSource}s + * @param sources the {@code Publisher} sequence of inner {@code MaybeSource}s to switch between + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + * @see #switchOnNext(Publisher) + * @see ReactiveX operators documentation: Switch + */ + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Flowable switchOnNextDelayError(@NonNull Publisher<@NonNull ? extends MaybeSource> sources) { + Objects.requireNonNull(sources, "sources is null"); + return RxJavaPlugins.onAssembly(new FlowableSwitchMapMaybePublisher<>(sources, Functions.identity(), true)); + } + /** * Returns a {@code Maybe} that emits {@code 0L} after a specified delay. *

@@ -2868,6 +2937,7 @@ public final Maybe delay(long time, @NonNull TimeUnit unit) { * @throws NullPointerException if {@code unit} is {@code null} * @see ReactiveX operators documentation: Delay * @see #delay(long, TimeUnit, Scheduler, boolean) + * @since 3.0.0 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.COMPUTATION) @@ -2922,6 +2992,7 @@ public final Maybe delay(long time, @NonNull TimeUnit unit, @NonNull Schedule * @return the new {@code Maybe} instance * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} * @see ReactiveX operators documentation: Delay + * @since 3.0.0 */ @CheckReturnValue @NonNull diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index e17ff9e8ec..719719520f 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -30,7 +30,7 @@ import io.reactivex.rxjava3.internal.operators.completable.*; import io.reactivex.rxjava3.internal.operators.flowable.*; import io.reactivex.rxjava3.internal.operators.maybe.*; -import io.reactivex.rxjava3.internal.operators.mixed.SingleFlatMapObservable; +import io.reactivex.rxjava3.internal.operators.mixed.*; import io.reactivex.rxjava3.internal.operators.observable.*; import io.reactivex.rxjava3.internal.operators.single.*; import io.reactivex.rxjava3.internal.util.ErrorMode; @@ -1406,6 +1406,75 @@ public static Single sequenceEqual(@NonNull SingleSource(source1, source2)); } + /** + * Switches between {@link SingleSource}s emitted by the source {@link Publisher} whenever + * a new {@code SingleSource} is emitted, disposing the previously running {@code SingleSource}, + * exposing the success items as a {@link Flowable} sequence. + *

+ * + *

+ *
Backpressure:
+ *
The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}). + * The returned {@code Flowable} respects the backpressure from the downstream.
+ *
Scheduler:
+ *
{@code switchOnNext} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
The returned sequence fails with the first error signaled by the {@code sources} {@code Publisher} + * or the currently running {@code SingleSource}, disposing the rest. Late errors are + * forwarded to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ *
+ * @param the element type of the {@code SingleSource}s + * @param sources the {@code Publisher} sequence of inner {@code SingleSource}s to switch between + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + * @see #switchOnNextDelayError(Publisher) + * @see ReactiveX operators documentation: Switch + */ + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Flowable switchOnNext(@NonNull Publisher<@NonNull ? extends SingleSource> sources) { + Objects.requireNonNull(sources, "sources is null"); + return RxJavaPlugins.onAssembly(new FlowableSwitchMapSinglePublisher<>(sources, Functions.identity(), false)); + } + + /** + * Switches between {@link SingleSource}s emitted by the source {@link Publisher} whenever + * a new {@code SingleSource} is emitted, disposing the previously running {@code SingleSource}, + * exposing the success items as a {@link Flowable} sequence and delaying all errors from + * all of them until all terminate. + *

+ * + *

+ *
Backpressure:
+ *
The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}). + * The returned {@code Flowable} respects the backpressure from the downstream.
+ *
Scheduler:
+ *
{@code switchOnNextDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
The returned {@code Flowable} collects all errors emitted by either the {@code sources} + * {@code Publisher} or any inner {@code SingleSource} and emits them as a {@link CompositeException} + * when all sources terminate. If only one source ever failed, its error is emitted as-is at the end.
+ *
+ * @param the element type of the {@code SingleSource}s + * @param sources the {@code Publisher} sequence of inner {@code SingleSource}s to switch between + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code sources} is {@code null} + * @since 3.0.0 + * @see #switchOnNext(Publisher) + * @see ReactiveX operators documentation: Switch + */ + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public static Flowable switchOnNextDelayError(@NonNull Publisher<@NonNull ? extends SingleSource> sources) { + Objects.requireNonNull(sources, "sources is null"); + return RxJavaPlugins.onAssembly(new FlowableSwitchMapSinglePublisher<>(sources, Functions.identity(), true)); + } + /** * Advanced use only: creates a {@code Single} instance without * any safeguards by using a callback that is called with a {@link SingleObserver}. @@ -3758,6 +3827,7 @@ public final Single retry(@NonNull Predicate predicate) { * @param stop the function that should return {@code true} to stop retrying * @return the new {@code Single} instance * @throws NullPointerException if {@code stop} is {@code null} + * @since 3.0.0 */ @CheckReturnValue @NonNull diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapCompletablePublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapCompletablePublisher.java new file mode 100644 index 0000000000..5752027c23 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapCompletablePublisher.java @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.mixed; + +import org.reactivestreams.Publisher; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.Function; + +/** + * Switch between subsequent {@link CompletableSource}s emitted by a {@link Publisher}. + * Reuses {@link FlowableSwitchMapCompletable} internals. + * @param the upstream value type + * @since 3.0.0 + */ +public final class FlowableSwitchMapCompletablePublisher extends Completable { + + final Publisher source; + + final Function mapper; + + final boolean delayErrors; + + public FlowableSwitchMapCompletablePublisher(Publisher source, + Function mapper, boolean delayErrors) { + this.source = source; + this.mapper = mapper; + this.delayErrors = delayErrors; + } + + @Override + protected void subscribeActual(CompletableObserver observer) { + source.subscribe(new FlowableSwitchMapCompletable.SwitchMapCompletableObserver<>(observer, mapper, delayErrors)); + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapMaybePublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapMaybePublisher.java new file mode 100644 index 0000000000..41696133e9 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapMaybePublisher.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.mixed; + +import org.reactivestreams.*; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.Function; + +/** + * Switch between subsequent {@link MaybeSource}s emitted by a {@link Publisher}. + * Reuses {@link FlowableSwitchMapMaybe} internals. + * @param the upstream value type + * @param the downstream value type + * @since 3.0.0 + */ +public final class FlowableSwitchMapMaybePublisher extends Flowable { + + final Publisher source; + + final Function> mapper; + + final boolean delayErrors; + + public FlowableSwitchMapMaybePublisher(Publisher source, + Function> mapper, + boolean delayErrors) { + this.source = source; + this.mapper = mapper; + this.delayErrors = delayErrors; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new FlowableSwitchMapMaybe.SwitchMapMaybeSubscriber<>(s, mapper, delayErrors)); + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapSinglePublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapSinglePublisher.java new file mode 100644 index 0000000000..e64f839046 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapSinglePublisher.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.mixed; + +import org.reactivestreams.*; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.Function; + +/** + * Switch between subsequent {@link SingleSource}s emitted by a {@link Publisher}. + * Reuses {@link FlowableSwitchMapSingle} internals. + * @param the upstream value type + * @param the downstream value type + * @since 3.0.0 + */ +public final class FlowableSwitchMapSinglePublisher extends Flowable { + + final Publisher source; + + final Function> mapper; + + final boolean delayErrors; + + public FlowableSwitchMapSinglePublisher(Publisher source, + Function> mapper, + boolean delayErrors) { + this.source = source; + this.mapper = mapper; + this.delayErrors = delayErrors; + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new FlowableSwitchMapSingle.SwitchMapSingleSubscriber<>(s, mapper, delayErrors)); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableSwitchOnNextTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableSwitchOnNextTest.java new file mode 100644 index 0000000000..ad6147fef5 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableSwitchOnNextTest.java @@ -0,0 +1,132 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.completable; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.subjects.CompletableSubject; + +public class CompletableSwitchOnNextTest extends RxJavaTest { + + @Test + public void normal() { + Runnable run = mock(Runnable.class); + + Completable.switchOnNext( + Flowable.range(1, 10) + .map(v -> { + if (v % 2 == 0) { + return Completable.fromRunnable(run); + } + return Completable.complete(); + }) + ) + .test() + .assertResult(); + + verify(run, times(5)).run(); + } + + @Test + public void normalDelayError() { + Runnable run = mock(Runnable.class); + + Completable.switchOnNextDelayError( + Flowable.range(1, 10) + .map(v -> { + if (v % 2 == 0) { + return Completable.fromRunnable(run); + } + return Completable.complete(); + }) + ) + .test() + .assertResult(); + + verify(run, times(5)).run(); + } + + @Test + public void noDelaySwitch() { + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = Completable.switchOnNext(pp).test(); + + assertTrue(pp.hasSubscribers()); + + to.assertEmpty(); + + CompletableSubject cs1 = CompletableSubject.create(); + CompletableSubject cs2 = CompletableSubject.create(); + + pp.onNext(cs1); + + assertTrue(cs1.hasObservers()); + + pp.onNext(cs2); + + assertFalse(cs1.hasObservers()); + assertTrue(cs2.hasObservers()); + + pp.onComplete(); + + assertTrue(cs2.hasObservers()); + + cs2.onComplete(); + + to.assertResult(); + } + + @Test + public void delaySwitch() { + PublishProcessor pp = PublishProcessor.create(); + + TestObserver to = Completable.switchOnNextDelayError(pp).test(); + + assertTrue(pp.hasSubscribers()); + + to.assertEmpty(); + + CompletableSubject cs1 = CompletableSubject.create(); + CompletableSubject cs2 = CompletableSubject.create(); + + pp.onNext(cs1); + + assertTrue(cs1.hasObservers()); + + pp.onNext(cs2); + + assertFalse(cs1.hasObservers()); + assertTrue(cs2.hasObservers()); + + assertTrue(cs2.hasObservers()); + + cs2.onError(new TestException()); + + assertTrue(pp.hasSubscribers()); + + to.assertEmpty(); + + pp.onComplete(); + + to.assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeSwitchOnNextTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeSwitchOnNextTest.java new file mode 100644 index 0000000000..6b217aa680 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeSwitchOnNextTest.java @@ -0,0 +1,123 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.maybe; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.subjects.MaybeSubject; +import io.reactivex.rxjava3.subscribers.TestSubscriber; + +public class MaybeSwitchOnNextTest extends RxJavaTest { + + @Test + public void normal() { + Maybe.switchOnNext( + Flowable.range(1, 10) + .map(v -> { + if (v % 2 == 0) { + return Maybe.just(v); + } + return Maybe.empty(); + }) + ) + .test() + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void normalDelayError() { + Maybe.switchOnNextDelayError( + Flowable.range(1, 10) + .map(v -> { + if (v % 2 == 0) { + return Maybe.just(v); + } + return Maybe.empty(); + }) + ) + .test() + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void noDelaySwitch() { + PublishProcessor> pp = PublishProcessor.create(); + + TestSubscriber ts = Maybe.switchOnNext(pp).test(); + + assertTrue(pp.hasSubscribers()); + + ts.assertEmpty(); + + MaybeSubject ms1 = MaybeSubject.create(); + MaybeSubject ms2 = MaybeSubject.create(); + + pp.onNext(ms1); + + assertTrue(ms1.hasObservers()); + + pp.onNext(ms2); + + assertFalse(ms1.hasObservers()); + assertTrue(ms2.hasObservers()); + + pp.onComplete(); + + assertTrue(ms2.hasObservers()); + + ms2.onSuccess(1); + + ts.assertResult(1); + } + + @Test + public void delaySwitch() { + PublishProcessor> pp = PublishProcessor.create(); + + TestSubscriber ts = Maybe.switchOnNextDelayError(pp).test(); + + assertTrue(pp.hasSubscribers()); + + ts.assertEmpty(); + + MaybeSubject ms1 = MaybeSubject.create(); + MaybeSubject ms2 = MaybeSubject.create(); + + pp.onNext(ms1); + + assertTrue(ms1.hasObservers()); + + pp.onNext(ms2); + + assertFalse(ms1.hasObservers()); + assertTrue(ms2.hasObservers()); + + assertTrue(ms2.hasObservers()); + + ms2.onError(new TestException()); + + assertTrue(pp.hasSubscribers()); + + ts.assertEmpty(); + + pp.onComplete(); + + ts.assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleSwitchOnNextTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleSwitchOnNextTest.java new file mode 100644 index 0000000000..63c97f14ec --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleSwitchOnNextTest.java @@ -0,0 +1,123 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.subjects.SingleSubject; +import io.reactivex.rxjava3.subscribers.TestSubscriber; + +public class SingleSwitchOnNextTest extends RxJavaTest { + + @Test + public void normal() { + Single.switchOnNext( + Flowable.range(1, 5) + .map(v -> { + if (v % 2 == 0) { + return Single.just(v); + } + return Single.just(10 + v); + }) + ) + .test() + .assertResult(11, 2, 13, 4, 15); + } + + @Test + public void normalDelayError() { + Single.switchOnNextDelayError( + Flowable.range(1, 5) + .map(v -> { + if (v % 2 == 0) { + return Single.just(v); + } + return Single.just(10 + v); + }) + ) + .test() + .assertResult(11, 2, 13, 4, 15); + } + + @Test + public void noDelaySwitch() { + PublishProcessor> pp = PublishProcessor.create(); + + TestSubscriber ts = Single.switchOnNext(pp).test(); + + assertTrue(pp.hasSubscribers()); + + ts.assertEmpty(); + + SingleSubject ss1 = SingleSubject.create(); + SingleSubject ss2 = SingleSubject.create(); + + pp.onNext(ss1); + + assertTrue(ss1.hasObservers()); + + pp.onNext(ss2); + + assertFalse(ss1.hasObservers()); + assertTrue(ss2.hasObservers()); + + pp.onComplete(); + + assertTrue(ss2.hasObservers()); + + ss2.onSuccess(1); + + ts.assertResult(1); + } + + @Test + public void delaySwitch() { + PublishProcessor> pp = PublishProcessor.create(); + + TestSubscriber ts = Single.switchOnNextDelayError(pp).test(); + + assertTrue(pp.hasSubscribers()); + + ts.assertEmpty(); + + SingleSubject ss1 = SingleSubject.create(); + SingleSubject ss2 = SingleSubject.create(); + + pp.onNext(ss1); + + assertTrue(ss1.hasObservers()); + + pp.onNext(ss2); + + assertFalse(ss1.hasObservers()); + assertTrue(ss2.hasObservers()); + + assertTrue(ss2.hasObservers()); + + ss2.onError(new TestException()); + + assertTrue(pp.hasSubscribers()); + + ts.assertEmpty(); + + pp.onComplete(); + + ts.assertFailure(TestException.class); + } +}