diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index cb7fbd726f..56d5ef8d68 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -1136,7 +1136,7 @@ public static Flowable combineLatest( public static Flowable concat(Iterable> sources) { ObjectHelper.requireNonNull(sources, "sources is null"); // unlike general sources, fromIterable can only throw on a boundary because it is consumed only there - return fromIterable(sources).concatMapDelayError((Function)Functions.identity(), 2, false); + return fromIterable(sources).concatMapDelayError((Function)Functions.identity(), false, 2); } /** @@ -1510,7 +1510,7 @@ public static Flowable concatArrayEagerDelayError(Publisher. @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.FULL) public static Flowable concatArrayEagerDelayError(int maxConcurrency, int prefetch, Publisher... sources) { - return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, true); + return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), true, maxConcurrency, prefetch); } /** @@ -1586,7 +1586,7 @@ public static Flowable concatDelayError(Publisher Flowable concatDelayError(Publisher> sources, int prefetch, boolean tillTheEnd) { - return fromPublisher(sources).concatMapDelayError((Function)Functions.identity(), prefetch, tillTheEnd); + return fromPublisher(sources).concatMapDelayError((Function)Functions.identity(), tillTheEnd, prefetch); } /** @@ -4547,29 +4547,29 @@ public static Flowable zip(Iterable> /** * Returns a Flowable that emits the results of a specified combiner function applied to combinations of - * n items emitted, in sequence, by the n Publishers emitted by a specified Publisher. + * items emitted, in sequence, by an Iterable of other Publishers. *

* {@code zip} applies this function in strict sequence, so the first item emitted by the new Publisher - * will be the result of the function applied to the first item emitted by each of the Publishers emitted - * by the source Publisher; the second item emitted by the new Publisher will be the result of the - * function applied to the second item emitted by each of those Publishers; and so forth. + * will be the result of the function applied to the first item emitted by each of the source Publishers; + * the second item emitted by the new Publisher will be the result of the function applied to the second + * item emitted by each of those Publishers; and so forth. *

* The resulting {@code Publisher} returned from {@code zip} will invoke {@code onNext} as many times as * the number of {@code onNext} invocations of the source Publisher that emits the fewest items. *

* The operator subscribes to its sources in the order they are specified and completes eagerly if - * one of the sources is shorter than the rest while cancel the other sources. Therefore, it + * one of the sources is shorter than the rest while canceling the other sources. Therefore, it * is possible those other sources will never be able to run to completion (and thus not calling * {@code doOnComplete()}). This can also happen if the sources are exactly the same length; if * source A completes and B has been consumed and is about to complete, the operator detects A won't * be sending further values and it will cancel B immediately. For example: - *

zip(just(range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2)), (a) -> a)
+ *
zip(Arrays.asList(range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2)), (a) -> a)
* {@code action1} will be called but {@code action2} won't. *
To work around this termination property, * use {@link #doOnCancel(Action)} as well or use {@code using()} to do cleanup in case of completion * or cancellation. *

- * + * *

*
Backpressure:
*
The operator expects backpressure from the sources and honors backpressure from the downstream. @@ -4579,25 +4579,32 @@ public static Flowable zip(Iterable> *
{@code zip} does not operate by default on a particular {@link Scheduler}.
*
* - * @param the value type of the inner Publishers - * @param the zipped result type + * * @param sources - * a Publisher of source Publishers + * an Iterable of source Publishers * @param zipper - * a function that, when applied to an item emitted by each of the Publishers emitted by - * {@code ws}, results in an item that will be emitted by the resulting Publisher + * a function that, when applied to an item emitted by each of the source Publishers, results in + * an item that will be emitted by the resulting Publisher + * @param delayError + * delay errors signaled by any of the source Publisher until all Publishers terminate + * @param bufferSize + * the number of elements to prefetch from each source Publisher + * @param the common source value type + * @param the zipped result type * @return a Flowable that emits the zipped results * @see ReactiveX operators documentation: Zip */ - @SuppressWarnings({ "rawtypes", "unchecked", "cast" }) @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) - public static Flowable zip(Publisher> sources, - final Function zipper) { + public static Flowable zip(Iterable> sources, + Function zipper, boolean delayError, + int bufferSize) { ObjectHelper.requireNonNull(zipper, "zipper is null"); - return fromPublisher(sources).toList().flatMapPublisher((Function)FlowableInternalHelper.zipIterable(zipper)); + ObjectHelper.requireNonNull(sources, "sources is null"); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + return RxJavaPlugins.onAssembly(new FlowableZip(null, sources, zipper, bufferSize, delayError)); } /** @@ -5401,68 +5408,6 @@ public static Flowable zipArray(Function(sources, null, zipper, bufferSize, delayError)); } - /** - * Returns a Flowable that emits the results of a specified combiner function applied to combinations of - * items emitted, in sequence, by an Iterable of other Publishers. - *

- * {@code zip} applies this function in strict sequence, so the first item emitted by the new Publisher - * will be the result of the function applied to the first item emitted by each of the source Publishers; - * the second item emitted by the new Publisher will be the result of the function applied to the second - * item emitted by each of those Publishers; and so forth. - *

- * The resulting {@code Publisher} returned from {@code zip} will invoke {@code onNext} as many times as - * the number of {@code onNext} invocations of the source Publisher that emits the fewest items. - *

- * The operator subscribes to its sources in the order they are specified and completes eagerly if - * one of the sources is shorter than the rest while canceling the other sources. Therefore, it - * is possible those other sources will never be able to run to completion (and thus not calling - * {@code doOnComplete()}). This can also happen if the sources are exactly the same length; if - * source A completes and B has been consumed and is about to complete, the operator detects A won't - * be sending further values and it will cancel B immediately. For example: - *

zip(Arrays.asList(range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2)), (a) -> a)
- * {@code action1} will be called but {@code action2} won't. - *
To work around this termination property, - * use {@link #doOnCancel(Action)} as well or use {@code using()} to do cleanup in case of completion - * or cancellation. - *

- * - *

- *
Backpressure:
- *
The operator expects backpressure from the sources and honors backpressure from the downstream. - * (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use - * one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.
- *
Scheduler:
- *
{@code zipIterable} does not operate by default on a particular {@link Scheduler}.
- *
- * - * - * @param sources - * an Iterable of source Publishers - * @param zipper - * a function that, when applied to an item emitted by each of the source Publishers, results in - * an item that will be emitted by the resulting Publisher - * @param delayError - * delay errors signaled by any of the source Publisher until all Publishers terminate - * @param bufferSize - * the number of elements to prefetch from each source Publisher - * @param the common source value type - * @param the zipped result type - * @return a Flowable that emits the zipped results - * @see ReactiveX operators documentation: Zip - */ - @CheckReturnValue - @NonNull - @BackpressureSupport(BackpressureKind.FULL) - @SchedulerSupport(SchedulerSupport.NONE) - public static Flowable zipIterable(Iterable> sources, - Function zipper, boolean delayError, - int bufferSize) { - ObjectHelper.requireNonNull(zipper, "zipper is null"); - ObjectHelper.requireNonNull(sources, "sources is null"); - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - return RxJavaPlugins.onAssembly(new FlowableZip(null, sources, zipper, bufferSize, delayError)); - } - // *************************************************************************************************** // Instance operators // *************************************************************************************************** @@ -7181,7 +7126,7 @@ public final Flowable concatMap(FunctionReactiveX operators documentation: FlatMap * @since 3.0.0 * @see #concatMap(Function, int) - * @see #concatMapDelayError(Function, int, boolean, Scheduler) + * @see #concatMapDelayError(Function, boolean, int, Scheduler) */ @CheckReturnValue @NonNull @@ -7366,7 +7311,7 @@ public final Completable concatMapCompletableDelayError(Function * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread, * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure - * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload. + * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, boolean, int, Scheduler)} overload. *
*
Backpressure:
*
The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are @@ -7381,13 +7326,13 @@ public final Completable concatMapCompletableDelayError(Function the result value type * @param mapper the function that maps the items of this Publisher into the inner Publishers. * @return the new Publisher instance with the concatenation behavior - * @see #concatMapDelayError(Function, int, boolean, Scheduler) + * @see #concatMapDelayError(Function, boolean, int, Scheduler) */ @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable concatMapDelayError(Function> mapper) { - return concatMapDelayError(mapper, 2, true); + return concatMapDelayError(mapper, true, 2); } /** @@ -7398,7 +7343,7 @@ public final Flowable concatMapDelayError(Function * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread, * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure - * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload. + * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, boolean, int, Scheduler)} overload. * *
*
Backpressure:
@@ -7413,20 +7358,20 @@ public final Flowable concatMapDelayError(Function the result value type * @param mapper the function that maps the items of this Publisher into the inner Publishers. - * @param prefetch - * the number of elements to prefetch from the current Flowable * @param tillTheEnd * if true, all errors from the outer and inner Publisher sources are delayed until the end, * if false, an error from the main source is signaled when the current Publisher source terminates + * @param prefetch + * the number of elements to prefetch from the current Flowable * @return the new Publisher instance with the concatenation behavior - * @see #concatMapDelayError(Function, int, boolean, Scheduler) + * @see #concatMapDelayError(Function, boolean, int, Scheduler) */ @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable concatMapDelayError(Function> mapper, - int prefetch, boolean tillTheEnd) { + boolean tillTheEnd, int prefetch) { ObjectHelper.requireNonNull(mapper, "mapper is null"); ObjectHelper.verifyPositive(prefetch, "prefetch"); if (this instanceof ScalarSupplier) { @@ -7446,7 +7391,7 @@ public final Flowable concatMapDelayError(Function - * The difference between {@link #concatMapDelayError(Function, int, boolean)} and this operator is that this operator guarantees the {@code mapper} + * The difference between {@link #concatMapDelayError(Function, boolean, int)} and this operator is that this operator guarantees the {@code mapper} * function is executed on the specified scheduler. * *
@@ -7462,15 +7407,15 @@ public final Flowable concatMapDelayError(Function the result value type * @param mapper the function that maps the items of this Publisher into the inner Publishers. - * @param prefetch - * the number of elements to prefetch from the current Flowable * @param tillTheEnd * if true, all errors from the outer and inner Publisher sources are delayed until the end, * if false, an error from the main source is signaled when the current Publisher source terminates + * @param prefetch + * the number of elements to prefetch from the current Flowable * @param scheduler * the scheduler where the {@code mapper} function will be executed * @return the new Publisher instance with the concatenation behavior - * @see #concatMapDelayError(Function, int, boolean) + * @see #concatMapDelayError(Function, boolean, int) * @since 3.0.0 */ @CheckReturnValue @@ -7478,7 +7423,7 @@ public final Flowable concatMapDelayError(Function Flowable concatMapDelayError(Function> mapper, - int prefetch, boolean tillTheEnd, Scheduler scheduler) { + boolean tillTheEnd, int prefetch, Scheduler scheduler) { ObjectHelper.requireNonNull(mapper, "mapper is null"); ObjectHelper.verifyPositive(prefetch, "prefetch"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); @@ -7574,7 +7519,7 @@ public final Flowable concatMapEager(Function Flowable concatMapEagerDelayError(Function> mapper, boolean tillTheEnd) { - return concatMapEagerDelayError(mapper, bufferSize(), bufferSize(), tillTheEnd); + return concatMapEagerDelayError(mapper, tillTheEnd, bufferSize(), bufferSize()); } /** @@ -7594,13 +7539,13 @@ public final Flowable concatMapEagerDelayError(Function the value type * @param mapper the function that maps a sequence of values into a sequence of Publishers that will be * eagerly concatenated - * @param maxConcurrency the maximum number of concurrent subscribed Publishers - * @param prefetch - * the number of elements to prefetch from each source Publisher * @param tillTheEnd * if true, exceptions from the current Flowable and all the inner Publishers are delayed until * all of them terminate, if false, exception from the current Flowable is delayed until the * currently running Publisher terminates + * @param maxConcurrency the maximum number of concurrent subscribed Publishers + * @param prefetch + * the number of elements to prefetch from each source Publisher * @return the new Publisher instance with the specified concatenation behavior * @since 2.0 */ @@ -7609,7 +7554,7 @@ public final Flowable concatMapEagerDelayError(Function Flowable concatMapEagerDelayError(Function> mapper, - int maxConcurrency, int prefetch, boolean tillTheEnd) { + boolean tillTheEnd, int maxConcurrency, int prefetch) { ObjectHelper.requireNonNull(mapper, "mapper is null"); ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); ObjectHelper.verifyPositive(prefetch, "prefetch"); diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index dc003e1f0a..6cca5647d6 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -1014,7 +1014,7 @@ public static Observable combineLatestDelayError(Iterable Observable concat(Iterable> sources) { ObjectHelper.requireNonNull(sources, "sources is null"); - return fromIterable(sources).concatMapDelayError((Function)Functions.identity(), bufferSize(), false); + return fromIterable(sources).concatMapDelayError((Function)Functions.identity(), false, bufferSize()); } /** @@ -1271,7 +1271,7 @@ public static Observable concatArrayEager(ObservableSource.. @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static Observable concatArrayEager(int maxConcurrency, int prefetch, ObservableSource... sources) { - return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, false); + return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), false, maxConcurrency, prefetch); } /** @@ -1323,7 +1323,7 @@ public static Observable concatArrayEagerDelayError(ObservableSource Observable concatArrayEagerDelayError(int maxConcurrency, int prefetch, ObservableSource... sources) { - return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, true); + return fromArray(sources).concatMapEagerDelayError((Function)Functions.identity(), true, maxConcurrency, prefetch); } /** @@ -1492,7 +1492,7 @@ public static Observable concatEager(Iterable Observable concatEager(Iterable> sources, int maxConcurrency, int prefetch) { - return fromIterable(sources).concatMapEagerDelayError((Function)Functions.identity(), maxConcurrency, prefetch, false); + return fromIterable(sources).concatMapEagerDelayError((Function)Functions.identity(), false, maxConcurrency, prefetch); } /** @@ -4102,12 +4102,12 @@ public static Observable zip(Iterablen items emitted, in sequence, by the n ObservableSources emitted by a specified ObservableSource. + * items emitted, in sequence, by an Iterable of other ObservableSources. *

* {@code zip} applies this function in strict sequence, so the first item emitted by the new ObservableSource - * will be the result of the function applied to the first item emitted by each of the ObservableSources emitted - * by the source ObservableSource; the second item emitted by the new ObservableSource will be the result of the - * function applied to the second item emitted by each of those ObservableSources; and so forth. + * will be the result of the function applied to the first item emitted by each of the source ObservableSources; + * the second item emitted by the new ObservableSource will be the result of the function applied to the second + * item emitted by each of those ObservableSources; and so forth. *

* The resulting {@code ObservableSource} returned from {@code zip} will invoke {@code onNext} as many times as * the number of {@code onNext} invocations of the source ObservableSource that emits the fewest items. @@ -4118,7 +4118,7 @@ public static Observable zip(Iterablezip(just(range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2)), (a) -> a) + *

zip(Arrays.asList(range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2)), (a) -> a)
* {@code action1} will be called but {@code action2} won't. *
To work around this termination property, * use {@link #doOnDispose(Action)} as well or use {@code using()} to do cleanup in case of completion @@ -4129,30 +4129,36 @@ public static Observable zip(Iterable} passed to the method would trigger a {@code ClassCastException}. * *

- * + * *

*
Scheduler:
*
{@code zip} does not operate by default on a particular {@link Scheduler}.
*
* - * @param the value type of the inner ObservableSources - * @param the zipped result type + * * @param sources - * an ObservableSource of source ObservableSources + * an Iterable of source ObservableSources * @param zipper - * a function that, when applied to an item emitted by each of the ObservableSources emitted by - * {@code ws}, results in an item that will be emitted by the resulting ObservableSource + * a function that, when applied to an item emitted by each of the source ObservableSources, results in + * an item that will be emitted by the resulting ObservableSource + * @param delayError + * delay errors signalled by any of the source ObservableSource until all ObservableSources terminate + * @param bufferSize + * the number of elements to prefetch from each source ObservableSource + * @param the common source value type + * @param the zipped result type * @return an Observable that emits the zipped results * @see ReactiveX operators documentation: Zip */ - @SuppressWarnings({ "rawtypes", "unchecked" }) @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - public static Observable zip(ObservableSource> sources, final Function zipper) { + public static Observable zip(Iterable> sources, + Function zipper, boolean delayError, + int bufferSize) { ObjectHelper.requireNonNull(zipper, "zipper is null"); ObjectHelper.requireNonNull(sources, "sources is null"); - return RxJavaPlugins.onAssembly(new ObservableToList(sources, 16) - .flatMap(ObservableInternalHelper.zipIterable(zipper))); + ObjectHelper.verifyPositive(bufferSize, "bufferSize"); + return RxJavaPlugins.onAssembly(new ObservableZip(null, sources, zipper, bufferSize, delayError)); } /** @@ -4894,67 +4900,6 @@ public static Observable zipArray(Function(sources, null, zipper, bufferSize, delayError)); } - /** - * Returns an Observable that emits the results of a specified combiner function applied to combinations of - * items emitted, in sequence, by an Iterable of other ObservableSources. - *

- * {@code zip} applies this function in strict sequence, so the first item emitted by the new ObservableSource - * will be the result of the function applied to the first item emitted by each of the source ObservableSources; - * the second item emitted by the new ObservableSource will be the result of the function applied to the second - * item emitted by each of those ObservableSources; and so forth. - *

- * The resulting {@code ObservableSource} returned from {@code zip} will invoke {@code onNext} as many times as - * the number of {@code onNext} invocations of the source ObservableSource that emits the fewest items. - *

- * The operator subscribes to its sources in order they are specified and completes eagerly if - * one of the sources is shorter than the rest while disposing the other sources. Therefore, it - * is possible those other sources will never be able to run to completion (and thus not calling - * {@code doOnComplete()}). This can also happen if the sources are exactly the same length; if - * source A completes and B has been consumed and is about to complete, the operator detects A won't - * be sending further values and it will dispose B immediately. For example: - *

zip(Arrays.asList(range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2)), (a) -> a)
- * {@code action1} will be called but {@code action2} won't. - *
To work around this termination property, - * use {@link #doOnDispose(Action)} as well or use {@code using()} to do cleanup in case of completion - * or a dispose() call. - *

- * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the - * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a - * {@code Function} passed to the method would trigger a {@code ClassCastException}. - * - *

- * - *

- *
Scheduler:
- *
{@code zipIterable} does not operate by default on a particular {@link Scheduler}.
- *
- * - * - * @param sources - * an Iterable of source ObservableSources - * @param zipper - * a function that, when applied to an item emitted by each of the source ObservableSources, results in - * an item that will be emitted by the resulting ObservableSource - * @param delayError - * delay errors signalled by any of the source ObservableSource until all ObservableSources terminate - * @param bufferSize - * the number of elements to prefetch from each source ObservableSource - * @param the common source value type - * @param the zipped result type - * @return an Observable that emits the zipped results - * @see ReactiveX operators documentation: Zip - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - public static Observable zipIterable(Iterable> sources, - Function zipper, boolean delayError, - int bufferSize) { - ObjectHelper.requireNonNull(zipper, "zipper is null"); - ObjectHelper.requireNonNull(sources, "sources is null"); - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - return RxJavaPlugins.onAssembly(new ObservableZip(null, sources, zipper, bufferSize, delayError)); - } - // *************************************************************************************************** // Instance operators // *************************************************************************************************** @@ -6407,7 +6352,7 @@ public final Observable concatMap(Function * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread, * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure - * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload. + * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, boolean, int, Scheduler)} overload. *
*
Scheduler:
*
{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -6416,12 +6361,12 @@ public final Observable concatMap(Function the result value type * @param mapper the function that maps the items of this ObservableSource into the inner ObservableSources. * @return the new ObservableSource instance with the concatenation behavior - * @see #concatMapDelayError(Function, int, boolean, Scheduler) + * @see #concatMapDelayError(Function, boolean, int, Scheduler) */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Observable concatMapDelayError(Function> mapper) { - return concatMapDelayError(mapper, bufferSize(), true); + return concatMapDelayError(mapper, true, bufferSize()); } /** @@ -6434,7 +6379,7 @@ public final Observable concatMapDelayError(Function * Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread, * on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure - * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload. + * the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, boolean, int, Scheduler)} overload. *
*
Scheduler:
*
{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -6442,18 +6387,18 @@ public final Observable concatMapDelayError(Function the result value type * @param mapper the function that maps the items of this ObservableSource into the inner ObservableSources. - * @param prefetch - * the number of elements to prefetch from the current Observable * @param tillTheEnd * if true, all errors from the outer and inner ObservableSource sources are delayed until the end, * if false, an error from the main source is signalled when the current ObservableSource source terminates + * @param prefetch + * the number of elements to prefetch from the current Observable * @return the new ObservableSource instance with the concatenation behavior - * @see #concatMapDelayError(Function, int, boolean, Scheduler) + * @see #concatMapDelayError(Function, boolean, int, Scheduler) */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Observable concatMapDelayError(Function> mapper, - int prefetch, boolean tillTheEnd) { + boolean tillTheEnd, int prefetch) { ObjectHelper.requireNonNull(mapper, "mapper is null"); ObjectHelper.verifyPositive(prefetch, "prefetch"); if (this instanceof ScalarSupplier) { @@ -6481,21 +6426,21 @@ public final Observable concatMapDelayError(Function the result value type * @param mapper the function that maps the items of this ObservableSource into the inner ObservableSources. - * @param prefetch - * the number of elements to prefetch from the current Observable * @param tillTheEnd * if true, all errors from the outer and inner ObservableSource sources are delayed until the end, * if false, an error from the main source is signalled when the current ObservableSource source terminates + * @param prefetch + * the number of elements to prefetch from the current Observable * @param scheduler * the scheduler where the {@code mapper} function will be executed * @return the new ObservableSource instance with the concatenation behavior - * @see #concatMapDelayError(Function, int, boolean) + * @see #concatMapDelayError(Function, boolean, int) * @since 3.0.0 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable concatMapDelayError(Function> mapper, - int prefetch, boolean tillTheEnd, Scheduler scheduler) { + boolean tillTheEnd, int prefetch, Scheduler scheduler) { ObjectHelper.requireNonNull(mapper, "mapper is null"); ObjectHelper.verifyPositive(prefetch, "prefetch"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); @@ -6584,7 +6529,7 @@ public final Observable concatMapEager(Function Observable concatMapEagerDelayError(Function> mapper, boolean tillTheEnd) { - return concatMapEagerDelayError(mapper, Integer.MAX_VALUE, bufferSize(), tillTheEnd); + return concatMapEagerDelayError(mapper, tillTheEnd, Integer.MAX_VALUE, bufferSize()); } /** @@ -6603,20 +6548,20 @@ public final Observable concatMapEagerDelayError(Function the value type * @param mapper the function that maps a sequence of values into a sequence of ObservableSources that will be * eagerly concatenated - * @param maxConcurrency the maximum number of concurrent subscribed ObservableSources - * @param prefetch - * the number of elements to prefetch from each source ObservableSource * @param tillTheEnd * if true, exceptions from the current Observable and all the inner ObservableSources are delayed until * all of them terminate, if false, exception from the current Observable is delayed until the * currently running ObservableSource terminates + * @param maxConcurrency the maximum number of concurrent subscribed ObservableSources + * @param prefetch + * the number of elements to prefetch from each source ObservableSource * @return the new ObservableSource instance with the specified concatenation behavior * @since 2.0 */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Observable concatMapEagerDelayError(Function> mapper, - int maxConcurrency, int prefetch, boolean tillTheEnd) { + boolean tillTheEnd, int maxConcurrency, int prefetch) { ObjectHelper.requireNonNull(mapper, "mapper is null"); ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); ObjectHelper.verifyPositive(prefetch, "prefetch"); diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableInternalHelper.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableInternalHelper.java index 52cabd90bc..243d58b9b8 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableInternalHelper.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableInternalHelper.java @@ -12,7 +12,6 @@ */ package io.reactivex.rxjava3.internal.operators.flowable; -import java.util.List; import java.util.concurrent.TimeUnit; import org.reactivestreams.*; @@ -217,24 +216,6 @@ public void accept(Subscription t) throws Exception { } } - static final class ZipIterableFunction - implements Function>, Publisher> { - private final Function zipper; - - ZipIterableFunction(Function zipper) { - this.zipper = zipper; - } - - @Override - public Publisher apply(List> list) { - return Flowable.zipIterable(list, zipper, false, Flowable.bufferSize()); - } - } - - public static Function>, Publisher> zipIterable(final Function zipper) { - return new ZipIterableFunction(zipper); - } - static final class ReplaySupplier implements Supplier> { final Flowable parent; diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableInternalHelper.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableInternalHelper.java index 3b589e3dd1..1665b99014 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableInternalHelper.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableInternalHelper.java @@ -12,7 +12,6 @@ */ package io.reactivex.rxjava3.internal.operators.observable; -import java.util.List; import java.util.concurrent.TimeUnit; import io.reactivex.rxjava3.core.*; @@ -214,24 +213,6 @@ public static Supplier> replaySupplier(final Observ return new TimedReplayCallable(parent, time, unit, scheduler, eagerTruncate); } - static final class ZipIterableFunction - implements Function>, ObservableSource> { - private final Function zipper; - - ZipIterableFunction(Function zipper) { - this.zipper = zipper; - } - - @Override - public ObservableSource apply(List> list) { - return Observable.zipIterable(list, zipper, false, Observable.bufferSize()); - } - } - - public static Function>, ObservableSource> zipIterable(final Function zipper) { - return new ZipIterableFunction(zipper); - } - static final class ReplaySupplier implements Supplier> { private final Observable parent; diff --git a/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java index dffc68f230..ed520c494c 100644 --- a/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java @@ -651,34 +651,9 @@ public Object apply(Object[] a) { }).blockingLast(); } - @Test(expected = NullPointerException.class) - public void zipPublisherNull() { - Flowable.zip((Publisher>)null, new Function() { - @Override - public Object apply(Object[] a) { - return 1; - } - }); - } - - @Test(expected = NullPointerException.class) - public void zipPublisherFunctionNull() { - Flowable.zip((Flowable.just(just1)), null); - } - - @Test(expected = NullPointerException.class) - public void zipPublisherFunctionReturnsNull() { - Flowable.zip((Flowable.just(just1)), new Function() { - @Override - public Object apply(Object[] a) { - return null; - } - }).blockingLast(); - } - @Test(expected = NullPointerException.class) public void zipIterable2Null() { - Flowable.zipIterable((Iterable>)null, new Function() { + Flowable.zip((Iterable>)null, new Function() { @Override public Object apply(Object[] a) { return 1; @@ -688,7 +663,7 @@ public Object apply(Object[] a) { @Test(expected = NullPointerException.class) public void zipIterable2IteratorNull() { - Flowable.zipIterable(new Iterable>() { + Flowable.zip(new Iterable>() { @Override public Iterator> iterator() { return null; @@ -704,13 +679,13 @@ public Object apply(Object[] a) { @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void zipIterable2FunctionNull() { - Flowable.zipIterable(Arrays.asList(just1, just1), null, true, 128); + Flowable.zip(Arrays.asList(just1, just1), null, true, 128); } @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void zipIterable2FunctionReturnsNull() { - Flowable.zipIterable(Arrays.asList(just1, just1), new Function() { + Flowable.zip(Arrays.asList(just1, just1), new Function() { @Override public Object apply(Object[] a) { return null; @@ -2721,31 +2696,6 @@ public void combineLatestDelayErrorIterableFunctionNull() { Flowable.combineLatestDelayError(Arrays.asList(just1), null, 128); } - @Test(expected = NullPointerException.class) - public void zipFlowableNull() { - Flowable.zip((Flowable>)null, new Function() { - @Override - public Object apply(Object[] a) { - return 1; - } - }); - } - - @Test(expected = NullPointerException.class) - public void zipFlowableFunctionNull() { - Flowable.zip((Flowable.just(just1)), null); - } - - @Test(expected = NullPointerException.class) - public void zipFlowableFunctionReturnsNull() { - Flowable.zip((Flowable.just(just1)), new Function() { - @Override - public Object apply(Object[] a) { - return null; - } - }).blockingLast(); - } - @Test(expected = NullPointerException.class) public void concatFlowableNull() { Flowable.concat((Flowable>)null); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java index 1493d7aec7..fc8d665559 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java @@ -85,7 +85,7 @@ public Publisher apply(String v) throws Exception { return Flowable.just(v); } - }, 2, true, ImmediateThinScheduler.INSTANCE) + }, true, 2, ImmediateThinScheduler.INSTANCE) .observeOn(Schedulers.computation()) .distinct() .test() @@ -130,7 +130,7 @@ public Publisher apply(Integer v) throws Exception { return Flowable.just(v); } - }, 2, true, ImmediateThinScheduler.INSTANCE) + }, true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(TestException.class); } @@ -171,7 +171,7 @@ public void delayErrorCallableTillTheEnd() { } }); } - }, 2, true, ImmediateThinScheduler.INSTANCE) + }, true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(CompositeException.class, 1, 2, 3, 23, 32); } @@ -190,7 +190,7 @@ public void delayErrorCallableEager() { } }); } - }, 2, false, ImmediateThinScheduler.INSTANCE) + }, false, 2, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(NullPointerException.class, 1, 2, 3); } @@ -239,7 +239,7 @@ public void mapperDelayErrorScheduled() { public Flowable apply(Integer t) throws Throwable { return Flowable.just(Thread.currentThread().getName()); } - }, 2, false, Schedulers.single()) + }, false, 2, Schedulers.single()) .test() .awaitDone(5, TimeUnit.SECONDS) .assertValueCount(1) @@ -257,7 +257,7 @@ public void mapperDelayErrorScheduledHidden() { public Flowable apply(Integer t) throws Throwable { return Flowable.just(Thread.currentThread().getName()).hide(); } - }, 2, false, Schedulers.single()) + }, false, 2, Schedulers.single()) .test() .awaitDone(5, TimeUnit.SECONDS) .assertValueCount(1) @@ -275,7 +275,7 @@ public void mapperDelayError2Scheduled() { public Flowable apply(Integer t) throws Throwable { return Flowable.just(Thread.currentThread().getName()); } - }, 2, true, Schedulers.single()) + }, true, 2, Schedulers.single()) .test() .awaitDone(5, TimeUnit.SECONDS) .assertValueCount(1) @@ -293,7 +293,7 @@ public void mapperDelayError2ScheduledHidden() { public Flowable apply(Integer t) throws Throwable { return Flowable.just(Thread.currentThread().getName()).hide(); } - }, 2, true, Schedulers.single()) + }, true, 2, Schedulers.single()) .test() .awaitDone(5, TimeUnit.SECONDS) .assertValueCount(1) @@ -435,7 +435,7 @@ public void concatMapJustRange() { public void concatMapDelayErrorJustJust() { TestSubscriber ts = TestSubscriber.create(); - Flowable.just(Flowable.just(1)).concatMapDelayError((Function)Functions.identity(), 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + Flowable.just(Flowable.just(1)).concatMapDelayError((Function)Functions.identity(), true, 2, ImmediateThinScheduler.INSTANCE).subscribe(ts); ts.assertValue(1); ts.assertNoErrors(); @@ -447,7 +447,7 @@ public void concatMapDelayErrorJustJust() { public void concatMapDelayErrorJustRange() { TestSubscriber ts = TestSubscriber.create(); - Flowable.just(Flowable.range(1, 5)).concatMapDelayError((Function)Functions.identity(), 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + Flowable.just(Flowable.range(1, 5)).concatMapDelayError((Function)Functions.identity(), true, 2, ImmediateThinScheduler.INSTANCE).subscribe(ts); ts.assertValues(1, 2, 3, 4, 5); ts.assertNoErrors(); @@ -479,7 +479,7 @@ public void startWithArray() throws Exception { @Test public void concatMapDelayError() { Flowable.just(Flowable.just(1), Flowable.just(2)) - .concatMapDelayError(Functions.>identity(), 2, true, ImmediateThinScheduler.INSTANCE) + .concatMapDelayError(Functions.>identity(), true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertResult(1, 2); } @@ -492,7 +492,7 @@ public void concatMapDelayErrorJustSource() { public Flowable apply(Object v) throws Exception { return Flowable.just(1); } - }, 16, true, ImmediateThinScheduler.INSTANCE) + }, true, 16, ImmediateThinScheduler.INSTANCE) .test() .assertResult(1); @@ -519,7 +519,7 @@ public void concatMapJustSourceDelayError() { public Flowable apply(Object v) throws Exception { return Flowable.just(1); } - }, 16, false, ImmediateThinScheduler.INSTANCE) + }, false, 16, ImmediateThinScheduler.INSTANCE) .test() .assertResult(1); } @@ -535,7 +535,7 @@ public void concatMapScalarBackpressured() { @Test public void concatMapScalarBackpressuredDelayError() { Flowable.just(1).hide() - .concatMapDelayError(Functions.justFunction(Flowable.just(2)), 2, true, ImmediateThinScheduler.INSTANCE) + .concatMapDelayError(Functions.justFunction(Flowable.just(2)), true, 2, ImmediateThinScheduler.INSTANCE) .test(1L) .assertResult(2); } @@ -551,7 +551,7 @@ public void concatMapEmpty() { @Test public void concatMapEmptyDelayError() { Flowable.just(1).hide() - .concatMapDelayError(Functions.justFunction(Flowable.empty()), 2, true, ImmediateThinScheduler.INSTANCE) + .concatMapDelayError(Functions.justFunction(Flowable.empty()), true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertResult(); } @@ -583,7 +583,7 @@ public Publisher apply(Flowable f) throws Exception { TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { @Override public Publisher apply(Flowable f) throws Exception { - return f.concatMapDelayError(Functions.justFunction(Flowable.just(2)), 2, true, ImmediateThinScheduler.INSTANCE); + return f.concatMapDelayError(Functions.justFunction(Flowable.just(2)), true, 2, ImmediateThinScheduler.INSTANCE); } }); } @@ -647,7 +647,7 @@ public void concatMapInnerError() { @Test public void concatMapInnerErrorDelayError() { Flowable.just(1).hide() - .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException())), 2, true, ImmediateThinScheduler.INSTANCE) + .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException())), true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(TestException.class); } @@ -699,7 +699,7 @@ protected void subscribeActual(Subscriber s) { s.onSubscribe(new BooleanSubscription()); s.onError(new TestException("First")); } - }), 2, true, ImmediateThinScheduler.INSTANCE) + }), true, 2, ImmediateThinScheduler.INSTANCE) .to(TestHelper.testConsumer()); ts.assertFailureAndMessage(TestException.class, "First"); @@ -719,7 +719,7 @@ public void badSourceDelayError() { TestHelper.checkBadSourceFlowable(new Function, Object>() { @Override public Object apply(Flowable f) throws Exception { - return f.concatMapDelayError(Functions.justFunction(Flowable.just(1).hide()), 2, true, ImmediateThinScheduler.INSTANCE); + return f.concatMapDelayError(Functions.justFunction(Flowable.just(1).hide()), true, 2, ImmediateThinScheduler.INSTANCE); } }, true, 1, 1, 1); } @@ -743,7 +743,7 @@ public void fusedCrashDelayError() { @Override public Object apply(Integer v) throws Exception { throw new TestException(); } }) - .concatMapDelayError(Functions.justFunction(Flowable.just(1)), 2, true, ImmediateThinScheduler.INSTANCE) + .concatMapDelayError(Functions.justFunction(Flowable.just(1)), true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(TestException.class); } @@ -769,7 +769,7 @@ public void callableCrashDelayError() { public Object call() throws Exception { throw new TestException(); } - })), 2, true, ImmediateThinScheduler.INSTANCE) + })), true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(TestException.class); } @@ -780,13 +780,13 @@ public void dispose() { .concatMap(Functions.justFunction(Flowable.just(1)), 2, ImmediateThinScheduler.INSTANCE)); TestHelper.checkDisposed(Flowable.range(1, 2) - .concatMapDelayError(Functions.justFunction(Flowable.just(1)), 2, true, ImmediateThinScheduler.INSTANCE)); + .concatMapDelayError(Functions.justFunction(Flowable.just(1)), true, 2, ImmediateThinScheduler.INSTANCE)); } @Test public void notVeryEnd() { Flowable.range(1, 2) - .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException())), 16, false, ImmediateThinScheduler.INSTANCE) + .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException())), false, 16, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(TestException.class); } @@ -794,7 +794,7 @@ public void notVeryEnd() { @Test public void error() { Flowable.error(new TestException()) - .concatMapDelayError(Functions.justFunction(Flowable.just(2)), 16, false, ImmediateThinScheduler.INSTANCE) + .concatMapDelayError(Functions.justFunction(Flowable.just(2)), false, 16, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(TestException.class); } @@ -823,7 +823,7 @@ public void mainErrors() { public Flowable apply(Integer v) { return Flowable.range(v, 2); } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(ts); source.onNext(1); source.onNext(2); @@ -846,7 +846,7 @@ public void innerErrors() { public Flowable apply(Integer v) { return inner; } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(ts); ts.assertValues(1, 2, 1, 2, 1, 2); ts.assertError(CompositeException.class); @@ -866,7 +866,7 @@ public void singleInnerErrors() { public Flowable apply(Integer v) { return inner; } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(ts); ts.assertValues(1, 2); ts.assertError(TestException.class); @@ -884,7 +884,7 @@ public void innerNull() { public Flowable apply(Integer v) { return null; } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(ts); ts.assertNoValues(); ts.assertError(NullPointerException.class); @@ -902,7 +902,7 @@ public void innerThrows() { public Flowable apply(Integer v) { throw new TestException(); } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(ts); ts.assertNoValues(); ts.assertError(TestException.class); @@ -919,7 +919,7 @@ public void innerWithEmpty() { public Flowable apply(Integer v) { return v == 2 ? Flowable.empty() : Flowable.range(1, 2); } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(ts); ts.assertValues(1, 2, 1, 2); ts.assertNoErrors(); @@ -936,7 +936,7 @@ public void innerWithScalar() { public Flowable apply(Integer v) { return v == 2 ? Flowable.just(3) : Flowable.range(1, 2); } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(ts); ts.assertValues(1, 2, 3, 1, 2); ts.assertNoErrors(); @@ -952,7 +952,7 @@ public void backpressure() { public Flowable apply(Integer v) { return Flowable.range(v, 2); } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(ts); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(ts); ts.assertNoValues(); ts.assertNoErrors(); @@ -1010,7 +1010,7 @@ public Flowable apply(Integer t) throws Throwable { .repeat(1000) .observeOn(Schedulers.io()); } - }, 2, false, Schedulers.single()) + }, false, 2, Schedulers.single()) .distinct() .test() .awaitDone(5, TimeUnit.SECONDS) @@ -1033,7 +1033,7 @@ public Flowable apply(Integer t) throws Throwable { .repeat(1000) .observeOn(Schedulers.io()); } - }, 2, true, Schedulers.single()) + }, true, 2, Schedulers.single()) .distinct() .test() .awaitDone(5, TimeUnit.SECONDS) @@ -1069,7 +1069,7 @@ public Flowable apply(Flowable upstream) { public Publisher apply(Integer v) throws Throwable { return Flowable.just(v).hide(); } - }, 2, false, ImmediateThinScheduler.INSTANCE); + }, false, 2, ImmediateThinScheduler.INSTANCE); } }); } @@ -1084,7 +1084,7 @@ public Flowable apply(Flowable upstream) { public Publisher apply(Integer v) throws Throwable { return Flowable.just(v).hide(); } - }, 2, true, ImmediateThinScheduler.INSTANCE); + }, true, 2, ImmediateThinScheduler.INSTANCE); } }); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapTest.java index 0d70a2430e..74394b122e 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapTest.java @@ -203,7 +203,7 @@ public void delayErrorCallableEager() { } }); } - }, 2, false) + }, false, 2) .test() .assertFailure(NullPointerException.class, 1, 2, 3); } @@ -233,7 +233,7 @@ public Flowable apply(Flowable upstream) { public Publisher apply(Integer v) throws Throwable { return Flowable.just(v).hide(); } - }, 2, false); + }, false, 2); } }); } @@ -248,7 +248,7 @@ public Flowable apply(Flowable upstream) { public Publisher apply(Integer v) throws Throwable { return Flowable.just(v).hide(); } - }, 2, true); + }, true, 2); } }); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java index 368f5cd043..eddc091c77 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java @@ -1183,7 +1183,7 @@ public void concatMapDelayErrorEmptySource() { public Flowable apply(Object v) throws Exception { return Flowable.just(1); } - }, 16, true)); + }, true, 16)); } @Test @@ -1194,7 +1194,7 @@ public void concatMapDelayErrorJustSource() { public Flowable apply(Object v) throws Exception { return Flowable.just(1); } - }, 16, true) + }, true, 16) .test() .assertResult(1); @@ -1244,7 +1244,7 @@ public void concatMapJustSourceDelayError() { public Flowable apply(Object v) throws Exception { return Flowable.just(1); } - }, 16, false) + }, false, 16) .test() .assertResult(1); } @@ -1511,7 +1511,7 @@ public void dispose() { @Test public void notVeryEnd() { Flowable.range(1, 2) - .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException())), 16, false) + .concatMapDelayError(Functions.justFunction(Flowable.error(new TestException())), false, 16) .test() .assertFailure(TestException.class); } @@ -1519,7 +1519,7 @@ public void notVeryEnd() { @Test public void error() { Flowable.error(new TestException()) - .concatMapDelayError(Functions.justFunction(Flowable.just(2)), 16, false) + .concatMapDelayError(Functions.justFunction(Flowable.just(2)), false, 16) .test() .assertFailure(TestException.class); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapSchedulerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapSchedulerTest.java index 1b7bb0d8b8..ace3f55493 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapSchedulerTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapSchedulerTest.java @@ -86,7 +86,7 @@ public ObservableSource apply(String v) throws Exception { return Observable.just(v); } - }, 2, true, ImmediateThinScheduler.INSTANCE) + }, true, 2, ImmediateThinScheduler.INSTANCE) .observeOn(Schedulers.computation()) .distinct() .test() @@ -131,7 +131,7 @@ public ObservableSource apply(Integer v) throws Exception { return Observable.just(v); } - }, 2, true, ImmediateThinScheduler.INSTANCE) + }, true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(TestException.class); } @@ -172,7 +172,7 @@ public void delayErrorCallableTillTheEnd() { } }); } - }, 2, true, ImmediateThinScheduler.INSTANCE) + }, true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(CompositeException.class, 1, 2, 3, 23, 32); } @@ -191,7 +191,7 @@ public void delayErrorCallableEager() { } }); } - }, 2, false, ImmediateThinScheduler.INSTANCE) + }, false, 2, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(NullPointerException.class, 1, 2, 3); } @@ -240,7 +240,7 @@ public void mapperDelayErrorScheduled() { public Observable apply(Integer t) throws Throwable { return Observable.just(Thread.currentThread().getName()); } - }, 2, false, Schedulers.single()) + }, false, 2, Schedulers.single()) .test() .awaitDone(5, TimeUnit.SECONDS) .assertValueCount(1) @@ -258,7 +258,7 @@ public void mapperDelayErrorScheduledHidden() { public Observable apply(Integer t) throws Throwable { return Observable.just(Thread.currentThread().getName()).hide(); } - }, 2, false, Schedulers.single()) + }, false, 2, Schedulers.single()) .test() .awaitDone(5, TimeUnit.SECONDS) .assertValueCount(1) @@ -276,7 +276,7 @@ public void mapperDelayError2Scheduled() { public Observable apply(Integer t) throws Throwable { return Observable.just(Thread.currentThread().getName()); } - }, 2, true, Schedulers.single()) + }, true, 2, Schedulers.single()) .test() .awaitDone(5, TimeUnit.SECONDS) .assertValueCount(1) @@ -294,7 +294,7 @@ public void mapperDelayError2ScheduledHidden() { public Observable apply(Integer t) throws Throwable { return Observable.just(Thread.currentThread().getName()).hide(); } - }, 2, true, Schedulers.single()) + }, true, 2, Schedulers.single()) .test() .awaitDone(5, TimeUnit.SECONDS) .assertValueCount(1) @@ -436,7 +436,7 @@ public void concatMapJustRange() { public void concatMapDelayErrorJustJust() { TestObserver to = TestObserver.create(); - Observable.just(Observable.just(1)).concatMapDelayError((Function)Functions.identity(), 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + Observable.just(Observable.just(1)).concatMapDelayError((Function)Functions.identity(), true, 2, ImmediateThinScheduler.INSTANCE).subscribe(to); to.assertValue(1); to.assertNoErrors(); @@ -448,7 +448,7 @@ public void concatMapDelayErrorJustJust() { public void concatMapDelayErrorJustRange() { TestObserver to = TestObserver.create(); - Observable.just(Observable.range(1, 5)).concatMapDelayError((Function)Functions.identity(), 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + Observable.just(Observable.range(1, 5)).concatMapDelayError((Function)Functions.identity(), true, 2, ImmediateThinScheduler.INSTANCE).subscribe(to); to.assertValues(1, 2, 3, 4, 5); to.assertNoErrors(); @@ -504,7 +504,7 @@ public Iterator iterator() { @Test public void concatMapDelayError() { Observable.just(Observable.just(1), Observable.just(2)) - .concatMapDelayError(Functions.>identity(), 2, true, ImmediateThinScheduler.INSTANCE) + .concatMapDelayError(Functions.>identity(), true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertResult(1, 2); } @@ -517,7 +517,7 @@ public void concatMapDelayErrorJustSource() { public Observable apply(Object v) throws Exception { return Observable.just(1); } - }, 16, true, ImmediateThinScheduler.INSTANCE) + }, true, 16, ImmediateThinScheduler.INSTANCE) .test() .assertResult(1); @@ -544,7 +544,7 @@ public void concatMapJustSourceDelayError() { public Observable apply(Object v) throws Exception { return Observable.just(1); } - }, 16, false, ImmediateThinScheduler.INSTANCE) + }, false, 16, ImmediateThinScheduler.INSTANCE) .test() .assertResult(1); } @@ -560,7 +560,7 @@ public void concatMapEmpty() { @Test public void concatMapEmptyDelayError() { Observable.just(1).hide() - .concatMapDelayError(Functions.justFunction(Observable.empty()), 2, true, ImmediateThinScheduler.INSTANCE) + .concatMapDelayError(Functions.justFunction(Observable.empty()), true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertResult(); } @@ -576,7 +576,7 @@ public ObservableSource apply(Observable f) throws Exception { TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { @Override public ObservableSource apply(Observable f) throws Exception { - return f.concatMapDelayError(Functions.justFunction(Observable.just(2)), 2, true, ImmediateThinScheduler.INSTANCE); + return f.concatMapDelayError(Functions.justFunction(Observable.just(2)), true, 2, ImmediateThinScheduler.INSTANCE); } }); } @@ -640,7 +640,7 @@ public void concatMapInnerError() { @Test public void concatMapInnerErrorDelayError() { Observable.just(1).hide() - .concatMapDelayError(Functions.justFunction(Observable.error(new TestException())), 2, true, ImmediateThinScheduler.INSTANCE) + .concatMapDelayError(Functions.justFunction(Observable.error(new TestException())), true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(TestException.class); } @@ -692,7 +692,7 @@ protected void subscribeActual(Observer o) { o.onSubscribe(Disposables.empty()); o.onError(new TestException("First")); } - }), 2, true, ImmediateThinScheduler.INSTANCE) + }), true, 2, ImmediateThinScheduler.INSTANCE) .to(TestHelper.testConsumer()); to.assertFailureAndMessage(TestException.class, "First"); @@ -712,7 +712,7 @@ public void badSourceDelayError() { TestHelper.checkBadSourceObservable(new Function, Object>() { @Override public Object apply(Observable f) throws Exception { - return f.concatMapDelayError(Functions.justFunction(Observable.just(1).hide()), 2, true, ImmediateThinScheduler.INSTANCE); + return f.concatMapDelayError(Functions.justFunction(Observable.just(1).hide()), true, 2, ImmediateThinScheduler.INSTANCE); } }, true, 1, 1, 1); } @@ -736,7 +736,7 @@ public void fusedCrashDelayError() { @Override public Object apply(Integer v) throws Exception { throw new TestException(); } }) - .concatMapDelayError(Functions.justFunction(Observable.just(1)), 2, true, ImmediateThinScheduler.INSTANCE) + .concatMapDelayError(Functions.justFunction(Observable.just(1)), true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(TestException.class); } @@ -762,7 +762,7 @@ public void callableCrashDelayError() { public Object call() throws Exception { throw new TestException(); } - })), 2, true, ImmediateThinScheduler.INSTANCE) + })), true, 2, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(TestException.class); } @@ -773,13 +773,13 @@ public void dispose() { .concatMap(Functions.justFunction(Observable.just(1)), 2, ImmediateThinScheduler.INSTANCE)); TestHelper.checkDisposed(Observable.range(1, 2) - .concatMapDelayError(Functions.justFunction(Observable.just(1)), 2, true, ImmediateThinScheduler.INSTANCE)); + .concatMapDelayError(Functions.justFunction(Observable.just(1)), true, 2, ImmediateThinScheduler.INSTANCE)); } @Test public void notVeryEnd() { Observable.range(1, 2) - .concatMapDelayError(Functions.justFunction(Observable.error(new TestException())), 16, false, ImmediateThinScheduler.INSTANCE) + .concatMapDelayError(Functions.justFunction(Observable.error(new TestException())), false, 16, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(TestException.class); } @@ -787,7 +787,7 @@ public void notVeryEnd() { @Test public void error() { Observable.error(new TestException()) - .concatMapDelayError(Functions.justFunction(Observable.just(2)), 16, false, ImmediateThinScheduler.INSTANCE) + .concatMapDelayError(Functions.justFunction(Observable.just(2)), false, 16, ImmediateThinScheduler.INSTANCE) .test() .assertFailure(TestException.class); } @@ -816,7 +816,7 @@ public void mainErrors() { public Observable apply(Integer v) { return Observable.range(v, 2); } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(to); source.onNext(1); source.onNext(2); @@ -839,7 +839,7 @@ public void innerErrors() { public Observable apply(Integer v) { return inner; } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(to); to.assertValues(1, 2, 1, 2, 1, 2); to.assertError(CompositeException.class); @@ -859,7 +859,7 @@ public void singleInnerErrors() { public Observable apply(Integer v) { return inner; } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(to); to.assertValues(1, 2); to.assertError(TestException.class); @@ -877,7 +877,7 @@ public void innerNull() { public Observable apply(Integer v) { return null; } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(to); to.assertNoValues(); to.assertError(NullPointerException.class); @@ -895,7 +895,7 @@ public void innerThrows() { public Observable apply(Integer v) { throw new TestException(); } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(to); to.assertNoValues(); to.assertError(TestException.class); @@ -912,7 +912,7 @@ public void innerWithEmpty() { public Observable apply(Integer v) { return v == 2 ? Observable.empty() : Observable.range(1, 2); } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(to); to.assertValues(1, 2, 1, 2); to.assertNoErrors(); @@ -929,7 +929,7 @@ public void innerWithScalar() { public Observable apply(Integer v) { return v == 2 ? Observable.just(3) : Observable.range(1, 2); } - }, 2, true, ImmediateThinScheduler.INSTANCE).subscribe(to); + }, true, 2, ImmediateThinScheduler.INSTANCE).subscribe(to); to.assertValues(1, 2, 3, 1, 2); to.assertNoErrors(); @@ -971,7 +971,7 @@ public Observable apply(Integer t) throws Throwable { .repeat(1000) .observeOn(Schedulers.io()); } - }, 2, false, Schedulers.single()) + }, false, 2, Schedulers.single()) .distinct() .test() .awaitDone(5, TimeUnit.SECONDS) @@ -994,7 +994,7 @@ public Observable apply(Integer t) throws Throwable { .repeat(1000) .observeOn(Schedulers.io()); } - }, 2, true, Schedulers.single()) + }, true, 2, Schedulers.single()) .distinct() .test() .awaitDone(5, TimeUnit.SECONDS) @@ -1030,7 +1030,7 @@ public Observable apply(Observable upstream) { public Observable apply(Integer v) throws Throwable { return Observable.just(v).hide(); } - }, 2, false, ImmediateThinScheduler.INSTANCE); + }, false, 2, ImmediateThinScheduler.INSTANCE); } }); } @@ -1045,7 +1045,7 @@ public Observable apply(Observable upstream) { public Observable apply(Integer v) throws Throwable { return Observable.just(v).hide(); } - }, 2, true, ImmediateThinScheduler.INSTANCE); + }, true, 2, ImmediateThinScheduler.INSTANCE); } }); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapTest.java index 6508069822..61b1f1579b 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapTest.java @@ -226,7 +226,7 @@ public void normalDelayErrorsTillTheEnd() { public ObservableSource apply(Integer v) throws Exception { return Observable.range(v, 2); } - }, 16, true) + }, true, 16) .test() .assertResult(1, 2); } @@ -548,7 +548,7 @@ public Observable apply(Observable upstream) { public Observable apply(Integer v) throws Throwable { return Observable.just(v).hide(); } - }, 2, false); + }, false, 2); } }); } @@ -563,7 +563,7 @@ public Observable apply(Observable upstream) { public Observable apply(Integer v) throws Throwable { return Observable.just(v).hide(); } - }, 2, true); + }, true, 2); } }); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatTest.java index 6f436a56cc..0e8c64b92d 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatTest.java @@ -908,7 +908,7 @@ public void concatMapDelayErrorEmptySource() { public ObservableSource apply(Object v) throws Exception { return Observable.just(1); } - }, 16, true)); + }, true, 16)); } @Test @@ -919,7 +919,7 @@ public void concatMapDelayErrorJustSource() { public ObservableSource apply(Object v) throws Exception { return Observable.just(1); } - }, 16, true) + }, true, 16) .test() .assertResult(1); diff --git a/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java b/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java index 76952c0904..211053b0c7 100644 --- a/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java +++ b/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java @@ -700,34 +700,9 @@ public Object apply(Object[] a) { }).blockingLast(); } - @Test(expected = NullPointerException.class) - public void zipObservableNull() { - Observable.zip((Observable>)null, new Function() { - @Override - public Object apply(Object[] a) { - return 1; - } - }); - } - - @Test(expected = NullPointerException.class) - public void zipObservableFunctionNull() { - Observable.zip((Observable.just(just1)), null); - } - - @Test(expected = NullPointerException.class) - public void zipObservableFunctionReturnsNull() { - Observable.zip((Observable.just(just1)), new Function() { - @Override - public Object apply(Object[] a) { - return null; - } - }).blockingLast(); - } - @Test(expected = NullPointerException.class) public void zipIterable2Null() { - Observable.zipIterable((Iterable>)null, new Function() { + Observable.zip((Iterable>)null, new Function() { @Override public Object apply(Object[] a) { return 1; @@ -737,7 +712,7 @@ public Object apply(Object[] a) { @Test(expected = NullPointerException.class) public void zipIterable2IteratorNull() { - Observable.zipIterable(new Iterable>() { + Observable.zip(new Iterable>() { @Override public Iterator> iterator() { return null; @@ -753,13 +728,13 @@ public Object apply(Object[] a) { @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void zipIterable2FunctionNull() { - Observable.zipIterable(Arrays.asList(just1, just1), null, true, 128); + Observable.zip(Arrays.asList(just1, just1), null, true, 128); } @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void zipIterable2FunctionReturnsNull() { - Observable.zipIterable(Arrays.asList(just1, just1), new Function() { + Observable.zip(Arrays.asList(just1, just1), new Function() { @Override public Object apply(Object[] a) { return null;