Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add Maybe/Single/Completable switchOnNext & switchOnNextDelayError #6870

Merged
merged 2 commits into from
Jan 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,71 @@ private static NullPointerException toNpe(Throwable ex) {
return npe;
}

/**
* Switches between {@link CompletableSource}s emitted by the source {@link Publisher} whenever
* a new {@code CompletableSource} is emitted, disposing the previously running {@code CompletableSource},
* exposing the setup as a {@code Completable} sequence.
* <p>
* <img width="640" height="518" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.switchOnNext.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>The returned sequence fails with the first error signaled by the {@code sources} {@code Publisher}
* or the currently running {@code CompletableSource}, disposing the rest. Late errors are
* forwarded to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.</dd>
* </dl>
* @param sources the {@code Publisher} sequence of inner {@code CompletableSource}s to switch between
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @since 3.0.0
* @see #switchOnNextDelayError(Publisher)
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
public static Completable switchOnNext(@NonNull Publisher<@NonNull ? extends CompletableSource> sources) {
Objects.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableSwitchMapCompletablePublisher<>(sources, Functions.identity(), false));
}

/**
* Switches between {@link CompletableSource}s emitted by the source {@link Publisher} whenever
* a new {@code CompletableSource} is emitted, disposing the previously running {@code CompletableSource},
* exposing the setup as a {@code Completable} sequence and delaying all errors from
* all of them until all terminate.
* <p>
* <img width="640" height="415" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.switchOnNextDelayError.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchOnNextDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>The returned {@code Completable} collects all errors emitted by either the {@code sources}
* {@code Publisher} or any inner {@code CompletableSource} and emits them as a {@link CompositeException}
* when all sources terminate. If only one source ever failed, its error is emitted as-is at the end.</dd>
* </dl>
* @param sources the {@code Publisher} sequence of inner {@code CompletableSource}s to switch between
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @since 3.0.0
* @see #switchOnNext(Publisher)
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
public static Completable switchOnNextDelayError(@NonNull Publisher<@NonNull ? extends CompletableSource> sources) {
Objects.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableSwitchMapCompletablePublisher<>(sources, Functions.identity(), true));
}

/**
* Returns a {@code Completable} instance which manages a resource along
* with a custom {@link CompletableSource} instance while the subscription is active.
Expand Down Expand Up @@ -2328,6 +2393,7 @@ public final Completable retry(@NonNull Predicate<? super Throwable> predicate)
* @param stop the function that should return {@code true} to stop retrying
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code stop} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
Expand Down
71 changes: 71 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -1728,6 +1728,75 @@ public static <T> Single<Boolean> sequenceEqual(@NonNull MaybeSource<? extends T
return RxJavaPlugins.onAssembly(new MaybeEqualSingle<>(source1, source2, isEqual));
}

/**
* Switches between {@link MaybeSource}s emitted by the source {@link Publisher} whenever
* a new {@code MaybeSource} is emitted, disposing the previously running {@code MaybeSource},
* exposing the success items as a {@link Flowable} sequence.
* <p>
* <img width="640" height="521" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.switchOnNext.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).
* The returned {@code Flowable} respects the backpressure from the downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>The returned sequence fails with the first error signaled by the {@code sources} {@code Publisher}
* or the currently running {@code MaybeSource}, disposing the rest. Late errors are
* forwarded to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.</dd>
* </dl>
* @param <T> the element type of the {@code MaybeSource}s
* @param sources the {@code Publisher} sequence of inner {@code MaybeSource}s to switch between
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @since 3.0.0
* @see #switchOnNextDelayError(Publisher)
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> switchOnNext(@NonNull Publisher<@NonNull ? extends MaybeSource<? extends T>> sources) {
Objects.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableSwitchMapMaybePublisher<>(sources, Functions.identity(), false));
}

/**
* Switches between {@link MaybeSource}s emitted by the source {@link Publisher} whenever
* a new {@code MaybeSource} is emitted, disposing the previously running {@code MaybeSource},
* exposing the success items as a {@link Flowable} sequence and delaying all errors from
* all of them until all terminate.
* <p>
* <img width="640" height="423" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.switchOnNextDelayError.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).
* The returned {@code Flowable} respects the backpressure from the downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchOnNextDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>The returned {@code Flowable} collects all errors emitted by either the {@code sources}
* {@code Publisher} or any inner {@code MaybeSource} and emits them as a {@link CompositeException}
* when all sources terminate. If only one source ever failed, its error is emitted as-is at the end.</dd>
* </dl>
* @param <T> the element type of the {@code MaybeSource}s
* @param sources the {@code Publisher} sequence of inner {@code MaybeSource}s to switch between
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @since 3.0.0
* @see #switchOnNext(Publisher)
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> switchOnNextDelayError(@NonNull Publisher<@NonNull ? extends MaybeSource<? extends T>> sources) {
Objects.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableSwitchMapMaybePublisher<>(sources, Functions.identity(), true));
}

/**
* Returns a {@code Maybe} that emits {@code 0L} after a specified delay.
* <p>
Expand Down Expand Up @@ -2868,6 +2937,7 @@ public final Maybe<T> delay(long time, @NonNull TimeUnit unit) {
* @throws NullPointerException if {@code unit} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
* @see #delay(long, TimeUnit, Scheduler, boolean)
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
Expand Down Expand Up @@ -2922,6 +2992,7 @@ public final Maybe<T> delay(long time, @NonNull TimeUnit unit, @NonNull Schedule
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
Expand Down
72 changes: 71 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import io.reactivex.rxjava3.internal.operators.completable.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.maybe.*;
import io.reactivex.rxjava3.internal.operators.mixed.SingleFlatMapObservable;
import io.reactivex.rxjava3.internal.operators.mixed.*;
import io.reactivex.rxjava3.internal.operators.observable.*;
import io.reactivex.rxjava3.internal.operators.single.*;
import io.reactivex.rxjava3.internal.util.ErrorMode;
Expand Down Expand Up @@ -1406,6 +1406,75 @@ public static <T> Single<Boolean> sequenceEqual(@NonNull SingleSource<? extends
return RxJavaPlugins.onAssembly(new SingleEquals<>(source1, source2));
}

/**
* Switches between {@link SingleSource}s emitted by the source {@link Publisher} whenever
* a new {@code SingleSource} is emitted, disposing the previously running {@code SingleSource},
* exposing the success items as a {@link Flowable} sequence.
* <p>
* <img width="640" height="521" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.switchOnNext.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).
* The returned {@code Flowable} respects the backpressure from the downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchOnNext} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>The returned sequence fails with the first error signaled by the {@code sources} {@code Publisher}
* or the currently running {@code SingleSource}, disposing the rest. Late errors are
* forwarded to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.</dd>
* </dl>
* @param <T> the element type of the {@code SingleSource}s
* @param sources the {@code Publisher} sequence of inner {@code SingleSource}s to switch between
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @since 3.0.0
* @see #switchOnNextDelayError(Publisher)
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> switchOnNext(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources) {
Objects.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableSwitchMapSinglePublisher<>(sources, Functions.identity(), false));
}

/**
* Switches between {@link SingleSource}s emitted by the source {@link Publisher} whenever
* a new {@code SingleSource} is emitted, disposing the previously running {@code SingleSource},
* exposing the success items as a {@link Flowable} sequence and delaying all errors from
* all of them until all terminate.
* <p>
* <img width="640" height="425" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.switchOnNextDelayError.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code sources} {@code Publisher} is consumed in an unbounded manner (requesting {@link Long#MAX_VALUE}).
* The returned {@code Flowable} respects the backpressure from the downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchOnNextDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>The returned {@code Flowable} collects all errors emitted by either the {@code sources}
* {@code Publisher} or any inner {@code SingleSource} and emits them as a {@link CompositeException}
* when all sources terminate. If only one source ever failed, its error is emitted as-is at the end.</dd>
* </dl>
* @param <T> the element type of the {@code SingleSource}s
* @param sources the {@code Publisher} sequence of inner {@code SingleSource}s to switch between
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @since 3.0.0
* @see #switchOnNext(Publisher)
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> switchOnNextDelayError(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources) {
Objects.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableSwitchMapSinglePublisher<>(sources, Functions.identity(), true));
}

/**
* <strong>Advanced use only:</strong> creates a {@code Single} instance without
* any safeguards by using a callback that is called with a {@link SingleObserver}.
Expand Down Expand Up @@ -3758,6 +3827,7 @@ public final Single<T> retry(@NonNull Predicate<? super Throwable> predicate) {
* @param stop the function that should return {@code true} to stop retrying
* @return the new {@code Single} instance
* @throws NullPointerException if {@code stop} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.operators.mixed;

import org.reactivestreams.Publisher;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;

/**
* Switch between subsequent {@link CompletableSource}s emitted by a {@link Publisher}.
* Reuses {@link FlowableSwitchMapCompletable} internals.
* @param <T> the upstream value type
* @since 3.0.0
*/
public final class FlowableSwitchMapCompletablePublisher<T> extends Completable {

final Publisher<T> source;

final Function<? super T, ? extends CompletableSource> mapper;

final boolean delayErrors;

public FlowableSwitchMapCompletablePublisher(Publisher<T> source,
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
this.source = source;
this.mapper = mapper;
this.delayErrors = delayErrors;
}

@Override
protected void subscribeActual(CompletableObserver observer) {
source.subscribe(new FlowableSwitchMapCompletable.SwitchMapCompletableObserver<>(observer, mapper, delayErrors));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.operators.mixed;

import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;

/**
* Switch between subsequent {@link MaybeSource}s emitted by a {@link Publisher}.
* Reuses {@link FlowableSwitchMapMaybe} internals.
* @param <T> the upstream value type
* @param <R> the downstream value type
* @since 3.0.0
*/
public final class FlowableSwitchMapMaybePublisher<T, R> extends Flowable<R> {

final Publisher<T> source;

final Function<? super T, ? extends MaybeSource<? extends R>> mapper;

final boolean delayErrors;

public FlowableSwitchMapMaybePublisher(Publisher<T> source,
Function<? super T, ? extends MaybeSource<? extends R>> mapper,
boolean delayErrors) {
this.source = source;
this.mapper = mapper;
this.delayErrors = delayErrors;
}

@Override
protected void subscribeActual(Subscriber<? super R> s) {
source.subscribe(new FlowableSwitchMapMaybe.SwitchMapMaybeSubscriber<>(s, mapper, delayErrors));
}
}
Loading