diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java
index 1266255315..0913ac398d 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Completable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java
@@ -1310,6 +1310,106 @@ public final boolean blockingAwait(long timeout, @NonNull TimeUnit unit) {
return observer.blockingAwait(timeout, unit);
}
+ /**
+ * Subscribes to the current {@code Completable} and blocks the current thread until it terminates.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the current {@code Completable} signals an error,
+ * the {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ * If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
+ *
+ *
+ * @since 3.0.0
+ * @see #blockingSubscribe(Action)
+ * @see #blockingSubscribe(Action, Consumer)
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingSubscribe() {
+ blockingSubscribe(Functions.EMPTY_ACTION, Functions.ERROR_CONSUMER);
+ }
+
+ /**
+ * Subscribes to the current {@code Completable} and calls given {@code onComplete} callback on the current thread
+ * when it completes normally.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If either the current {@code Completable} signals an error or {@code onComplete} throws,
+ * the respective {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ * If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
+ *
+ *
+ * @param onComplete the {@link Action} to call if the current {@code Completable} completes normally
+ * @throws NullPointerException if {@code onComplete} is {@code null}
+ * @since 3.0.0
+ * @see #blockingSubscribe(Action, Consumer)
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingSubscribe(@NonNull Action onComplete) {
+ blockingSubscribe(onComplete, Functions.ERROR_CONSUMER);
+ }
+
+ /**
+ * Subscribes to the current {@code Completable} and calls the appropriate callback on the current thread
+ * when it terminates.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If either {@code onComplete} or {@code onError} throw, the {@link Throwable} is routed to the
+ * global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ * If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
+ *
+ *
+ * @param onComplete the {@link Action} to call if the current {@code Completable} completes normally
+ * @param onError the {@link Consumer} to call if the current {@code Completable} signals an error
+ * @throws NullPointerException if {@code onComplete} or {@code onError} is {@code null}
+ * @since 3.0.0
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingSubscribe(@NonNull Action onComplete, @NonNull Consumer super Throwable> onError) {
+ Objects.requireNonNull(onComplete, "onComplete is null");
+ Objects.requireNonNull(onError, "onError is null");
+ BlockingMultiObserver observer = new BlockingMultiObserver<>();
+ subscribe(observer);
+ observer.blockingConsume(Functions.emptyConsumer(), onError, onComplete);
+ }
+
+ /**
+ * Subscribes to the current {@code Completable} and calls the appropriate {@link CompletableObserver} method on the current thread.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
An {@code onError} signal is delivered to the {@link CompletableObserver#onError(Throwable)} method.
+ * If any of the {@code CompletableObserver}'s methods throw, the {@link RuntimeException} is propagated to the caller of this method.
+ * If the current thread is interrupted, an {@link InterruptedException} is delivered to {@code observer.onError}.
+ *
+ *
+ * @param observer the {@code CompletableObserver} to call methods on the current thread
+ * @throws NullPointerException if {@code observer} is {@code null}
+ * @since 3.0.0
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingSubscribe(@NonNull CompletableObserver observer) {
+ Objects.requireNonNull(observer, "observer is null");
+ BlockingDisposableMultiObserver blockingObserver = new BlockingDisposableMultiObserver<>();
+ observer.onSubscribe(blockingObserver);
+ subscribe(blockingObserver);
+ blockingObserver.blockingConsume(observer);
+ }
+
/**
* Subscribes to this {@code Completable} only once, when the first {@link CompletableObserver}
* subscribes to the result {@code Completable}, caches its terminal event
diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
index 4322afaa93..cc3bef199e 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
@@ -26,7 +26,7 @@
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.fuseable.*;
import io.reactivex.rxjava3.internal.jdk8.*;
-import io.reactivex.rxjava3.internal.observers.BlockingMultiObserver;
+import io.reactivex.rxjava3.internal.observers.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.maybe.*;
import io.reactivex.rxjava3.internal.operators.mixed.*;
@@ -2475,6 +2475,135 @@ public final T blockingGet(@NonNull T defaultValue) {
return observer.blockingGet(defaultValue);
}
+ /**
+ * Subscribes to the current {@code Maybe} and blocks the current thread until it terminates.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the current {@code Maybe} signals an error,
+ * the {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ * If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
+ *
+ *
+ * @since 3.0.0
+ * @see #blockingSubscribe(Consumer)
+ * @see #blockingSubscribe(Consumer, Consumer)
+ * @see #blockingSubscribe(Consumer, Consumer, Action)
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingSubscribe() {
+ blockingSubscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
+ }
+
+ /**
+ * Subscribes to the current {@code Maybe} and calls given {@code onSuccess} callback on the current thread
+ * when it completes normally.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If either the current {@code Maybe} signals an error or {@code onSuccess} throws,
+ * the respective {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ * If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
+ *
+ *
+ * @param onSuccess the {@link Consumer} to call if the current {@code Maybe} succeeds
+ * @throws NullPointerException if {@code onSuccess} is {@code null}
+ * @since 3.0.0
+ * @see #blockingSubscribe(Consumer, Consumer)
+ * @see #blockingSubscribe(Consumer, Consumer, Action)
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingSubscribe(@NonNull Consumer super T> onSuccess) {
+ blockingSubscribe(onSuccess, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
+ }
+
+ /**
+ * Subscribes to the current {@code Maybe} and calls the appropriate callback on the current thread
+ * when it terminates.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If either {@code onSuccess} or {@code onError} throw, the {@link Throwable} is routed to the
+ * global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ * If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
+ *
+ *
+ * @param onSuccess the {@link Consumer} to call if the current {@code Maybe} succeeds
+ * @param onError the {@code Consumer} to call if the current {@code Maybe} signals an error
+ * @throws NullPointerException if {@code onSuccess} or {@code onError} is {@code null}
+ * @since 3.0.0
+ * @see #blockingSubscribe(Consumer, Consumer, Action)
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingSubscribe(@NonNull Consumer super T> onSuccess, @NonNull Consumer super Throwable> onError) {
+ blockingSubscribe(onSuccess, onError, Functions.EMPTY_ACTION);
+ }
+
+ /**
+ * Subscribes to the current {@code Maybe} and calls the appropriate callback on the current thread
+ * when it terminates.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If either {@code onSuccess}, {@code onError} or {@code onComplete} throw, the {@link Throwable} is routed to the
+ * global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ * If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
+ *
+ *
+ * @param onSuccess the {@link Consumer} to call if the current {@code Maybe} succeeds
+ * @param onError the {@code Consumer} to call if the current {@code Maybe} signals an error
+ * @param onComplete the {@linnk Action} to call if the current {@code Maybe} completes without a value
+ * @throws NullPointerException if {@code onSuccess}, {@code onError} or {@code onComplete} is {@code null}
+ * @since 3.0.0
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingSubscribe(@NonNull Consumer super T> onSuccess, @NonNull Consumer super Throwable> onError, @NonNull Action onComplete) {
+ Objects.requireNonNull(onSuccess, "onSuccess is null");
+ Objects.requireNonNull(onError, "onError is null");
+ Objects.requireNonNull(onComplete, "onComplete is null");
+ BlockingMultiObserver observer = new BlockingMultiObserver<>();
+ subscribe(observer);
+ observer.blockingConsume(onSuccess, onError, onComplete);
+ }
+
+ /**
+ * Subscribes to the current {@code Maybe} and calls the appropriate {@link MaybeObserver} method on the current thread.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
An {@code onError} signal is delivered to the {@link MaybeObserver#onError(Throwable)} method.
+ * If any of the {@code MaybeObserver}'s methods throw, the {@link RuntimeException} is propagated to the caller of this method.
+ * If the current thread is interrupted, an {@link InterruptedException} is delivered to {@code observer.onError}.
+ *
+ *
+ * @param observer the {@code MaybeObserver} to call methods on the current thread
+ * @throws NullPointerException if {@code observer} is {@code null}
+ * @since 3.0.0
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingSubscribe(@NonNull MaybeObserver super T> observer) {
+ Objects.requireNonNull(observer, "observer is null");
+ BlockingDisposableMultiObserver blockingObserver = new BlockingDisposableMultiObserver<>();
+ observer.onSubscribe(blockingObserver);
+ subscribe(blockingObserver);
+ blockingObserver.blockingConsume(observer);
+ }
+
/**
* Returns a {@code Maybe} that subscribes to this {@code Maybe} lazily, caches its event
* and replays it, to all the downstream subscribers.
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java
index c93bfa9282..216d3deb87 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Single.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Single.java
@@ -2947,6 +2947,106 @@ public final T blockingGet() {
return observer.blockingGet();
}
+ /**
+ * Subscribes to the current {@code Single} and blocks the current thread until it terminates.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the current {@code Single} signals an error,
+ * the {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ * If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
+ *
+ *
+ * @since 3.0.0
+ * @see #blockingSubscribe(Consumer)
+ * @see #blockingSubscribe(Consumer, Consumer)
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingSubscribe() {
+ blockingSubscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER);
+ }
+
+ /**
+ * Subscribes to the current {@code Single} and calls given {@code onSuccess} callback on the current thread
+ * when it completes normally.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If either the current {@code Single} signals an error or {@code onSuccess} throws,
+ * the respective {@link Throwable} is routed to the global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ * If the current thread is interrupted, an {@link InterruptedException} is routed to the same global error handler.
+ *
+ *
+ * @param onSuccess the {@link Consumer} to call if the current {@code Single} succeeds
+ * @throws NullPointerException if {@code onSuccess} is {@code null}
+ * @since 3.0.0
+ * @see #blockingSubscribe(Consumer, Consumer)
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingSubscribe(@NonNull Consumer super T> onSuccess) {
+ blockingSubscribe(onSuccess, Functions.ERROR_CONSUMER);
+ }
+
+ /**
+ * Subscribes to the current {@code Single} and calls the appropriate callback on the current thread
+ * when it terminates.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If either {@code onSuccess} or {@code onError} throw, the {@link Throwable} is routed to the
+ * global error handler via {@link RxJavaPlugins#onError(Throwable)}.
+ * If the current thread is interrupted, the {@code onError} consumer is called with an {@link InterruptedException}.
+ *
+ *
+ * @param onSuccess the {@link Consumer} to call if the current {@code Single} succeeds
+ * @param onError the {@code Consumer} to call if the current {@code Single} signals an error
+ * @throws NullPointerException if {@code onSuccess} or {@code onError} is {@code null}
+ * @since 3.0.0
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingSubscribe(@NonNull Consumer super T> onSuccess, @NonNull Consumer super Throwable> onError) {
+ Objects.requireNonNull(onSuccess, "onSuccess is null");
+ Objects.requireNonNull(onError, "onError is null");
+ BlockingMultiObserver observer = new BlockingMultiObserver<>();
+ subscribe(observer);
+ observer.blockingConsume(onSuccess, onError, Functions.EMPTY_ACTION);
+ }
+
+ /**
+ * Subscribes to the current {@code Single} and calls the appropriate {@link SingleObserver} method on the current thread.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code blockingSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
An {@code onError} signal is delivered to the {@link SingleObserver#onError(Throwable)} method.
+ * If any of the {@code SingleObserver}'s methods throw, the {@link RuntimeException} is propagated to the caller of this method.
+ * If the current thread is interrupted, an {@link InterruptedException} is delivered to {@code observer.onError}.
+ *
+ *
+ * @param observer the {@code SingleObserver} to call methods on the current thread
+ * @throws NullPointerException if {@code observer} is {@code null}
+ * @since 3.0.0
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingSubscribe(@NonNull SingleObserver super T> observer) {
+ Objects.requireNonNull(observer, "observer is null");
+ BlockingDisposableMultiObserver blockingObserver = new BlockingDisposableMultiObserver<>();
+ observer.onSubscribe(blockingObserver);
+ subscribe(blockingObserver);
+ blockingObserver.blockingConsume(observer);
+ }
+
/**
* This method requires advanced knowledge about building operators, please consider
* other standard composition methods first;
diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingDisposableMultiObserver.java b/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingDisposableMultiObserver.java
new file mode 100644
index 0000000000..da7e289e27
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingDisposableMultiObserver.java
@@ -0,0 +1,154 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.observers;
+
+import java.util.concurrent.CountDownLatch;
+
+import io.reactivex.rxjava3.annotations.NonNull;
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.internal.disposables.*;
+import io.reactivex.rxjava3.internal.util.BlockingHelper;
+
+/**
+ * Blocks until the upstream terminates and dispatches the outcome to
+ * the actual observer.
+ *
+ * @param the element type of the source
+ * @since 3.0.0
+ */
+public final class BlockingDisposableMultiObserver
+extends CountDownLatch
+implements MaybeObserver, SingleObserver, CompletableObserver, Disposable {
+
+ T value;
+ Throwable error;
+
+ final SequentialDisposable upstream;
+
+ public BlockingDisposableMultiObserver() {
+ super(1);
+ upstream = new SequentialDisposable();
+ }
+
+ @Override
+ public void dispose() {
+ upstream.dispose();
+ countDown();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return upstream.isDisposed();
+ }
+
+ @Override
+ public void onSubscribe(@NonNull Disposable d) {
+ DisposableHelper.setOnce(upstream, d);
+ }
+
+ @Override
+ public void onSuccess(@NonNull T t) {
+ this.value = t;
+ upstream.lazySet(Disposable.disposed());
+ countDown();
+ }
+
+ @Override
+ public void onError(@NonNull Throwable e) {
+ this.error = e;
+ upstream.lazySet(Disposable.disposed());
+ countDown();
+ }
+
+ @Override
+ public void onComplete() {
+ upstream.lazySet(Disposable.disposed());
+ countDown();
+ }
+
+ public void blockingConsume(CompletableObserver observer) {
+ if (getCount() != 0) {
+ try {
+ BlockingHelper.verifyNonBlocking();
+ await();
+ } catch (InterruptedException ex) {
+ dispose();
+ observer.onError(ex);
+ return;
+ }
+ }
+ if (isDisposed()) {
+ return;
+ }
+
+ Throwable ex = error;
+ if (ex != null) {
+ observer.onError(ex);
+ } else {
+ observer.onComplete();
+ }
+ }
+
+ public void blockingConsume(SingleObserver super T> observer) {
+ if (getCount() != 0) {
+ try {
+ BlockingHelper.verifyNonBlocking();
+ await();
+ } catch (InterruptedException ex) {
+ dispose();
+ observer.onError(ex);
+ return;
+ }
+ }
+ if (isDisposed()) {
+ return;
+ }
+
+ Throwable ex = error;
+ if (ex != null) {
+ observer.onError(ex);
+ } else {
+ observer.onSuccess(value);
+ }
+ }
+
+ public void blockingConsume(MaybeObserver super T> observer) {
+ if (getCount() != 0) {
+ try {
+ BlockingHelper.verifyNonBlocking();
+ await();
+ } catch (InterruptedException ex) {
+ dispose();
+ observer.onError(ex);
+ return;
+ }
+ }
+ if (isDisposed()) {
+ return;
+ }
+
+ Throwable ex = error;
+ if (ex != null) {
+ observer.onError(ex);
+ } else {
+ T v = value;
+ if (v == null) {
+ observer.onComplete();
+ } else {
+ observer.onSuccess(v);
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingMultiObserver.java b/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingMultiObserver.java
index 1f4ee305bd..4945730ab5 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingMultiObserver.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/observers/BlockingMultiObserver.java
@@ -17,7 +17,10 @@
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.util.*;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
* A combined Observer that awaits the success or error signal via a CountDownLatch.
@@ -143,4 +146,39 @@ public boolean blockingAwait(long timeout, TimeUnit unit) {
}
return true;
}
+
+ /**
+ * Blocks until the source completes and calls the appropriate callback.
+ * @param onSuccess for a succeeding source
+ * @param onError for a failing source
+ * @param onComplete for an empty source
+ */
+ public void blockingConsume(Consumer super T> onSuccess, Consumer super Throwable> onError, Action onComplete) {
+ try {
+ if (getCount() != 0) {
+ try {
+ BlockingHelper.verifyNonBlocking();
+ await();
+ } catch (InterruptedException ex) {
+ dispose();
+ onError.accept(ex);
+ return;
+ }
+ }
+ Throwable ex = error;
+ if (ex != null) {
+ onError.accept(ex);
+ return;
+ }
+ T v = value;
+ if (v != null) {
+ onSuccess.accept(v);
+ } else {
+ onComplete.run();
+ }
+ } catch (Throwable t) {
+ Exceptions.throwIfFatal(t);
+ RxJavaPlugins.onError(t);
+ }
+ }
}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableBlockingSubscribeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableBlockingSubscribeTest.java
new file mode 100644
index 0000000000..e70aac7157
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableBlockingSubscribeTest.java
@@ -0,0 +1,331 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.completable;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.exceptions.TestException;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.observers.TestObserver;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class CompletableBlockingSubscribeTest {
+
+ @Test
+ public void noArgComplete() {
+ Completable.complete()
+ .blockingSubscribe();
+ }
+
+ @Test
+ public void noArgCompleteAsync() {
+ Completable.complete()
+ .delay(100, TimeUnit.MILLISECONDS)
+ .blockingSubscribe();
+ }
+
+ @Test
+ public void noArgError() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Completable.error(new TestException())
+ .blockingSubscribe();
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+ });
+ }
+
+ @Test
+ public void noArgErrorAsync() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Completable.error(new TestException())
+ .delay(100, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+ .blockingSubscribe();
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+ });
+ }
+
+ @Test
+ public void oneArgComplete() throws Throwable {
+ Action action = mock(Action.class);
+
+ Completable.complete()
+ .blockingSubscribe(action);
+
+ verify(action).run();
+ }
+
+ @Test
+ public void oneArgCompleteAsync() throws Throwable {
+ Action action = mock(Action.class);
+
+ Completable.complete()
+ .delay(50, TimeUnit.MILLISECONDS)
+ .blockingSubscribe(action);
+
+ verify(action).run();
+ }
+
+ @Test
+ public void oneArgCompleteFails() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Action action = mock(Action.class);
+ doThrow(new TestException()).when(action).run();
+
+ Completable.complete()
+ .blockingSubscribe(action);
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+ verify(action).run();
+ });
+ }
+
+ @Test
+ public void oneArgError() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Action action = mock(Action.class);
+
+ Completable.error(new TestException())
+ .blockingSubscribe(action);
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+ verify(action, never()).run();
+ });
+ }
+
+ @Test
+ public void oneArgErrorAsync() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Action action = mock(Action.class);
+
+ Completable.error(new TestException())
+ .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+ .blockingSubscribe(action);
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+ verify(action, never()).run();
+ });
+ }
+
+ @Test
+ public void twoArgComplete() throws Throwable {
+ Action action = mock(Action.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Completable.complete()
+ .blockingSubscribe(action, consumer);
+
+ verify(action).run();
+ verify(consumer, never()).accept(any());
+ }
+
+ @Test
+ public void twoArgCompleteAsync() throws Throwable {
+ Action action = mock(Action.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Completable.complete()
+ .delay(50, TimeUnit.MILLISECONDS)
+ .blockingSubscribe(action, consumer);
+
+ verify(action).run();
+ verify(consumer, never()).accept(any());
+ }
+
+ @Test
+ public void twoArgCompleteFails() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Action action = mock(Action.class);
+ doThrow(new TestException()).when(action).run();
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Completable.complete()
+ .blockingSubscribe(action, consumer);
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+ verify(action).run();
+ verify(consumer, never()).accept(any());
+ });
+ }
+
+ @Test
+ public void twoArgError() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Action action = mock(Action.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Completable.error(new TestException())
+ .blockingSubscribe(action, consumer);
+
+ assertTrue("" + errors, errors.isEmpty());
+
+ verify(action, never()).run();
+ verify(consumer).accept(any(TestException.class));
+ });
+ }
+
+ @Test
+ public void twoArgErrorAsync() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Action action = mock(Action.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Completable.error(new TestException())
+ .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+ .blockingSubscribe(action, consumer);
+
+ assertTrue("" + errors, errors.isEmpty());
+
+ verify(action, never()).run();
+ verify(consumer).accept(any(TestException.class));
+ });
+ }
+
+ @Test
+ public void twoArgErrorFails() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Action action = mock(Action.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+ doThrow(new TestException()).when(consumer).accept(any());
+
+ Completable.error(new TestException())
+ .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+ .blockingSubscribe(action, consumer);
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+ verify(action, never()).run();
+ verify(consumer).accept(any(TestException.class));
+ });
+ }
+
+ @Test
+ public void twoArgInterrupted() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Action onDispose = mock(Action.class);
+
+ Action action = mock(Action.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Thread.currentThread().interrupt();
+
+ Completable.never()
+ .doOnDispose(onDispose)
+ .blockingSubscribe(action, consumer);
+
+ assertTrue("" + errors, errors.isEmpty());
+
+ verify(onDispose).run();
+ verify(action, never()).run();
+ verify(consumer).accept(any(InterruptedException.class));
+ });
+ }
+
+ @Test
+ public void observerComplete() {
+ TestObserver to = new TestObserver<>();
+
+ Completable.complete()
+ .blockingSubscribe(to);
+
+ to.assertResult();
+ }
+
+ @Test
+ public void observerCompleteAsync() {
+ TestObserver to = new TestObserver<>();
+
+ Completable.complete()
+ .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+ .blockingSubscribe(to);
+
+ to.assertResult();
+ }
+
+ @Test
+ public void observerError() {
+ TestObserver to = new TestObserver<>();
+
+ Completable.error(new TestException())
+ .blockingSubscribe(to);
+
+ to.assertFailure(TestException.class);
+ }
+
+ @Test
+ public void observerErrorAsync() {
+ TestObserver to = new TestObserver<>();
+
+ Completable.error(new TestException())
+ .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
+ .blockingSubscribe(to);
+
+ to.assertFailure(TestException.class);
+ }
+
+ @Test
+ public void observerDispose() throws Throwable {
+ Action onDispose = mock(Action.class);
+
+ TestObserver to = new TestObserver<>();
+ to.dispose();
+
+ Completable.never()
+ .doOnDispose(onDispose)
+ .blockingSubscribe(to);
+
+ to.assertEmpty();
+
+ verify(onDispose).run();
+ }
+
+ @Test
+ public void ovserverInterrupted() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Action onDispose = mock(Action.class);
+
+ TestObserver to = new TestObserver<>();
+
+ Thread.currentThread().interrupt();
+
+ Completable.never()
+ .doOnDispose(onDispose)
+ .blockingSubscribe(to);
+
+ assertTrue("" + errors, errors.isEmpty());
+
+ verify(onDispose).run();
+ to.assertFailure(InterruptedException.class);
+ });
+ }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeBlockingSubscribeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeBlockingSubscribeTest.java
new file mode 100644
index 0000000000..647d385eef
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeBlockingSubscribeTest.java
@@ -0,0 +1,502 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.maybe;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.Maybe;
+import io.reactivex.rxjava3.exceptions.TestException;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.observers.TestObserver;
+import io.reactivex.rxjava3.schedulers.Schedulers;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class MaybeBlockingSubscribeTest {
+
+ @Test
+ public void noArgSuccess() {
+ Maybe.just(1)
+ .blockingSubscribe();
+ }
+
+ @Test
+ public void noArgSuccessAsync() {
+ Maybe.just(1)
+ .delay(100, TimeUnit.MILLISECONDS)
+ .blockingSubscribe();
+ }
+
+ @Test
+ public void noArgEmpty() {
+ Maybe.empty()
+ .blockingSubscribe();
+ }
+
+ @Test
+ public void noArgEmptyAsync() {
+ Maybe.empty()
+ .delay(100, TimeUnit.MILLISECONDS)
+ .blockingSubscribe();
+ }
+
+ @Test
+ public void noArgError() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Maybe.error(new TestException())
+ .blockingSubscribe();
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+ });
+ }
+
+ @Test
+ public void noArgErrorAsync() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Maybe.error(new TestException())
+ .delay(100, TimeUnit.MILLISECONDS, Schedulers.computation())
+ .blockingSubscribe();
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+ });
+ }
+
+ @Test
+ public void oneArgSuccess() throws Throwable {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+
+ Maybe.just(1)
+ .blockingSubscribe(success);
+
+ verify(success).accept(1);
+ }
+
+ @Test
+ public void oneArgSuccessAsync() throws Throwable {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+
+ Maybe.just(1)
+ .delay(50, TimeUnit.MILLISECONDS)
+ .blockingSubscribe(success);
+
+ verify(success).accept(1);
+ }
+
+ @Test
+ public void oneArgEmpty() throws Throwable {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+
+ Maybe.empty()
+ .blockingSubscribe(success);
+
+ verify(success, never()).accept(any());
+ }
+
+ @Test
+ public void oneArgEmptyAsync() throws Throwable {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+
+ Maybe.empty()
+ .delay(50, TimeUnit.MILLISECONDS)
+ .blockingSubscribe(success);
+
+ verify(success, never()).accept(any());
+ }
+
+ @Test
+ public void oneArgSuccessFails() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ doThrow(new TestException()).when(success).accept(any());
+
+ Maybe.just(1)
+ .blockingSubscribe(success);
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+ verify(success).accept(1);
+ });
+ }
+
+ @Test
+ public void oneArgError() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+
+ Maybe.error(new TestException())
+ .blockingSubscribe(success);
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+ verify(success, never()).accept(any());
+ });
+ }
+
+ @Test
+ public void oneArgErrorAsync() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+
+ Maybe.error(new TestException())
+ .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation())
+ .blockingSubscribe(success);
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+ verify(success, never()).accept(any());
+ });
+ }
+
+ @Test
+ public void twoArgSuccess() throws Throwable {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Maybe.just(1)
+ .blockingSubscribe(success, consumer);
+
+ verify(success).accept(1);
+ verify(consumer, never()).accept(any());
+ }
+
+ @Test
+ public void twoArgSuccessAsync() throws Throwable {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Maybe.just(1)
+ .delay(50, TimeUnit.MILLISECONDS)
+ .blockingSubscribe(success, consumer);
+
+ verify(success).accept(any());
+ verify(consumer, never()).accept(any());
+ }
+
+ @Test
+ public void twoArgEmpty() throws Throwable {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Maybe.empty()
+ .blockingSubscribe(success, consumer);
+
+ verify(success, never()).accept(any());
+ verify(consumer, never()).accept(any());
+ }
+
+ @Test
+ public void twoArgEmptyAsync() throws Throwable {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Maybe.empty()
+ .delay(50, TimeUnit.MILLISECONDS)
+ .blockingSubscribe(success, consumer);
+
+ verify(success, never()).accept(any());
+ verify(consumer, never()).accept(any());
+ }
+
+ @Test
+ public void twoArgSuccessFails() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ doThrow(new TestException()).when(success).accept(any());
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Maybe.just(1)
+ .blockingSubscribe(success, consumer);
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+ verify(success).accept(any());
+ verify(consumer, never()).accept(any());
+ });
+ }
+
+ @Test
+ public void twoArgError() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Maybe.error(new TestException())
+ .blockingSubscribe(success, consumer);
+
+ assertTrue("" + errors, errors.isEmpty());
+
+ verify(success, never()).accept(any());
+ verify(consumer).accept(any(TestException.class));
+ });
+ }
+
+ @Test
+ public void twoArgErrorAsync() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Maybe.error(new TestException())
+ .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation())
+ .blockingSubscribe(success, consumer);
+
+ assertTrue("" + errors, errors.isEmpty());
+
+ verify(success, never()).accept(any());
+ verify(consumer).accept(any(TestException.class));
+ });
+ }
+
+ @Test
+ public void twoArgErrorFails() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+ doThrow(new TestException()).when(consumer).accept(any());
+
+ Maybe.error(new TestException())
+ .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation())
+ .blockingSubscribe(success, consumer);
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+ verify(success, never()).accept(any());
+ verify(consumer).accept(any(TestException.class));
+ });
+ }
+
+ @Test
+ public void threeArgSuccess() throws Throwable {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+ Action action = mock(Action.class);
+
+ Maybe.just(1)
+ .blockingSubscribe(success, consumer, action);
+
+ verify(success).accept(any());
+ verify(consumer, never()).accept(any(Throwable.class));
+ verify(action, never()).run();
+ }
+
+ @Test
+ public void threeArgEmpty() throws Throwable {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+ Action action = mock(Action.class);
+
+ Maybe.empty()
+ .blockingSubscribe(success, consumer, action);
+
+ verify(success, never()).accept(any());
+ verify(consumer, never()).accept(any(Throwable.class));
+ verify(action).run();
+ }
+
+ @Test
+ public void threeArgError() throws Throwable {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+ Action action = mock(Action.class);
+
+ Maybe.error(new TestException())
+ .blockingSubscribe(success, consumer, action);
+
+ verify(success, never()).accept(any());
+ verify(consumer).accept(any(TestException.class));
+ verify(action, never()).run();
+ }
+
+ @Test
+ public void threeArgEmptyFails() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+
+ Action action = mock(Action.class);
+ doThrow(new TestException()).when(action).run();
+
+ Maybe.empty()
+ .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation())
+ .blockingSubscribe(success, consumer, action);
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+
+ verify(success, never()).accept(any());
+ verify(consumer, never()).accept(any());
+ verify(action).run();
+ });
+ }
+
+ @Test
+ public void threeArgInterrupted() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ Action onDispose = mock(Action.class);
+
+ @SuppressWarnings("unchecked")
+ Consumer success = mock(Consumer.class);
+ @SuppressWarnings("unchecked")
+ Consumer super Throwable> consumer = mock(Consumer.class);
+ Action action = mock(Action.class);
+
+ Thread.currentThread().interrupt();
+
+ Maybe.never()
+ .doOnDispose(onDispose)
+ .blockingSubscribe(success, consumer, action);
+
+ assertTrue("" + errors, errors.isEmpty());
+
+ verify(onDispose).run();
+ verify(success, never()).accept(any());
+ verify(action, never()).run();
+ verify(consumer).accept(any(InterruptedException.class));
+ });
+ }
+
+ @Test
+ public void observerSuccess() {
+ TestObserver to = new TestObserver<>();
+
+ Maybe.just(1)
+ .blockingSubscribe(to);
+
+ to.assertResult(1);
+ }
+
+ @Test
+ public void observerSuccessAsync() {
+ TestObserver to = new TestObserver<>();
+
+ Maybe.just(1)
+ .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation())
+ .blockingSubscribe(to);
+
+ to.assertResult(1);
+ }
+
+ @Test
+ public void observerEmpty() {
+ TestObserver to = new TestObserver<>();
+
+ Maybe.empty()
+ .blockingSubscribe(to);
+
+ to.assertResult();
+ }
+
+ @Test
+ public void observerEmptyAsync() {
+ TestObserver to = new TestObserver<>();
+
+ Maybe.empty()
+ .delay(50, TimeUnit.MILLISECONDS, Schedulers.computation())
+ .blockingSubscribe(to);
+
+ to.assertResult();
+ }
+
+ @Test
+ public void observerError() {
+ TestObserver