diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index f8cbdcdbe7..20e52f2f5f 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -3653,7 +3653,7 @@ public final Maybe flatMap(@NonNull Function * @@ -3691,7 +3691,7 @@ public final Maybe flatMap( * Returns a {@code Maybe} that emits the results of a specified function to the pair of values emitted by the * current {@code Maybe} and a specified mapped {@link MaybeSource}. *

- * + * *

*
Scheduler:
*
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index cab4d3c073..667125b08b 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -3196,6 +3196,72 @@ public final Single flatMap(@NonNull Function(this, mapper)); } + /** + * Returns a {@code Single} that emits the results of a specified function to the pair of values emitted by the + * current {@code Single} and a specified mapped {@link SingleSource}. + *

+ * + *

+ *
Scheduler:
+ *
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of items emitted by the {@code SingleSource} returned by the {@code mapper} function + * @param + * the type of items emitted by the resulting {@code Single} + * @param mapper + * a function that returns a {@code SingleSource} for the item emitted by the current {@code Single} + * @param combiner + * a function that combines one item emitted by each of the source and collection {@code SingleSource} and + * returns an item to be emitted by the resulting {@code SingleSource} + * @return the new {@code Single} instance + * @throws NullPointerException if {@code mapper} or {@code combiner} is {@code null} + * @see ReactiveX operators documentation: FlatMap + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Single flatMap(@NonNull Function> mapper, + @NonNull BiFunction combiner) { + Objects.requireNonNull(mapper, "mapper is null"); + Objects.requireNonNull(combiner, "combiner is null"); + return RxJavaPlugins.onAssembly(new SingleFlatMapBiSelector<>(this, mapper, combiner)); + } + + /** + * Maps the {@code onSuccess} or {@code onError} signals of the current {@code Single} into a {@link SingleSource} and emits that + * {@code SingleSource}'s signals. + *

+ * + *

+ *
Scheduler:
+ *
{@code flatMap} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the result type + * @param onSuccessMapper + * a function that returns a {@code SingleSource} to merge for the {@code onSuccess} item emitted by this {@code Single} + * @param onErrorMapper + * a function that returns a {@code SingleSource} to merge for an {@code onError} notification from this {@code Single} + * @return the new {@code Single} instance + * @throws NullPointerException if {@code onSuccessMapper} or {@code onErrorMapper} is {@code null} + * @see ReactiveX operators documentation: FlatMap + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Single flatMap( + @NonNull Function> onSuccessMapper, + @NonNull Function> onErrorMapper) { + Objects.requireNonNull(onSuccessMapper, "onSuccessMapper is null"); + Objects.requireNonNull(onErrorMapper, "onErrorMapper is null"); + return RxJavaPlugins.onAssembly(new SingleFlatMapNotification<>(this, onSuccessMapper, onErrorMapper)); + } + /** * Returns a {@link Maybe} that is based on applying a specified function to the item emitted by the current {@code Single}, * where that function returns a {@link MaybeSource}. diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapNotification.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapNotification.java index e357938db2..ffe6ee5e20 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapNotification.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapNotification.java @@ -109,7 +109,9 @@ public void onSuccess(T value) { return; } - source.subscribe(new InnerObserver()); + if (!isDisposed()) { + source.subscribe(new InnerObserver()); + } } @Override @@ -124,7 +126,9 @@ public void onError(Throwable e) { return; } - source.subscribe(new InnerObserver()); + if (!isDisposed()) { + source.subscribe(new InnerObserver()); + } } @Override @@ -139,7 +143,9 @@ public void onComplete() { return; } - source.subscribe(new InnerObserver()); + if (!isDisposed()) { + source.subscribe(new InnerObserver()); + } } final class InnerObserver implements MaybeObserver { diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapSingle.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapSingle.java index aed71dc855..999b22f836 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapSingle.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapSingle.java @@ -89,7 +89,9 @@ public void onSuccess(T value) { return; } - ss.subscribe(new FlatMapSingleObserver(this, downstream)); + if (!isDisposed()) { + ss.subscribe(new FlatMapSingleObserver(this, downstream)); + } } @Override diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapObservable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapObservable.java index ae42405f74..ef1dc5f023 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapObservable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapObservable.java @@ -106,7 +106,9 @@ public void onSuccess(T t) { return; } - o.subscribe(this); + if (!isDisposed()) { + o.subscribe(this); + } } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapPublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapPublisher.java index c590755704..82a6d5da7d 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapPublisher.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapPublisher.java @@ -116,7 +116,9 @@ public void onSuccess(T t) { return; } - p.subscribe(this); + if (get() != SubscriptionHelper.CANCELLED) { + p.subscribe(this); + } } @Override diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/SingleFlatMapObservable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/SingleFlatMapObservable.java index c3dd9b59c1..b2562d8dba 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/SingleFlatMapObservable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/SingleFlatMapObservable.java @@ -106,7 +106,9 @@ public void onSuccess(T t) { return; } - o.subscribe(this); + if (!isDisposed()) { + o.subscribe(this); + } } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapBiSelector.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapBiSelector.java new file mode 100644 index 0000000000..d1051f3357 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapBiSelector.java @@ -0,0 +1,156 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; + +/** + * Maps a source item to another SingleSource then calls a BiFunction with the + * original item and the secondary item to generate the final result. + * + * @param the main value type + * @param the second value type + * @param the result value type + * @since 3.0.0 + */ +public final class SingleFlatMapBiSelector extends Single { + + final SingleSource source; + + final Function> mapper; + + final BiFunction resultSelector; + + public SingleFlatMapBiSelector(SingleSource source, + Function> mapper, + BiFunction resultSelector) { + this.source = source; + this.mapper = mapper; + this.resultSelector = resultSelector; + } + + @Override + protected void subscribeActual(SingleObserver observer) { + source.subscribe(new FlatMapBiMainObserver(observer, mapper, resultSelector)); + } + + static final class FlatMapBiMainObserver + implements SingleObserver, Disposable { + + final Function> mapper; + + final InnerObserver inner; + + FlatMapBiMainObserver(SingleObserver actual, + Function> mapper, + BiFunction resultSelector) { + this.inner = new InnerObserver<>(actual, resultSelector); + this.mapper = mapper; + } + + @Override + public void dispose() { + DisposableHelper.dispose(inner); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(inner.get()); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.setOnce(inner, d)) { + inner.downstream.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + SingleSource next; + + try { + next = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null MaybeSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + inner.downstream.onError(ex); + return; + } + + if (DisposableHelper.replace(inner, null)) { + inner.value = value; + next.subscribe(inner); + } + } + + @Override + public void onError(Throwable e) { + inner.downstream.onError(e); + } + + static final class InnerObserver + extends AtomicReference + implements SingleObserver { + + private static final long serialVersionUID = -2897979525538174559L; + + final SingleObserver downstream; + + final BiFunction resultSelector; + + T value; + + InnerObserver(SingleObserver actual, + BiFunction resultSelector) { + this.downstream = actual; + this.resultSelector = resultSelector; + } + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(this, d); + } + + @Override + public void onSuccess(U value) { + T t = this.value; + this.value = null; + + R r; + + try { + r = Objects.requireNonNull(resultSelector.apply(t, value), "The resultSelector returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + return; + } + + downstream.onSuccess(r); + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapNotification.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapNotification.java new file mode 100644 index 0000000000..1ba9796b7c --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapNotification.java @@ -0,0 +1,147 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; + +/** + * Maps a value into a SingleSource and relays its signal. + * + * @param the source value type + * @param the result value type + * @since 3.0.0 + */ +public final class SingleFlatMapNotification extends Single { + + final SingleSource source; + + final Function> onSuccessMapper; + + final Function> onErrorMapper; + + public SingleFlatMapNotification(SingleSource source, + Function> onSuccessMapper, + Function> onErrorMapper) { + this.source = source; + this.onSuccessMapper = onSuccessMapper; + this.onErrorMapper = onErrorMapper; + } + + @Override + protected void subscribeActual(SingleObserver observer) { + source.subscribe(new FlatMapSingleObserver<>(observer, onSuccessMapper, onErrorMapper)); + } + + static final class FlatMapSingleObserver + extends AtomicReference + implements SingleObserver, Disposable { + + private static final long serialVersionUID = 4375739915521278546L; + + final SingleObserver downstream; + + final Function> onSuccessMapper; + + final Function> onErrorMapper; + + Disposable upstream; + + FlatMapSingleObserver(SingleObserver actual, + Function> onSuccessMapper, + Function> onErrorMapper) { + this.downstream = actual; + this.onSuccessMapper = onSuccessMapper; + this.onErrorMapper = onErrorMapper; + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + upstream.dispose(); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + + downstream.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + SingleSource source; + + try { + source = Objects.requireNonNull(onSuccessMapper.apply(value), "The onSuccessMapper returned a null SingleSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + return; + } + + if (!isDisposed()) { + source.subscribe(new InnerObserver()); + } + } + + @Override + public void onError(Throwable e) { + SingleSource source; + + try { + source = Objects.requireNonNull(onErrorMapper.apply(e), "The onErrorMapper returned a null SingleSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(new CompositeException(e, ex)); + return; + } + + if (!isDisposed()) { + source.subscribe(new InnerObserver()); + } + } + + final class InnerObserver implements SingleObserver { + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(FlatMapSingleObserver.this, d); + } + + @Override + public void onSuccess(R value) { + downstream.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapPublisher.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapPublisher.java index f474830e40..c89913791d 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapPublisher.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapPublisher.java @@ -92,7 +92,9 @@ public void onSuccess(S value) { downstream.onError(e); return; } - f.subscribe(this); + if (parent.get() != SubscriptionHelper.CANCELLED) { + f.subscribe(this); + } } @Override diff --git a/src/test/java/io/reactivex/rxjava3/core/XFlatMapTest.java b/src/test/java/io/reactivex/rxjava3/core/XFlatMapTest.java index 6e80b57904..06394de7ab 100644 --- a/src/test/java/io/reactivex/rxjava3/core/XFlatMapTest.java +++ b/src/test/java/io/reactivex/rxjava3/core/XFlatMapTest.java @@ -22,7 +22,7 @@ import org.reactivestreams.Publisher; import io.reactivex.rxjava3.exceptions.TestException; -import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.observers.TestObserver; import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.schedulers.Schedulers; @@ -224,7 +224,7 @@ public Completable apply(Integer v) throws Exception { } @Test - public void observableFlowable() throws Exception { + public void observableObservable() throws Exception { List errors = TestHelper.trackPluginErrors(); try { TestObserver to = Observable.just(1) @@ -505,7 +505,179 @@ public Completable apply(Integer v) throws Exception { } @Test - @Ignore + public void singlePublisher() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + TestSubscriber ts = Single.just(1) + .subscribeOn(Schedulers.io()) + .flatMapPublisher(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + sleep(); + return Flowable.error(new TestException()); + } + }) + .test(); + + cb.await(); + + beforeCancelSleep(ts); + + ts.cancel(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + ts.assertEmpty(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void singleCombiner() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Single.just(1) + .subscribeOn(Schedulers.io()) + .flatMap(new Function>() { + @Override + public Single apply(Integer v) throws Exception { + sleep(); + return Single.error(new TestException()); + } + }, (a, b) -> a + b) + .test(); + + cb.await(); + + beforeCancelSleep(to); + + to.dispose(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + to.assertEmpty(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void singleObservable() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Single.just(1) + .subscribeOn(Schedulers.io()) + .flatMapObservable(new Function>() { + @Override + public Observable apply(Integer v) throws Exception { + sleep(); + return Observable.error(new TestException()); + } + }) + .test(); + + cb.await(); + + beforeCancelSleep(to); + + to.dispose(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + to.assertEmpty(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void singleNotificationSuccess() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Single.just(1) + .subscribeOn(Schedulers.io()) + .flatMap( + new Function>() { + @Override + public Single apply(Integer v) throws Exception { + sleep(); + return Single.error(new TestException()); + } + }, + new Function>() { + @Override + public Single apply(Throwable v) throws Exception { + sleep(); + return Single.error(new TestException()); + } + } + ) + .test(); + + cb.await(); + + beforeCancelSleep(to); + + to.dispose(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + to.assertEmpty(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void singleNotificationError() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Single.error(new TestException()) + .subscribeOn(Schedulers.io()) + .flatMap( + new Function>() { + @Override + public Single apply(Integer v) throws Exception { + sleep(); + return Single.error(new TestException()); + } + }, + new Function>() { + @Override + public Single apply(Throwable v) throws Exception { + sleep(); + return Single.error(new TestException()); + } + } + ) + .test(); + + cb.await(); + + beforeCancelSleep(to); + + to.dispose(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + to.assertEmpty(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test public void maybeSingle() throws Exception { List errors = TestHelper.trackPluginErrors(); try { @@ -537,6 +709,37 @@ public Single apply(Integer v) throws Exception { } } + @Test + public void maybeSingle2() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.just(1) + .subscribeOn(Schedulers.io()) + .flatMapSingle(new Function>() { + @Override + public Single apply(Integer v) throws Exception { + sleep(); + return Single.error(new TestException()); + } + }) + .test(); + + cb.await(); + + beforeCancelSleep(to); + + to.dispose(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + to.assertEmpty(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + @Test public void maybeMaybe() throws Exception { List errors = TestHelper.trackPluginErrors(); @@ -568,6 +771,240 @@ public Maybe apply(Integer v) throws Exception { } } + @Test + public void maybePublisher() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + TestSubscriber ts = Maybe.just(1) + .subscribeOn(Schedulers.io()) + .flatMapPublisher(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + sleep(); + return Flowable.error(new TestException()); + } + }) + .test(); + + cb.await(); + + beforeCancelSleep(ts); + + ts.cancel(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + ts.assertEmpty(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void maybeObservable() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.just(1) + .subscribeOn(Schedulers.io()) + .flatMapObservable(new Function>() { + @Override + public Observable apply(Integer v) throws Exception { + sleep(); + return Observable.error(new TestException()); + } + }) + .test(); + + cb.await(); + + beforeCancelSleep(to); + + to.dispose(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + to.assertEmpty(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void maybeNotificationSuccess() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.just(1) + .subscribeOn(Schedulers.io()) + .flatMap( + new Function>() { + @Override + public Maybe apply(Integer v) throws Exception { + sleep(); + return Maybe.error(new TestException()); + } + }, + new Function>() { + @Override + public Maybe apply(Throwable v) throws Exception { + sleep(); + return Maybe.error(new TestException()); + } + }, + new Supplier>() { + @Override + public Maybe get() throws Exception { + sleep(); + return Maybe.error(new TestException()); + } + } + ) + .test(); + + cb.await(); + + beforeCancelSleep(to); + + to.dispose(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + to.assertEmpty(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void maybeNotificationError() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.error(new TestException()) + .subscribeOn(Schedulers.io()) + .flatMap( + new Function>() { + @Override + public Maybe apply(Integer v) throws Exception { + sleep(); + return Maybe.error(new TestException()); + } + }, + new Function>() { + @Override + public Maybe apply(Throwable v) throws Exception { + sleep(); + return Maybe.error(new TestException()); + } + }, + new Supplier>() { + @Override + public Maybe get() throws Exception { + sleep(); + return Maybe.error(new TestException()); + } + } + ) + .test(); + + cb.await(); + + beforeCancelSleep(to); + + to.dispose(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + to.assertEmpty(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void maybeNotificationEmpty() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.empty() + .subscribeOn(Schedulers.io()) + .flatMap( + new Function>() { + @Override + public Maybe apply(Integer v) throws Exception { + sleep(); + return Maybe.error(new TestException()); + } + }, + new Function>() { + @Override + public Maybe apply(Throwable v) throws Exception { + sleep(); + return Maybe.error(new TestException()); + } + }, + new Supplier>() { + @Override + public Maybe get() throws Exception { + sleep(); + return Maybe.error(new TestException()); + } + } + ) + .test(); + + cb.await(); + + beforeCancelSleep(to); + + to.dispose(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + to.assertEmpty(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void maybeCombiner() throws Exception { + List errors = TestHelper.trackPluginErrors(); + try { + TestObserver to = Maybe.just(1) + .subscribeOn(Schedulers.io()) + .flatMap(new Function>() { + @Override + public Maybe apply(Integer v) throws Exception { + sleep(); + return Maybe.error(new TestException()); + } + }, (a, b) -> a + b) + .test(); + + cb.await(); + + beforeCancelSleep(to); + + to.dispose(); + + Thread.sleep(SLEEP_AFTER_CANCEL); + + to.assertEmpty(); + + assertTrue(errors.toString(), errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + @Test public void maybeCompletable() throws Exception { List errors = TestHelper.trackPluginErrors(); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapBiSelectorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapBiSelectorTest.java new file mode 100644 index 0000000000..84a2a650c4 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapBiSelectorTest.java @@ -0,0 +1,205 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.subjects.SingleSubject; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class SingleFlatMapBiSelectorTest extends RxJavaTest { + + BiFunction stringCombine() { + return new BiFunction() { + @Override + public String apply(Integer a, Integer b) throws Exception { + return a + ":" + b; + } + }; + } + + @Test + public void normal() { + Single.just(1) + .flatMap(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(2); + } + }, stringCombine()) + .test() + .assertResult("1:2"); + } + + @Test + public void errorWithJust() { + final int[] call = { 0 }; + + Single.error(new TestException()) + .flatMap(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + call[0]++; + return Single.just(1); + } + }, stringCombine()) + .test() + .assertFailure(TestException.class); + + assertEquals(0, call[0]); + } + + @Test + public void justWithError() { + final int[] call = { 0 }; + + Single.just(1) + .flatMap(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + call[0]++; + return Single.error(new TestException()); + } + }, stringCombine()) + .test() + .assertFailure(TestException.class); + + assertEquals(1, call[0]); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(SingleSubject.create() + .flatMap(new Function>() { + @Override + public SingleSource apply(Object v) throws Exception { + return Single.just(1); + } + }, new BiFunction() { + @Override + public Object apply(Object a, Integer b) throws Exception { + return b; + } + })); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Single v) throws Exception { + return v.flatMap(new Function>() { + @Override + public SingleSource apply(Object v) throws Exception { + return Single.just(1); + } + }, new BiFunction() { + @Override + public Object apply(Object a, Integer b) throws Exception { + return b; + } + }); + } + }); + } + + @Test + public void mapperThrows() { + Single.just(1) + .flatMap(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + throw new TestException(); + } + }, stringCombine()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapperReturnsNull() { + Single.just(1) + .flatMap(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return null; + } + }, stringCombine()) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void resultSelectorThrows() { + Single.just(1) + .flatMap(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(2); + } + }, new BiFunction() { + @Override + public Object apply(Integer a, Integer b) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void resultSelectorReturnsNull() { + Single.just(1) + .flatMap(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + return Single.just(2); + } + }, new BiFunction() { + @Override + public Object apply(Integer a, Integer b) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void mapperCancels() { + final TestObserver to = new TestObserver<>(); + + Single.just(1) + .flatMap(new Function>() { + @Override + public SingleSource apply(Integer v) throws Exception { + to.dispose(); + return Single.just(2); + } + }, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + throw new IllegalStateException(); + } + }) + .subscribeWith(to) + .assertEmpty(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapNotificationTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapNotificationTest.java new file mode 100644 index 0000000000..266acc93f2 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapNotificationTest.java @@ -0,0 +1,113 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.functions.Functions; +import io.reactivex.rxjava3.testsupport.*; + +public class SingleFlatMapNotificationTest extends RxJavaTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(Single.just(1) + .flatMap(Functions.justFunction(Single.just(1)), + Functions.justFunction(Single.just(1)))); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(new Function, SingleSource>() { + @Override + public SingleSource apply(Single m) throws Exception { + return m + .flatMap(Functions.justFunction(Single.just(1)), + Functions.justFunction(Single.just(1))); + } + }); + } + + @Test + public void onSuccessNull() { + Single.just(1) + .flatMap(Functions.justFunction((Single)null), + Functions.justFunction(Single.just(1))) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void onErrorNull() { + TestObserverEx to = Single.error(new TestException()) + .flatMap(Functions.justFunction(Single.just(1)), + Functions.justFunction((Single)null)) + .to(TestHelper.testConsumer()) + .assertFailure(CompositeException.class); + + List ce = TestHelper.compositeList(to.errors().get(0)); + + TestHelper.assertError(ce, 0, TestException.class); + TestHelper.assertError(ce, 1, NullPointerException.class); + } + + @Test + public void onSuccessError() { + Single.just(1) + .flatMap(Functions.justFunction(Single.error(new TestException())), + Functions.justFunction((Single)null)) + .test() + .assertFailure(TestException.class); + } + + @Test + public void onSucccessSuccess() { + Single.just(1) + .flatMap(v -> Single.just(2), e -> Single.just(3)) + .test() + .assertResult(2); + } + + @Test + public void onErrorSuccess() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Single.error(new TestException()) + .flatMap(v -> Single.just(2), e -> Single.just(3)) + .test() + .assertResult(3); + + assertTrue("" + errors, errors.isEmpty()); + }); + } + + @Test + public void onErrorError() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Single.error(new TestException()) + .flatMap(v -> Single.just(2), e -> Single.error(new IOException())) + .test() + .assertFailure(IOException.class); + + assertTrue("" + errors, errors.isEmpty()); + }); + } +}