diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java index 253610ef80..87d77f46f2 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Completable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java @@ -2572,7 +2572,7 @@ public final Completable retryWhen(@NonNull Function /** * Returns a {@code Completable} which first runs the other {@link CompletableSource} - * then this {@code Completable} if the other completed normally. + * then the current {@code Completable} if the other completed normally. *

* *

@@ -2591,9 +2591,61 @@ public final Completable startWith(@NonNull CompletableSource other) { return concatArray(other, this); } + /** + * Returns a {@link Flowable} which first runs the other {@link SingleSource} + * then the current {@code Completable} if the other succeeded normally. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the element type of the {@code other} {@code SingleSource}. + * @param other the other {@code SingleSource} to run first + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public final Flowable startWith(@NonNull SingleSource other) { + Objects.requireNonNull(other, "other is null"); + return Flowable.concat(Single.wrap(other).toFlowable(), toFlowable()); + } + + /** + * Returns a {@link Flowable} which first runs the other {@link MaybeSource} + * then the current {@code Completable} if the other succeeded or completed normally. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the element type of the {@code other} {@code MaybeSource}. + * @param other the other {@code MaybeSource} to run first + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public final Flowable startWith(@NonNull MaybeSource other) { + Objects.requireNonNull(other, "other is null"); + return Flowable.concat(Maybe.wrap(other).toFlowable(), toFlowable()); + } + /** * Returns an {@link Observable} which first delivers the events - * of the other {@link ObservableSource} then runs this {@code Completable}. + * of the other {@link ObservableSource} then runs the current {@code Completable}. *

* *

@@ -2612,9 +2664,10 @@ public final Observable startWith(@NonNull ObservableSource other) { Objects.requireNonNull(other, "other is null"); return Observable.wrap(other).concatWith(this.toObservable()); } + /** * Returns a {@link Flowable} which first delivers the events - * of the other {@link Publisher} then runs this {@code Completable}. + * of the other {@link Publisher} then runs the current {@code Completable}. *

* *

diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index a994e526c5..453c9e4bba 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -15238,6 +15238,81 @@ public final Flowable startWithIterable(@NonNull Iterable<@NonNull ? extends return concatArray(fromIterable(items), this); } + /** + * Returns a {@code Flowable} which first runs the other {@link CompletableSource} + * then the current {@code Flowable} if the other completed normally. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code CompletableSource} to run first + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public final Flowable startWith(@NonNull CompletableSource other) { + Objects.requireNonNull(other, "other is null"); + return Flowable.concat(Completable.wrap(other).toFlowable(), this); + } + + /** + * Returns a {@code Flowable} which first runs the other {@link SingleSource} + * then the current {@code Flowable} if the other succeeded normally. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code SingleSource} to run first + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public final Flowable startWith(@NonNull SingleSource other) { + Objects.requireNonNull(other, "other is null"); + return Flowable.concat(Single.wrap(other).toFlowable(), this); + } + + /** + * Returns a {@code Flowable} which first runs the other {@link MaybeSource} + * then the current {@code Flowable} if the other succeeded or completed normally. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code MaybeSource} to run first + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public final Flowable startWith(@NonNull MaybeSource other) { + Objects.requireNonNull(other, "other is null"); + return Flowable.concat(Maybe.wrap(other).toFlowable(), this); + } + /** * Returns a {@code Flowable} that emits the items in a specified {@link Publisher} before it begins to emit * items emitted by the current {@code Flowable}. diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index 5e676681ff..96ff14c1e9 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -20,6 +20,7 @@ import org.reactivestreams.*; import io.reactivex.rxjava3.annotations.*; +import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.functions.*; @@ -4859,6 +4860,129 @@ public final Maybe retryWhen( return toFlowable().retryWhen(handler).singleElement(); } + /** + * Returns a {@link Flowable} which first runs the other {@link CompletableSource} + * then the current {@code Maybe} if the other completed normally. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code CompletableSource} to run first + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public final Flowable startWith(@NonNull CompletableSource other) { + Objects.requireNonNull(other, "other is null"); + return Flowable.concat(Completable.wrap(other).toFlowable(), toFlowable()); + } + + /** + * Returns a {@link Flowable} which first runs the other {@link SingleSource} + * then the current {@code Maybe} if the other succeeded normally. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code SingleSource} to run first + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public final Flowable startWith(@NonNull SingleSource other) { + Objects.requireNonNull(other, "other is null"); + return Flowable.concat(Single.wrap(other).toFlowable(), toFlowable()); + } + + /** + * Returns a {@link Flowable} which first runs the other {@link MaybeSource} + * then the current {@code Maybe} if the other succeeded or completed normally. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code MaybeSource} to run first + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public final Flowable startWith(@NonNull MaybeSource other) { + Objects.requireNonNull(other, "other is null"); + return Flowable.concat(Maybe.wrap(other).toFlowable(), toFlowable()); + } + + /** + * Returns an {@link Observable} which first delivers the events + * of the other {@link ObservableSource} then runs the current {@code Maybe}. + *

+ * + *

+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code ObservableSource} to run first + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable startWith(@NonNull ObservableSource other) { + Objects.requireNonNull(other, "other is null"); + return Observable.wrap(other).concatWith(this.toObservable()); + } + + /** + * Returns a {@link Flowable} which first delivers the events + * of the other {@link Publisher} then runs the current {@code Maybe}. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer + * and expects the other {@code Publisher} to honor it as well.
+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code Publisher} to run first + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public final Flowable startWith(@NonNull Publisher other) { + Objects.requireNonNull(other, "other is null"); + return toFlowable().startWith(other); + } + /** * Subscribes to a {@code Maybe} and ignores {@code onSuccess} and {@code onComplete} emissions. *

diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index 3ab0be1ab9..6db7ce3cc1 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -12704,6 +12704,72 @@ public final Observable startWithIterable(@NonNull Iterable<@NonNull ? extend return concatArray(fromIterable(items), this); } + /** + * Returns an {@code Observable} which first runs the other {@link CompletableSource} + * then the current {@code Observable} if the other completed normally. + *

+ * + *

+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code CompletableSource} to run first + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable startWith(@NonNull CompletableSource other) { + Objects.requireNonNull(other, "other is null"); + return Observable.concat(Completable.wrap(other).toObservable(), this); + } + + /** + * Returns an {@code Observable} which first runs the other {@link SingleSource} + * then the current {@code Observable} if the other succeeded normally. + *

+ * + *

+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code SingleSource} to run first + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable startWith(@NonNull SingleSource other) { + Objects.requireNonNull(other, "other is null"); + return Observable.concat(Single.wrap(other).toObservable(), this); + } + + /** + * Returns an {@code Observable} which first runs the other {@link MaybeSource} + * then the current {@code Observable} if the other succeeded or completed normally. + *

+ * + *

+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code MaybeSource} to run first + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable startWith(@NonNull MaybeSource other) { + Objects.requireNonNull(other, "other is null"); + return Observable.concat(Maybe.wrap(other).toObservable(), this); + } + /** * Returns an {@code Observable} that emits the items in a specified {@link ObservableSource} before it begins to emit * items emitted by the current {@code Observable}. diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index 5e9b084ad2..5911000d28 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -20,6 +20,7 @@ import org.reactivestreams.*; import io.reactivex.rxjava3.annotations.*; +import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.functions.*; @@ -4286,6 +4287,129 @@ public final Single retryWhen(@NonNull Function, return toSingle(toFlowable().retryWhen(handler)); } + /** + * Returns a {@link Flowable} which first runs the other {@link CompletableSource} + * then the current {@code Single} if the other completed normally. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code CompletableSource} to run first + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public final Flowable startWith(@NonNull CompletableSource other) { + Objects.requireNonNull(other, "other is null"); + return Flowable.concat(Completable.wrap(other).toFlowable(), toFlowable()); + } + + /** + * Returns a {@link Flowable} which first runs the other {@link SingleSource} + * then the current {@code Single} if the other succeeded normally. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code SingleSource} to run first + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public final Flowable startWith(@NonNull SingleSource other) { + Objects.requireNonNull(other, "other is null"); + return Flowable.concat(Single.wrap(other).toFlowable(), toFlowable()); + } + + /** + * Returns a {@link Flowable} which first runs the other {@link MaybeSource} + * then the current {@code Single} if the other succeeded or completed normally. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code MaybeSource} to run first + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + public final Flowable startWith(@NonNull MaybeSource other) { + Objects.requireNonNull(other, "other is null"); + return Flowable.concat(Maybe.wrap(other).toFlowable(), toFlowable()); + } + + /** + * Returns an {@link Observable} which first delivers the events + * of the other {@link ObservableSource} then runs the current {@code Single}. + *

+ * + *

+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code ObservableSource} to run first + * @return the new {@code Observable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable startWith(@NonNull ObservableSource other) { + Objects.requireNonNull(other, "other is null"); + return Observable.wrap(other).concatWith(this.toObservable()); + } + + /** + * Returns a {@link Flowable} which first delivers the events + * of the other {@link Publisher} then runs the current {@code Single}. + *

+ * + *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer + * and expects the other {@code Publisher} to honor it as well.
+ *
Scheduler:
+ *
{@code startWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param other the other {@code Publisher} to run first + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code other} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public final Flowable startWith(@NonNull Publisher other) { + Objects.requireNonNull(other, "other is null"); + return toFlowable().startWith(other); + } + /** * Subscribes to a {@code Single} but ignore its emission or notification. *

diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableStartWithTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableStartWithTest.java new file mode 100644 index 0000000000..065a600a0e --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableStartWithTest.java @@ -0,0 +1,67 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.completable; + +import static org.mockito.Mockito.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; + +public class CompletableStartWithTest { + + @Test + public void singleNormal() { + Completable.complete().startWith(Single.just(1)) + .test() + .assertResult(1); + } + + @Test + public void singleError() { + Runnable run = mock(Runnable.class); + + Completable.fromRunnable(run).startWith(Single.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } + + @Test + public void maybeNormal() { + Completable.complete().startWith(Maybe.just(1)) + .test() + .assertResult(1); + } + + @Test + public void maybeEmptyNormal() { + Completable.complete().startWith(Maybe.empty()) + .test() + .assertResult(); + } + + @Test + public void maybeError() { + Runnable run = mock(Runnable.class); + + Completable.fromRunnable(run).startWith(Maybe.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableStartWithTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableStartWithTest.java new file mode 100644 index 0000000000..68f4ee6272 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableStartWithTest.java @@ -0,0 +1,153 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.flowable; + +import static org.mockito.Mockito.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; + +public class FlowableStartWithTest { + + @Test + public void justCompletableComplete() { + Flowable.just(1).startWith(Completable.complete()) + .test() + .assertResult(1); + } + + @Test + public void emptyCompletableComplete() { + Flowable.empty().startWith(Completable.complete()) + .test() + .assertResult(); + } + + @Test + public void runCompletableError() { + Runnable run = mock(Runnable.class); + + Flowable.fromRunnable(run).startWith(Completable.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } + + @Test + public void justSingleJust() { + Flowable.just(1).startWith(Single.just(2)) + .test() + .assertResult(2, 1); + } + + @Test + public void emptySingleJust() { + Runnable run = mock(Runnable.class); + + Flowable.fromRunnable(run) + .startWith(Single.just(2)) + .test() + .assertResult(2); + + verify(run).run(); + } + + @Test + public void runSingleError() { + Runnable run = mock(Runnable.class); + + Flowable.fromRunnable(run).startWith(Single.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } + + @Test + public void justMaybeJust() { + Flowable.just(1).startWith(Maybe.just(2)) + .test() + .assertResult(2, 1); + } + + @Test + public void emptyMaybeJust() { + Runnable run = mock(Runnable.class); + + Flowable.fromRunnable(run) + .startWith(Maybe.just(2)) + .test() + .assertResult(2); + + verify(run).run(); + } + + @Test + public void runMaybeError() { + Runnable run = mock(Runnable.class); + + Flowable.fromRunnable(run).startWith(Maybe.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } + + @Test + public void justFlowableJust() { + Flowable.just(1).startWith(Flowable.just(2, 3, 4, 5)) + .test() + .assertResult(2, 3, 4, 5, 1); + } + + @Test + public void emptyFlowableJust() { + Runnable run = mock(Runnable.class); + + Flowable.fromRunnable(run) + .startWith(Flowable.just(2, 3, 4, 5)) + .test() + .assertResult(2, 3, 4, 5); + + verify(run).run(); + } + + @Test + public void emptyFlowableEmpty() { + Runnable run = mock(Runnable.class); + Runnable run2 = mock(Runnable.class); + + Flowable.fromRunnable(run) + .startWith(Flowable.fromRunnable(run2)) + .test() + .assertResult(); + + verify(run).run(); + verify(run2).run(); + } + + @Test + public void runFlowableError() { + Runnable run = mock(Runnable.class); + + Flowable.fromRunnable(run).startWith(Flowable.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeStartWithTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeStartWithTest.java new file mode 100644 index 0000000000..87969974d4 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeStartWithTest.java @@ -0,0 +1,197 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.maybe; + +import static org.mockito.Mockito.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; + +public class MaybeStartWithTest { + + @Test + public void justCompletableComplete() { + Maybe.just(1).startWith(Completable.complete()) + .test() + .assertResult(1); + } + + @Test + public void emptyCompletableComplete() { + Maybe.empty().startWith(Completable.complete()) + .test() + .assertResult(); + } + + @Test + public void runCompletableError() { + Runnable run = mock(Runnable.class); + + Maybe.fromRunnable(run).startWith(Completable.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } + + @Test + public void justSingleJust() { + Maybe.just(1).startWith(Single.just(2)) + .test() + .assertResult(2, 1); + } + + @Test + public void emptySingleJust() { + Runnable run = mock(Runnable.class); + + Maybe.fromRunnable(run) + .startWith(Single.just(2)) + .test() + .assertResult(2); + + verify(run).run(); + } + + @Test + public void runSingleError() { + Runnable run = mock(Runnable.class); + + Maybe.fromRunnable(run).startWith(Single.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } + + @Test + public void justMaybeJust() { + Maybe.just(1).startWith(Maybe.just(2)) + .test() + .assertResult(2, 1); + } + + @Test + public void emptyMaybeJust() { + Runnable run = mock(Runnable.class); + + Maybe.fromRunnable(run) + .startWith(Maybe.just(2)) + .test() + .assertResult(2); + + verify(run).run(); + } + + @Test + public void runMaybeError() { + Runnable run = mock(Runnable.class); + + Maybe.fromRunnable(run).startWith(Maybe.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } + + @Test + public void justObservableJust() { + Maybe.just(1).startWith(Observable.just(2, 3, 4, 5)) + .test() + .assertResult(2, 3, 4, 5, 1); + } + + @Test + public void emptyObservableJust() { + Runnable run = mock(Runnable.class); + + Maybe.fromRunnable(run) + .startWith(Observable.just(2, 3, 4, 5)) + .test() + .assertResult(2, 3, 4, 5); + + verify(run).run(); + } + + @Test + public void emptyObservableEmpty() { + Runnable run = mock(Runnable.class); + Runnable run2 = mock(Runnable.class); + + Maybe.fromRunnable(run) + .startWith(Observable.fromRunnable(run2)) + .test() + .assertResult(); + + verify(run).run(); + verify(run2).run(); + } + + @Test + public void runObservableError() { + Runnable run = mock(Runnable.class); + + Maybe.fromRunnable(run).startWith(Observable.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } + + @Test + public void justFlowableJust() { + Maybe.just(1).startWith(Flowable.just(2, 3, 4, 5)) + .test() + .assertResult(2, 3, 4, 5, 1); + } + + @Test + public void emptyFlowableJust() { + Runnable run = mock(Runnable.class); + + Maybe.fromRunnable(run) + .startWith(Flowable.just(2, 3, 4, 5)) + .test() + .assertResult(2, 3, 4, 5); + + verify(run).run(); + } + + @Test + public void emptyFlowableEmpty() { + Runnable run = mock(Runnable.class); + Runnable run2 = mock(Runnable.class); + + Maybe.fromRunnable(run) + .startWith(Flowable.fromRunnable(run2)) + .test() + .assertResult(); + + verify(run).run(); + verify(run2).run(); + } + + @Test + public void runFlowableError() { + Runnable run = mock(Runnable.class); + + Maybe.fromRunnable(run).startWith(Flowable.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableStartWithTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableStartWithTest.java new file mode 100644 index 0000000000..47cf4dc962 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableStartWithTest.java @@ -0,0 +1,153 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.observable; + +import static org.mockito.Mockito.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; + +public class ObservableStartWithTest { + + @Test + public void justCompletableComplete() { + Observable.just(1).startWith(Completable.complete()) + .test() + .assertResult(1); + } + + @Test + public void emptyCompletableComplete() { + Observable.empty().startWith(Completable.complete()) + .test() + .assertResult(); + } + + @Test + public void runCompletableError() { + Runnable run = mock(Runnable.class); + + Observable.fromRunnable(run).startWith(Completable.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } + + @Test + public void justSingleJust() { + Observable.just(1).startWith(Single.just(2)) + .test() + .assertResult(2, 1); + } + + @Test + public void emptySingleJust() { + Runnable run = mock(Runnable.class); + + Observable.fromRunnable(run) + .startWith(Single.just(2)) + .test() + .assertResult(2); + + verify(run).run(); + } + + @Test + public void runSingleError() { + Runnable run = mock(Runnable.class); + + Observable.fromRunnable(run).startWith(Single.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } + + @Test + public void justMaybeJust() { + Observable.just(1).startWith(Maybe.just(2)) + .test() + .assertResult(2, 1); + } + + @Test + public void emptyMaybeJust() { + Runnable run = mock(Runnable.class); + + Observable.fromRunnable(run) + .startWith(Maybe.just(2)) + .test() + .assertResult(2); + + verify(run).run(); + } + + @Test + public void runMaybeError() { + Runnable run = mock(Runnable.class); + + Observable.fromRunnable(run).startWith(Maybe.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } + + @Test + public void justObservableJust() { + Observable.just(1).startWith(Observable.just(2, 3, 4, 5)) + .test() + .assertResult(2, 3, 4, 5, 1); + } + + @Test + public void emptyObservableJust() { + Runnable run = mock(Runnable.class); + + Observable.fromRunnable(run) + .startWith(Observable.just(2, 3, 4, 5)) + .test() + .assertResult(2, 3, 4, 5); + + verify(run).run(); + } + + @Test + public void emptyObservableEmpty() { + Runnable run = mock(Runnable.class); + Runnable run2 = mock(Runnable.class); + + Observable.fromRunnable(run) + .startWith(Observable.fromRunnable(run2)) + .test() + .assertResult(); + + verify(run).run(); + verify(run2).run(); + } + + @Test + public void runObservableError() { + Runnable run = mock(Runnable.class); + + Observable.fromRunnable(run).startWith(Observable.error(new TestException())) + .test() + .assertFailure(TestException.class); + + verify(run, never()).run(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleStartWithTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleStartWithTest.java new file mode 100644 index 0000000000..a3f7e7225a --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleStartWithTest.java @@ -0,0 +1,126 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; + +public class SingleStartWithTest { + + @Test + public void justCompletableComplete() { + Single.just(1) + .startWith(Completable.complete()) + .test() + .assertResult(1); + } + + @Test + public void justCompletableError() { + Single.just(1) + .startWith(Completable.error(new TestException())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void justSingleJust() { + Single.just(1) + .startWith(Single.just(0)) + .test() + .assertResult(0, 1); + } + + @Test + public void justSingleError() { + Single.just(1) + .startWith(Single.error(new TestException())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void justMaybeJust() { + Single.just(1) + .startWith(Maybe.just(0)) + .test() + .assertResult(0, 1); + } + + @Test + public void justMaybeEmpty() { + Single.just(1) + .startWith(Maybe.empty()) + .test() + .assertResult(1); + } + + @Test + public void justMaybeError() { + Single.just(1) + .startWith(Maybe.error(new TestException())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void justObservableJust() { + Single.just(1) + .startWith(Observable.just(-1, 0)) + .test() + .assertResult(-1, 0, 1); + } + + @Test + public void justObservableEmpty() { + Single.just(1) + .startWith(Observable.empty()) + .test() + .assertResult(1); + } + + @Test + public void justObservableError() { + Single.just(1) + .startWith(Observable.error(new TestException())) + .test() + .assertFailure(TestException.class); + } + + @Test + public void justFlowableJust() { + Single.just(1) + .startWith(Flowable.just(-1, 0)) + .test() + .assertResult(-1, 0, 1); + } + + @Test + public void justFlowableEmpty() { + Single.just(1) + .startWith(Observable.empty()) + .test() + .assertResult(1); + } + + @Test + public void justFlowableError() { + Single.just(1) + .startWith(Flowable.error(new TestException())) + .test() + .assertFailure(TestException.class); + } +}