Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add Maybe/Single/Completable blockingSubscribe #6862

Merged
merged 2 commits into from
Jan 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,106 @@ public final boolean blockingAwait(long timeout, @NonNull TimeUnit unit) {
return observer.blockingAwait(timeout, unit);
}

/**
* Subscribes to the current {@code Completable} and <em>blocks the current thread</em> until it terminates.
* <p>
* <img width="640" height="346" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the current {@code Completable} signals an error,
* the {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
* </dd>
* </dl>
* @since 3.0.0
* @see #blockingSubscribe(Action)
* @see #blockingSubscribe(Action, Consumer)
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe() {
blockingSubscribe(Functions.EMPTY_ACTION, Functions.ERROR_CONSUMER);
}

/**
* Subscribes to the current {@code Completable} and calls given {@code onComplete} callback on the <em>current thread</em>
* when it completes normally.
* <p>
* <img width="640" height="351" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.a.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If either the current {@code Completable} signals an error or {@code onComplete} throws,
* the respective {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
* </dd>
* </dl>
* @param onComplete the {@link Action} to call if the current {@code Completable} completes normally
* @throws NullPointerException if {@code onComplete} is {@code null}
* @since 3.0.0
* @see #blockingSubscribe(Action, Consumer)
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(@NonNull Action onComplete) {
blockingSubscribe(onComplete, Functions.ERROR_CONSUMER);
}

/**
* Subscribes to the current {@code Completable} and calls the appropriate callback on the <em>current thread</em>
* when it terminates.
* <p>
* <img width="640" height="352" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.ac.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If either {@code onComplete} or {@code onError} throw, the {@link Throwable} is routed to the
* global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
* </dd>
* </dl>
* @param onComplete the {@link Action} to call if the current {@code Completable} completes normally
* @param onError the {@link Consumer} to call if the current {@code Completable} signals an error
* @throws NullPointerException if {@code onComplete} or {@code onError} is {@code null}
* @since 3.0.0
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(@NonNull Action onComplete, @NonNull Consumer<? super Throwable> onError) {
Objects.requireNonNull(onComplete, "onComplete is null");
Objects.requireNonNull(onError, "onError is null");
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<>();
subscribe(observer);
observer.blockingConsume(Functions.emptyConsumer(), onError, onComplete);
}

/**
* Subscribes to the current {@code Completable} and calls the appropriate {@link CompletableObserver} method on the <em>current thread</em>.
* <p>
* <img width="640" height="468" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingSubscribe.o.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>An {@code onError} signal is delivered to the {@link CompletableObserver#onError(Throwable)} method.
* If any of the {@code CompletableObserver}'s methods throw, the {@link RuntimeException} is propagated to the caller of this method.
* If the current thread is interrupted, an {@link InterruptedException} is delivered to {@code observer.onError}.
* </dd>
* </dl>
* @param observer the {@code CompletableObserver} to call methods on the current thread
* @throws NullPointerException if {@code observer} is {@code null}
* @since 3.0.0
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(@NonNull CompletableObserver observer) {
Objects.requireNonNull(observer, "observer is null");
BlockingDisposableMultiObserver<Void> blockingObserver = new BlockingDisposableMultiObserver<>();
observer.onSubscribe(blockingObserver);
subscribe(blockingObserver);
blockingObserver.blockingConsume(observer);
}

/**
* Subscribes to this {@code Completable} only once, when the first {@link CompletableObserver}
* subscribes to the result {@code Completable}, caches its terminal event
Expand Down
131 changes: 130 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.fuseable.*;
import io.reactivex.rxjava3.internal.jdk8.*;
import io.reactivex.rxjava3.internal.observers.BlockingMultiObserver;
import io.reactivex.rxjava3.internal.observers.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.maybe.*;
import io.reactivex.rxjava3.internal.operators.mixed.*;
Expand Down Expand Up @@ -2475,6 +2475,135 @@ public final T blockingGet(@NonNull T defaultValue) {
return observer.blockingGet(defaultValue);
}

/**
* Subscribes to the current {@code Maybe} and <em>blocks the current thread</em> until it terminates.
* <p>
* <img width="640" height="238" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the current {@code Maybe} signals an error,
* the {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
* </dd>
* </dl>
* @since 3.0.0
* @see #blockingSubscribe(Consumer)
* @see #blockingSubscribe(Consumer, Consumer)
* @see #blockingSubscribe(Consumer, Consumer, Action)
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe() {
blockingSubscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
}

/**
* Subscribes to the current {@code Maybe} and calls given {@code onSuccess} callback on the <em>current thread</em>
* when it completes normally.
* <p>
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.c.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If either the current {@code Maybe} signals an error or {@code onSuccess} throws,
* the respective {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
* </dd>
* </dl>
* @param onSuccess the {@link Consumer} to call if the current {@code Maybe} succeeds
* @throws NullPointerException if {@code onSuccess} is {@code null}
* @since 3.0.0
* @see #blockingSubscribe(Consumer, Consumer)
* @see #blockingSubscribe(Consumer, Consumer, Action)
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess) {
blockingSubscribe(onSuccess, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
}

/**
* Subscribes to the current {@code Maybe} and calls the appropriate callback on the <em>current thread</em>
* when it terminates.
* <p>
* <img width="640" height="256" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.cc.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If either {@code onSuccess} or {@code onError} throw, the {@link Throwable} is routed to the
* global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
* </dd>
* </dl>
* @param onSuccess the {@link Consumer} to call if the current {@code Maybe} succeeds
* @param onError the {@code Consumer} to call if the current {@code Maybe} signals an error
* @throws NullPointerException if {@code onSuccess} or {@code onError} is {@code null}
* @since 3.0.0
* @see #blockingSubscribe(Consumer, Consumer, Action)
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess, @NonNull Consumer<? super Throwable> onError) {
blockingSubscribe(onSuccess, onError, Functions.EMPTY_ACTION);
}

/**
* Subscribes to the current {@code Maybe} and calls the appropriate callback on the <em>current thread</em>
* when it terminates.
* <p>
* <img width="640" height="251" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.cca.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If either {@code onSuccess}, {@code onError} or {@code onComplete} throw, the {@link Throwable} is routed to the
* global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
* </dd>
* </dl>
* @param onSuccess the {@link Consumer} to call if the current {@code Maybe} succeeds
* @param onError the {@code Consumer} to call if the current {@code Maybe} signals an error
* @param onComplete the {@linnk Action} to call if the current {@code Maybe} completes without a value
* @throws NullPointerException if {@code onSuccess}, {@code onError} or {@code onComplete} is {@code null}
* @since 3.0.0
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess, @NonNull Consumer<? super Throwable> onError, @NonNull Action onComplete) {
Objects.requireNonNull(onSuccess, "onSuccess is null");
Objects.requireNonNull(onError, "onError is null");
Objects.requireNonNull(onComplete, "onComplete is null");
BlockingMultiObserver<T> observer = new BlockingMultiObserver<>();
subscribe(observer);
observer.blockingConsume(onSuccess, onError, onComplete);
}

/**
* Subscribes to the current {@code Maybe} and calls the appropriate {@link MaybeObserver} method on the <em>current thread</em>.
* <p>
* <img width="640" height="398" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.blockingSubscribe.o.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>An {@code onError} signal is delivered to the {@link MaybeObserver#onError(Throwable)} method.
* If any of the {@code MaybeObserver}'s methods throw, the {@link RuntimeException} is propagated to the caller of this method.
* If the current thread is interrupted, an {@link InterruptedException} is delivered to {@code observer.onError}.
* </dd>
* </dl>
* @param observer the {@code MaybeObserver} to call methods on the current thread
* @throws NullPointerException if {@code observer} is {@code null}
* @since 3.0.0
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(@NonNull MaybeObserver<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
BlockingDisposableMultiObserver<T> blockingObserver = new BlockingDisposableMultiObserver<>();
observer.onSubscribe(blockingObserver);
subscribe(blockingObserver);
blockingObserver.blockingConsume(observer);
}

/**
* Returns a {@code Maybe} that subscribes to this {@code Maybe} lazily, caches its event
* and replays it, to all the downstream subscribers.
Expand Down
100 changes: 100 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2947,6 +2947,106 @@ public final T blockingGet() {
return observer.blockingGet();
}

/**
* Subscribes to the current {@code Single} and <em>blocks the current thread</em> until it terminates.
* <p>
* <img width="640" height="329" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the current {@code Single} signals an error,
* the {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
* </dd>
* </dl>
* @since 3.0.0
* @see #blockingSubscribe(Consumer)
* @see #blockingSubscribe(Consumer, Consumer)
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe() {
blockingSubscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER);
}

/**
* Subscribes to the current {@code Single} and calls given {@code onSuccess} callback on the <em>current thread</em>
* when it completes normally.
* <p>
* <img width="640" height="351" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.c.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If either the current {@code Single} signals an error or {@code onSuccess} throws,
* the respective {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
* </dd>
* </dl>
* @param onSuccess the {@link Consumer} to call if the current {@code Single} succeeds
* @throws NullPointerException if {@code onSuccess} is {@code null}
* @since 3.0.0
* @see #blockingSubscribe(Consumer, Consumer)
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess) {
blockingSubscribe(onSuccess, Functions.ERROR_CONSUMER);
}

/**
* Subscribes to the current {@code Single} and calls the appropriate callback on the <em>current thread</em>
* when it terminates.
* <p>
* <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.cc.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If either {@code onSuccess} or {@code onError} throw, the {@link Throwable} is routed to the
* global error handler via {@link RxJavaPlugins#onError(Throwable)}.
* If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
* </dd>
* </dl>
* @param onSuccess the {@link Consumer} to call if the current {@code Single} succeeds
* @param onError the {@code Consumer} to call if the current {@code Single} signals an error
* @throws NullPointerException if {@code onSuccess} or {@code onError} is {@code null}
* @since 3.0.0
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(@NonNull Consumer<? super T> onSuccess, @NonNull Consumer<? super Throwable> onError) {
Objects.requireNonNull(onSuccess, "onSuccess is null");
Objects.requireNonNull(onError, "onError is null");
BlockingMultiObserver<T> observer = new BlockingMultiObserver<>();
subscribe(observer);
observer.blockingConsume(onSuccess, onError, Functions.EMPTY_ACTION);
}

/**
* Subscribes to the current {@code Single} and calls the appropriate {@link SingleObserver} method on the <em>current thread</em>.
* <p>
* <img width="640" height="479" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.blockingSubscribe.o.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>An {@code onError} signal is delivered to the {@link SingleObserver#onError(Throwable)} method.
* If any of the {@code SingleObserver}'s methods throw, the {@link RuntimeException} is propagated to the caller of this method.
* If the current thread is interrupted, an {@link InterruptedException} is delivered to {@code observer.onError}.
* </dd>
* </dl>
* @param observer the {@code SingleObserver} to call methods on the current thread
* @throws NullPointerException if {@code observer} is {@code null}
* @since 3.0.0
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingSubscribe(@NonNull SingleObserver<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
BlockingDisposableMultiObserver<T> blockingObserver = new BlockingDisposableMultiObserver<>();
observer.onSubscribe(blockingObserver);
subscribe(blockingObserver);
blockingObserver.blockingConsume(observer);
}

/**
* <strong>This method requires advanced knowledge about building operators, please consider
* other standard composition methods first;</strong>
Expand Down
Loading