,
return toSingle(toFlowable().retryWhen(handler));
}
+ /**
+ * Wraps the given {@link SingleObserver}, catches any {@link RuntimeException}s thrown by its
+ * {@link SingleObserver#onSubscribe(Disposable)}, {@link SingleObserver#onSuccess(Object)} or
+ * {@link SingleObserver#onError(Throwable)} methods* and routes those to the global error handler
+ * via {@link RxJavaPlugins#onError(Throwable)}.
+ *
+ * By default, the {@code Single} protocol forbids the {@code onXXX} methods to throw, but some
+ * {@code SingleObserver} implementation may do it anyway, causing undefined behavior in the
+ * upstream. This method and the underlying safe wrapper ensures such misbehaving consumers don't
+ * disrupt the protocol.
+ *
+ * - Scheduler:
+ * - {@code safeSubscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param observer the potentially misbehaving {@code SingleObserver}
+ * @throws NullPointerException if {@code observer} is {@code null}
+ * @see #subscribe(Consumer,Consumer)
+ * @since 3.0.0
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void safeSubscribe(@NonNull SingleObserver super T> observer) {
+ Objects.requireNonNull(observer, "observer is null");
+ subscribe(new SafeSingleObserver<>(observer));
+ }
+
/**
* Returns a {@link Flowable} which first runs the other {@link CompletableSource}
* then the current {@code Single} if the other completed normally.
diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/SafeCompletableObserver.java b/src/main/java/io/reactivex/rxjava3/internal/observers/SafeCompletableObserver.java
new file mode 100644
index 0000000000..d9e929c234
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/observers/SafeCompletableObserver.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.observers;
+
+import io.reactivex.rxjava3.annotations.NonNull;
+import io.reactivex.rxjava3.core.CompletableObserver;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.*;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
+
+/**
+ * Wraps another {@link CompletableObserver} and catches exceptions thrown by its
+ * {@code onSubscribe}, {@code onError} or
+ * {@code onComplete} methods despite the protocol forbids it.
+ *
+ * Such exceptions are routed to the {@link RxJavaPlugins#onError(Throwable)} handler.
+ *
+ * @since 3.0.0
+ */
+public final class SafeCompletableObserver implements CompletableObserver {
+
+ final CompletableObserver downstream;
+
+ boolean onSubscribeFailed;
+
+ public SafeCompletableObserver(CompletableObserver downstream) {
+ this.downstream = downstream;
+ }
+
+ @Override
+ public void onSubscribe(@NonNull Disposable d) {
+ try {
+ downstream.onSubscribe(d);
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ onSubscribeFailed = true;
+ d.dispose();
+ RxJavaPlugins.onError(ex);
+ }
+ }
+
+ @Override
+ public void onError(@NonNull Throwable e) {
+ if (onSubscribeFailed) {
+ RxJavaPlugins.onError(e);
+ } else {
+ try {
+ downstream.onError(e);
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ RxJavaPlugins.onError(new CompositeException(e, ex));
+ }
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ if (!onSubscribeFailed) {
+ try {
+ downstream.onComplete();
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ RxJavaPlugins.onError(ex);
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/SafeMaybeObserver.java b/src/main/java/io/reactivex/rxjava3/internal/observers/SafeMaybeObserver.java
new file mode 100644
index 0000000000..a6e794b1fd
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/observers/SafeMaybeObserver.java
@@ -0,0 +1,91 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.observers;
+
+import io.reactivex.rxjava3.annotations.NonNull;
+import io.reactivex.rxjava3.core.MaybeObserver;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.*;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
+
+/**
+ * Wraps another {@link MaybeObserver} and catches exceptions thrown by its
+ * {@code onSubscribe}, {@code onSuccess}, {@code onError} or
+ * {@code onComplete} methods despite the protocol forbids it.
+ *
+ * Such exceptions are routed to the {@link RxJavaPlugins#onError(Throwable)} handler.
+ *
+ * @param the element type of the sequence
+ * @since 3.0.0
+ */
+public final class SafeMaybeObserver implements MaybeObserver {
+
+ final MaybeObserver super T> downstream;
+
+ boolean onSubscribeFailed;
+
+ public SafeMaybeObserver(MaybeObserver super T> downstream) {
+ this.downstream = downstream;
+ }
+
+ @Override
+ public void onSubscribe(@NonNull Disposable d) {
+ try {
+ downstream.onSubscribe(d);
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ onSubscribeFailed = true;
+ d.dispose();
+ RxJavaPlugins.onError(ex);
+ }
+ }
+
+ @Override
+ public void onSuccess(@NonNull T t) {
+ if (!onSubscribeFailed) {
+ try {
+ downstream.onSuccess(t);
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ RxJavaPlugins.onError(ex);
+ }
+ }
+ }
+
+ @Override
+ public void onError(@NonNull Throwable e) {
+ if (onSubscribeFailed) {
+ RxJavaPlugins.onError(e);
+ } else {
+ try {
+ downstream.onError(e);
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ RxJavaPlugins.onError(new CompositeException(e, ex));
+ }
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ if (!onSubscribeFailed) {
+ try {
+ downstream.onComplete();
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ RxJavaPlugins.onError(ex);
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/SafeSingleObserver.java b/src/main/java/io/reactivex/rxjava3/internal/observers/SafeSingleObserver.java
new file mode 100644
index 0000000000..6757350756
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/observers/SafeSingleObserver.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.observers;
+
+import io.reactivex.rxjava3.annotations.NonNull;
+import io.reactivex.rxjava3.core.SingleObserver;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.*;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
+
+/**
+ * Wraps another {@link SingleObserver} and catches exceptions thrown by its
+ * {@code onSubscribe}, {@code onSuccess} or {@code onError} methods despite
+ * the protocol forbids it.
+ *
+ * Such exceptions are routed to the {@link RxJavaPlugins#onError(Throwable)} handler.
+ *
+ * @param the element type of the sequence
+ * @since 3.0.0
+ */
+public final class SafeSingleObserver implements SingleObserver {
+
+ final SingleObserver super T> downstream;
+
+ boolean onSubscribeFailed;
+
+ public SafeSingleObserver(SingleObserver super T> downstream) {
+ this.downstream = downstream;
+ }
+
+ @Override
+ public void onSubscribe(@NonNull Disposable d) {
+ try {
+ downstream.onSubscribe(d);
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ onSubscribeFailed = true;
+ d.dispose();
+ RxJavaPlugins.onError(ex);
+ }
+ }
+
+ @Override
+ public void onSuccess(@NonNull T t) {
+ if (!onSubscribeFailed) {
+ try {
+ downstream.onSuccess(t);
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ RxJavaPlugins.onError(ex);
+ }
+ }
+ }
+
+ @Override
+ public void onError(@NonNull Throwable e) {
+ if (onSubscribeFailed) {
+ RxJavaPlugins.onError(e);
+ } else {
+ try {
+ downstream.onError(e);
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ RxJavaPlugins.onError(new CompositeException(e, ex));
+ }
+ }
+ }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableSafeSubscribeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableSafeSubscribeTest.java
new file mode 100644
index 0000000000..234fb0290c
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableSafeSubscribeTest.java
@@ -0,0 +1,148 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.completable;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.mockito.InOrder;
+
+import io.reactivex.rxjava3.annotations.NonNull;
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.*;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class CompletableSafeSubscribeTest {
+
+ @Test
+ public void normalError() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ CompletableObserver consumer = mock(CompletableObserver.class);
+
+ Completable.error(new TestException())
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onError(any(TestException.class));
+ order.verifyNoMoreInteractions();
+
+ assertTrue("" + errors, errors.isEmpty());
+ });
+ }
+
+ @Test
+ public void normalEmpty() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ CompletableObserver consumer = mock(CompletableObserver.class);
+
+ Completable.complete()
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onComplete();
+ order.verifyNoMoreInteractions();
+ });
+ }
+
+ @Test
+ public void onSubscribeCrash() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ CompletableObserver consumer = mock(CompletableObserver.class);
+ doThrow(new TestException()).when(consumer).onSubscribe(any());
+
+ Disposable d = Disposable.empty();
+
+ new Completable() {
+ @Override
+ protected void subscribeActual(@NonNull CompletableObserver observer) {
+ observer.onSubscribe(d);
+ // none of the following should arrive at the consumer
+ observer.onError(new IOException());
+ observer.onComplete();
+ }
+ }
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verifyNoMoreInteractions();
+
+ assertTrue(d.isDisposed());
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+ TestHelper.assertUndeliverable(errors, 1, IOException.class);
+ });
+ }
+
+ @Test
+ public void onErrorCrash() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ CompletableObserver consumer = mock(CompletableObserver.class);
+ doThrow(new TestException()).when(consumer).onError(any());
+
+ new Completable() {
+ @Override
+ protected void subscribeActual(@NonNull CompletableObserver observer) {
+ observer.onSubscribe(Disposable.empty());
+ // none of the following should arrive at the consumer
+ observer.onError(new IOException());
+ }
+ }
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onError(any(IOException.class));
+ order.verifyNoMoreInteractions();
+
+ TestHelper.assertError(errors, 0, CompositeException.class);
+
+ CompositeException compositeException = (CompositeException)errors.get(0);
+ TestHelper.assertError(compositeException.getExceptions(), 0, IOException.class);
+ TestHelper.assertError(compositeException.getExceptions(), 1, TestException.class);
+ });
+ }
+
+ @Test
+ public void onCompleteCrash() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ CompletableObserver consumer = mock(CompletableObserver.class);
+ doThrow(new TestException()).when(consumer).onComplete();
+
+ new Completable() {
+ @Override
+ protected void subscribeActual(@NonNull CompletableObserver observer) {
+ observer.onSubscribe(Disposable.empty());
+ // none of the following should arrive at the consumer
+ observer.onComplete();
+ }
+ }
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onComplete();
+ order.verifyNoMoreInteractions();
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+ });
+ }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeSafeSubscribeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeSafeSubscribeTest.java
new file mode 100644
index 0000000000..5083c20f2c
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeSafeSubscribeTest.java
@@ -0,0 +1,197 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.maybe;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.mockito.InOrder;
+
+import io.reactivex.rxjava3.annotations.NonNull;
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.*;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class MaybeSafeSubscribeTest {
+
+ @Test
+ public void normalSuccess() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ MaybeObserver consumer = mock(MaybeObserver.class);
+
+ Maybe.just(1)
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onSuccess(1);
+ order.verifyNoMoreInteractions();
+
+ assertTrue("" + errors, errors.isEmpty());
+ });
+ }
+
+ @Test
+ public void normalError() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ MaybeObserver consumer = mock(MaybeObserver.class);
+
+ Maybe.error(new TestException())
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onError(any(TestException.class));
+ order.verifyNoMoreInteractions();
+
+ assertTrue("" + errors, errors.isEmpty());
+ });
+ }
+
+ @Test
+ public void normalEmpty() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ MaybeObserver consumer = mock(MaybeObserver.class);
+
+ Maybe.empty()
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onComplete();
+ order.verifyNoMoreInteractions();
+ });
+ }
+
+ @Test
+ public void onSubscribeCrash() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ MaybeObserver consumer = mock(MaybeObserver.class);
+ doThrow(new TestException()).when(consumer).onSubscribe(any());
+
+ Disposable d = Disposable.empty();
+
+ new Maybe() {
+ @Override
+ protected void subscribeActual(@NonNull MaybeObserver super Integer> observer) {
+ observer.onSubscribe(d);
+ // none of the following should arrive at the consumer
+ observer.onSuccess(1);
+ observer.onError(new IOException());
+ observer.onComplete();
+ }
+ }
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verifyNoMoreInteractions();
+
+ assertTrue(d.isDisposed());
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+ TestHelper.assertUndeliverable(errors, 1, IOException.class);
+ });
+ }
+
+ @Test
+ public void onSuccessCrash() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ MaybeObserver consumer = mock(MaybeObserver.class);
+ doThrow(new TestException()).when(consumer).onSuccess(any());
+
+ new Maybe() {
+ @Override
+ protected void subscribeActual(@NonNull MaybeObserver super Integer> observer) {
+ observer.onSubscribe(Disposable.empty());
+ observer.onSuccess(1);
+ }
+ }
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onSuccess(1);
+ order.verifyNoMoreInteractions();
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+ });
+ }
+
+ @Test
+ public void onErrorCrash() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ MaybeObserver consumer = mock(MaybeObserver.class);
+ doThrow(new TestException()).when(consumer).onError(any());
+
+ new Maybe() {
+ @Override
+ protected void subscribeActual(@NonNull MaybeObserver super Integer> observer) {
+ observer.onSubscribe(Disposable.empty());
+ // none of the following should arrive at the consumer
+ observer.onError(new IOException());
+ }
+ }
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onError(any(IOException.class));
+ order.verifyNoMoreInteractions();
+
+ TestHelper.assertError(errors, 0, CompositeException.class);
+
+ CompositeException compositeException = (CompositeException)errors.get(0);
+ TestHelper.assertError(compositeException.getExceptions(), 0, IOException.class);
+ TestHelper.assertError(compositeException.getExceptions(), 1, TestException.class);
+ });
+ }
+
+ @Test
+ public void onCompleteCrash() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ MaybeObserver consumer = mock(MaybeObserver.class);
+ doThrow(new TestException()).when(consumer).onComplete();
+
+ new Maybe() {
+ @Override
+ protected void subscribeActual(@NonNull MaybeObserver super Integer> observer) {
+ observer.onSubscribe(Disposable.empty());
+ // none of the following should arrive at the consumer
+ observer.onComplete();
+ }
+ }
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onComplete();
+ order.verifyNoMoreInteractions();
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+ });
+ }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleSafeSubscribeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleSafeSubscribeTest.java
new file mode 100644
index 0000000000..80abb4cb1a
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleSafeSubscribeTest.java
@@ -0,0 +1,154 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.single;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.mockito.InOrder;
+
+import io.reactivex.rxjava3.annotations.NonNull;
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.*;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class SingleSafeSubscribeTest {
+
+ @Test
+ public void normalSuccess() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ SingleObserver consumer = mock(SingleObserver.class);
+
+ Single.just(1)
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onSuccess(1);
+ order.verifyNoMoreInteractions();
+
+ assertTrue("" + errors, errors.isEmpty());
+ });
+ }
+
+ @Test
+ public void normalError() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ SingleObserver consumer = mock(SingleObserver.class);
+
+ Single.error(new TestException())
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onError(any(TestException.class));
+ order.verifyNoMoreInteractions();
+
+ assertTrue("" + errors, errors.isEmpty());
+ });
+ }
+
+ @Test
+ public void onSubscribeCrash() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ SingleObserver consumer = mock(SingleObserver.class);
+ doThrow(new TestException()).when(consumer).onSubscribe(any());
+
+ Disposable d = Disposable.empty();
+
+ new Single() {
+ @Override
+ protected void subscribeActual(@NonNull SingleObserver super Integer> observer) {
+ observer.onSubscribe(d);
+ // none of the following should arrive at the consumer
+ observer.onSuccess(1);
+ observer.onError(new IOException());
+ }
+ }
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verifyNoMoreInteractions();
+
+ assertTrue(d.isDisposed());
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+ TestHelper.assertUndeliverable(errors, 1, IOException.class);
+ });
+ }
+
+ @Test
+ public void onSuccessCrash() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ SingleObserver consumer = mock(SingleObserver.class);
+ doThrow(new TestException()).when(consumer).onSuccess(any());
+
+ new Single() {
+ @Override
+ protected void subscribeActual(@NonNull SingleObserver super Integer> observer) {
+ observer.onSubscribe(Disposable.empty());
+ observer.onSuccess(1);
+ }
+ }
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onSuccess(1);
+ order.verifyNoMoreInteractions();
+
+ TestHelper.assertUndeliverable(errors, 0, TestException.class);
+ });
+ }
+
+ @Test
+ public void onErrorCrash() throws Throwable {
+ TestHelper.withErrorTracking(errors -> {
+ @SuppressWarnings("unchecked")
+ SingleObserver consumer = mock(SingleObserver.class);
+ doThrow(new TestException()).when(consumer).onError(any());
+
+ new Single() {
+ @Override
+ protected void subscribeActual(@NonNull SingleObserver super Integer> observer) {
+ observer.onSubscribe(Disposable.empty());
+ // none of the following should arrive at the consumer
+ observer.onError(new IOException());
+ }
+ }
+ .safeSubscribe(consumer);
+
+ InOrder order = inOrder(consumer);
+ order.verify(consumer).onSubscribe(any(Disposable.class));
+ order.verify(consumer).onError(any(IOException.class));
+ order.verifyNoMoreInteractions();
+
+ TestHelper.assertError(errors, 0, CompositeException.class);
+
+ CompositeException compositeException = (CompositeException)errors.get(0);
+ TestHelper.assertError(compositeException.getExceptions(), 0, IOException.class);
+ TestHelper.assertError(compositeException.getExceptions(), 1, TestException.class);
+ });
+ }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java b/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java
index 0e2c0ada16..714a6809f9 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java
@@ -147,7 +147,12 @@ public static void main(String[] args) throws IOException {
out.println("
");
}
}
- if (!tbdList.isEmpty()) {
+ if (tbdList.isEmpty()) {
+ out.println();
+ out.println("#### Under development");
+ out.println();
+ out.println("*Currently, all intended operators are implemented.*");
+ } else {
out.println();
out.println("#### Under development");
out.println();