concatEager(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
+ Objects.requireNonNull(sources, "sources is null");
+ ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
+ ObjectHelper.verifyPositive(prefetch, "prefetch");
+ return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.BOUNDARY));
+ }
+
/**
* Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values.
*
+ *
+ *
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
* {@code Publisher}s and then drains them in order, each one after the previous one completes.
@@ -1677,7 +1748,10 @@ public static Flowable concatEager(@NonNull Publisher<@NonNull ? extends
}
/**
- * Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values.
+ * Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values and
+ * runs a limited number of inner sequences at once.
+ *
+ *
*
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
@@ -1713,7 +1787,10 @@ public static Flowable concatEager(@NonNull Publisher<@NonNull ? extends
}
/**
- * Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values.
+ * Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values,
+ * delaying errors until all the inner sequences terminate.
+ *
+ *
*
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
@@ -1730,18 +1807,22 @@ public static Flowable concatEager(@NonNull Publisher<@NonNull ? extends
* @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
- * @since 2.0
+ * @since 3.0.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
- public static Flowable concatEager(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
- return concatEager(sources, bufferSize(), bufferSize());
+ public static Flowable concatEagerDelayError(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
+ return concatEagerDelayError(sources, bufferSize(), bufferSize());
}
/**
- * Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values.
+ * Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values,
+ * delaying errors until all the inner sequences terminate and runs a limited number
+ * of inner sequences at once.
+ *
+ *
*
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
@@ -1762,18 +1843,89 @@ public static Flowable concatEager(@NonNull Iterable<@NonNull ? extends P
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @throws IllegalArgumentException if {@code maxConcurrency} or {@code prefetch} is non-positive
- * @since 2.0
+ * @since 3.0.0
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "rawtypes", "unchecked" })
- public static Flowable concatEager(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
+ public static Flowable concatEagerDelayError(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
Objects.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(prefetch, "prefetch");
- return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.IMMEDIATE));
+ return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.END));
+ }
+
+ /**
+ * Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values,
+ * delaying errors until all the inner and the outer sequences terminate.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
+ * {@code Publisher}s and then drains them in order, each one after the previous one completes.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream and both the outer and inner {@code Publisher}s are
+ * expected to support backpressure. Violating this assumption, the operator will
+ * signal {@link MissingBackpressureException}.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
+ * @return the new {@code Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @BackpressureSupport(BackpressureKind.FULL)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Flowable concatEagerDelayError(@NonNull Publisher<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
+ return concatEagerDelayError(sources, bufferSize(), bufferSize());
+ }
+
+ /**
+ * Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values,
+ * delaying errors until all the inner and outer sequences terminate and runs a limited number of inner
+ * sequences at once.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
+ * {@code Publisher}s and then drains them in order, each one after the previous one completes.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream and both the outer and inner {@code Publisher}s are
+ * expected to support backpressure. Violating this assumption, the operator will
+ * signal {@link MissingBackpressureException}.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
+ * @param maxConcurrency the maximum number of concurrently running inner {@code Publisher}s; {@link Integer#MAX_VALUE}
+ * is interpreted as all inner {@code Publisher}s can be active at the same time
+ * @param prefetch the number of elements to prefetch from each inner {@code Publisher} source
+ * @return the new {@code Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code maxConcurrency} or {@code prefetch} is non-positive
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.FULL)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static Flowable concatEagerDelayError(@NonNull Publisher<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
+ Objects.requireNonNull(sources, "sources is null");
+ ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
+ ObjectHelper.verifyPositive(prefetch, "prefetch");
+ return RxJavaPlugins.onAssembly(new FlowableConcatMapEagerPublisher(sources, Functions.identity(), maxConcurrency, prefetch, ErrorMode.END));
}
/**
@@ -10134,6 +10286,7 @@ public final Flowable flatMap(@NonNull Function super T, ? extends Publ
* if {@code false}, the first one signaling an exception will terminate the whole sequence immediately
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code mapper} is {@code null}
+ * @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
* @see ReactiveX operators documentation: FlatMap
* @since 2.0
*/
diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
index 66d5ad143d..784343922d 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
@@ -561,11 +561,11 @@ public static Flowable concatDelayError(@NonNull Publisher<@NonNull ? ext
/**
* Concatenates a sequence of {@link MaybeSource}s eagerly into a {@link Flowable} sequence.
*
+ *
+ *
* Eager concatenation means that once an observer subscribes, this operator subscribes to all of the
* source {@code MaybeSource}s. The operator buffers the values emitted by these {@code MaybeSource}s and then drains them
* in order, each one after the previous one completes.
- *
- *
*
* - Backpressure:
* - Backpressure is honored towards the downstream.
@@ -583,7 +583,40 @@ public static Flowable concatDelayError(@NonNull Publisher<@NonNull ? ext
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static Flowable concatEager(@NonNull Iterable<@NonNull ? extends MaybeSource extends T>> sources) {
- return Flowable.fromIterable(sources).concatMapEager((Function)MaybeToPublisher.instance());
+ return Flowable.fromIterable(sources).concatMapEagerDelayError((Function)MaybeToPublisher.instance(), false);
+ }
+
+ /**
+ * Concatenates a sequence of {@link MaybeSource}s eagerly into a {@link Flowable} sequence and
+ * runs a limited number of the inner sequences at once.
+ *
+ *
+ *
+ * Eager concatenation means that once an observer subscribes, this operator subscribes to all of the
+ * source {@code MaybeSource}s. The operator buffers the values emitted by these {@code MaybeSource}s and then drains them
+ * in order, each one after the previous one completes.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code MaybeSource} that need to be eagerly concatenated
+ * @param maxConcurrency the maximum number of concurrently running inner {@code MaybeSource}s; {@link Integer#MAX_VALUE}
+ * is interpreted as all inner {@code MaybeSource}s can be active at the same time
+ * @return the new {@code Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
+ * @since 3.0.0
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Flowable concatEager(@NonNull Iterable<@NonNull ? extends MaybeSource extends T>> sources, int maxConcurrency) {
+ return Flowable.fromIterable(sources).concatMapEagerDelayError((Function)MaybeToPublisher.instance(), false, maxConcurrency, 1);
}
/**
@@ -616,6 +649,173 @@ public static Flowable concatEager(@NonNull Publisher<@NonNull ? extends
return Flowable.fromPublisher(sources).concatMapEager((Function)MaybeToPublisher.instance());
}
+ /**
+ * Concatenates a {@link Publisher} sequence of {@link MaybeSource}s eagerly into a {@link Flowable} sequence,
+ * running at most the given number of inner {@code MaybeSource}s at once.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * emitted source {@code MaybeSource}s as they are observed. The operator buffers the values emitted by these
+ * {@code MaybeSource}s and then drains them in order, each one after the previous one completes.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream and the outer {@code Publisher} is
+ * expected to support backpressure. Violating this assumption, the operator will
+ * signal {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException}.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code MaybeSource}s that need to be eagerly concatenated
+ * @param maxConcurrency the maximum number of concurrently running inner {@code MaybeSource}s; {@link Integer#MAX_VALUE}
+ * is interpreted as all inner {@code MaybeSource}s can be active at the same time
+ * @return the new {@code Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
+ * @since 3.0.0
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Flowable concatEager(@NonNull Publisher<@NonNull ? extends MaybeSource extends T>> sources, int maxConcurrency) {
+ return Flowable.fromPublisher(sources).concatMapEager((Function)MaybeToPublisher.instance(), maxConcurrency, 1);
+ }
+
+ /**
+ * Concatenates a sequence of {@link MaybeSource}s eagerly into a {@link Flowable} sequence,
+ * delaying errors until all inner {@code MaybeSource}s terminate.
+ *
+ *
+ *
+ * Eager concatenation means that once an observer subscribes, this operator subscribes to all of the
+ * source {@code MaybeSource}s. The operator buffers the values emitted by these {@code MaybeSource}s and then drains them
+ * in order, each one after the previous one completes.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code MaybeSource} that need to be eagerly concatenated
+ * @return the new {@code Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Flowable concatEagerDelayError(@NonNull Iterable<@NonNull ? extends MaybeSource extends T>> sources) {
+ return Flowable.fromIterable(sources).concatMapEagerDelayError((Function)MaybeToPublisher.instance(), true);
+ }
+
+ /**
+ * Concatenates a sequence of {@link MaybeSource}s eagerly into a {@link Flowable} sequence,
+ * delaying errors until all inner {@code MaybeSource}s terminate and
+ * runs a limited number of inner {@code MaybeSource}s at once.
+ *
+ *
+ *
+ * Eager concatenation means that once an observer subscribes, this operator subscribes to all of the
+ * source {@code MaybeSource}s. The operator buffers the values emitted by these {@code MaybeSource}s and then drains them
+ * in order, each one after the previous one completes.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code MaybeSource} that need to be eagerly concatenated
+ * @param maxConcurrency the maximum number of concurrently running inner {@code MaybeSource}s; {@link Integer#MAX_VALUE}
+ * is interpreted as all inner {@code MaybeSource}s can be active at the same time
+ * @return the new {@code Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
+ * @since 3.0.0
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Flowable concatEagerDelayError(@NonNull Iterable<@NonNull ? extends MaybeSource extends T>> sources, int maxConcurrency) {
+ return Flowable.fromIterable(sources).concatMapEagerDelayError((Function)MaybeToPublisher.instance(), true, maxConcurrency, 1);
+ }
+
+ /**
+ * Concatenates a {@link Publisher} sequence of {@link MaybeSource}s eagerly into a {@link Flowable} sequence,
+ * delaying errors until all the inner and the outer sequence terminate.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * emitted source {@code MaybeSource}s as they are observed. The operator buffers the values emitted by these
+ * {@code MaybeSource}s and then drains them in order, each one after the previous one completes.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream and the outer {@code Publisher} is
+ * expected to support backpressure. Violating this assumption, the operator will
+ * signal {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException}.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code MaybeSource}s that need to be eagerly concatenated
+ * @return the new {@code Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Flowable concatEagerDelayError(@NonNull Publisher<@NonNull ? extends MaybeSource extends T>> sources) {
+ return Flowable.fromPublisher(sources).concatMapEagerDelayError((Function)MaybeToPublisher.instance(), true);
+ }
+
+ /**
+ * Concatenates a {@link Publisher} sequence of {@link MaybeSource}s eagerly into a {@link Flowable} sequence,
+ * delaying errors until all the inner and the outer sequence terminate and
+ * runs a limited number of the inner {@code MaybeSource}s at once.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * emitted source {@code MaybeSource}s as they are observed. The operator buffers the values emitted by these
+ * {@code MaybeSource}s and then drains them in order, each one after the previous one completes.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream and the outer {@code Publisher} is
+ * expected to support backpressure. Violating this assumption, the operator will
+ * signal {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException}.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code MaybeSource}s that need to be eagerly concatenated
+ * @param maxConcurrency the maximum number of concurrently running inner {@code MaybeSource}s; {@link Integer#MAX_VALUE}
+ * is interpreted as all inner {@code MaybeSource}s can be active at the same time
+ * @return the new {@code Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
+ * @since 3.0.0
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Flowable concatEagerDelayError(@NonNull Publisher<@NonNull ? extends MaybeSource extends T>> sources, int maxConcurrency) {
+ return Flowable.fromPublisher(sources).concatMapEagerDelayError((Function)MaybeToPublisher.instance(), true, maxConcurrency, 1);
+ }
+
/**
* Provides an API (via a cold {@code Maybe}) that bridges the reactive world with the callback-style world.
*
diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java
index 9eeb169871..033d12bdff 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Observable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java
@@ -1460,14 +1460,70 @@ public static Observable concatDelayError(@NonNull ObservableSource ext
return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, Functions.identity(), bufferSize, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY));
}
+ /**
+ * Concatenates a sequence of {@link ObservableSource}s eagerly into a single stream of values.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * {@code ObservableSource}s. The operator buffers the values emitted by these {@code ObservableSource}s and then drains them
+ * in order, each one after the previous one completes.
+ *
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code ObservableSource}s that need to be eagerly concatenated
+ * @return the new {@code Observable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 2.0
+ */
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Observable concatEager(@NonNull Iterable<@NonNull ? extends ObservableSource extends T>> sources) {
+ return concatEager(sources, bufferSize(), bufferSize());
+ }
+
+ /**
+ * Concatenates a sequence of {@link ObservableSource}s eagerly into a single stream of values and
+ * runs a limited number of inner sequences at once.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * {@code ObservableSource}s. The operator buffers the values emitted by these {@code ObservableSource}s and then drains them
+ * in order, each one after the previous one completes.
+ *
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code ObservableSource}s that need to be eagerly concatenated
+ * @param maxConcurrency the maximum number of concurrently running inner {@code ObservableSource}s; {@link Integer#MAX_VALUE}
+ * is interpreted as all inner {@code ObservableSource}s can be active at the same time
+ * @param bufferSize the number of elements expected from each inner {@code ObservableSource} to be buffered
+ * @return the new {@code Observable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code maxConcurrency} or {@code bufferSize} is non-positive
+ * @since 2.0
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Observable concatEager(@NonNull Iterable<@NonNull ? extends ObservableSource extends T>> sources, int maxConcurrency, int bufferSize) {
+ return fromIterable(sources).concatMapEagerDelayError((Function)Functions.identity(), false, maxConcurrency, bufferSize);
+ }
+
/**
* Concatenates an {@link ObservableSource} sequence of {@code ObservableSource}s eagerly into a single stream of values.
*
+ *
+ *
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source {@code ObservableSource}s as they are observed. The operator buffers the values emitted by these
* {@code ObservableSource}s and then drains them in order, each one after the previous one completes.
- *
- *
*
* - Scheduler:
* - This method does not operate by default on a particular {@link Scheduler}.
@@ -1486,13 +1542,15 @@ public static Observable concatEager(@NonNull ObservableSource extends
}
/**
- * Concatenates an {@link ObservableSource} sequence of {@code ObservableSource}s eagerly into a single stream of values.
+ * Concatenates an {@link ObservableSource} sequence of {@code ObservableSource}s eagerly into a single stream of values
+ * and runs a limited number of inner sequences at once.
+ *
+ *
+ *
*
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source {@code ObservableSource}s as they are observed. The operator buffers the values emitted by these
* {@code ObservableSource}s and then drains them in order, each one after the previous one completes.
- *
- *
*
* - Scheduler:
* - This method does not operate by default on a particular {@link Scheduler}.
@@ -1516,13 +1574,14 @@ public static Observable concatEager(@NonNull ObservableSource extends
}
/**
- * Concatenates a sequence of {@link ObservableSource}s eagerly into a single stream of values.
+ * Concatenates a sequence of {@link ObservableSource}s eagerly into a single stream of values,
+ * delaying errors until all the inner sequences terminate.
+ *
+ *
*
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* {@code ObservableSource}s. The operator buffers the values emitted by these {@code ObservableSource}s and then drains them
* in order, each one after the previous one completes.
- *
- *
*
* - Scheduler:
* - This method does not operate by default on a particular {@link Scheduler}.
@@ -1531,23 +1590,25 @@ public static Observable concatEager(@NonNull ObservableSource extends
* @param sources a sequence of {@code ObservableSource}s that need to be eagerly concatenated
* @return the new {@code Observable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
- * @since 2.0
+ * @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
- public static Observable concatEager(@NonNull Iterable<@NonNull ? extends ObservableSource extends T>> sources) {
- return concatEager(sources, bufferSize(), bufferSize());
+ public static Observable concatEagerDelayError(@NonNull Iterable<@NonNull ? extends ObservableSource extends T>> sources) {
+ return concatEagerDelayError(sources, bufferSize(), bufferSize());
}
/**
- * Concatenates a sequence of {@link ObservableSource}s eagerly into a single stream of values.
+ * Concatenates a sequence of {@link ObservableSource}s eagerly into a single stream of values,
+ * delaying errors until all the inner sequences terminate and runs a limited number of inner
+ * sequences at once.
+ *
+ *
*
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* {@code ObservableSource}s. The operator buffers the values emitted by these {@code ObservableSource}s and then drains them
* in order, each one after the previous one completes.
- *
- *
*
* - Scheduler:
* - This method does not operate by default on a particular {@link Scheduler}.
@@ -1560,14 +1621,71 @@ public static Observable concatEager(@NonNull Iterable<@NonNull ? extends
* @return the new {@code Observable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @throws IllegalArgumentException if {@code maxConcurrency} or {@code bufferSize} is non-positive
- * @since 2.0
+ * @since 3.0.0
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
- public static Observable concatEager(@NonNull Iterable<@NonNull ? extends ObservableSource extends T>> sources, int maxConcurrency, int bufferSize) {
- return fromIterable(sources).concatMapEagerDelayError((Function)Functions.identity(), false, maxConcurrency, bufferSize);
+ public static Observable concatEagerDelayError(@NonNull Iterable<@NonNull ? extends ObservableSource extends T>> sources, int maxConcurrency, int bufferSize) {
+ return fromIterable(sources).concatMapEagerDelayError((Function)Functions.identity(), true, maxConcurrency, bufferSize);
+ }
+
+ /**
+ * Concatenates an {@link ObservableSource} sequence of {@code ObservableSource}s eagerly into a single stream of values,
+ * delaying errors until all the inner and the outer sequence terminate.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * emitted source {@code ObservableSource}s as they are observed. The operator buffers the values emitted by these
+ * {@code ObservableSource}s and then drains them in order, each one after the previous one completes.
+ *
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code ObservableSource}s that need to be eagerly concatenated
+ * @return the new {@code Observable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Observable concatEagerDelayError(@NonNull ObservableSource extends ObservableSource extends T>> sources) {
+ return concatEagerDelayError(sources, bufferSize(), bufferSize());
+ }
+
+ /**
+ * Concatenates an {@link ObservableSource} sequence of {@code ObservableSource}s eagerly into a single stream of values,
+ * delaying errors until all the inner and the outer sequence terminate and runs a limited number of inner sequences at once.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * emitted source {@code ObservableSource}s as they are observed. The operator buffers the values emitted by these
+ * {@code ObservableSource}s and then drains them in order, each one after the previous one completes.
+ *
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code ObservableSource}s that need to be eagerly concatenated
+ * @param maxConcurrency the maximum number of concurrently running inner {@code ObservableSource}s; {@link Integer#MAX_VALUE}
+ * is interpreted as all inner {@code ObservableSource}s can be active at the same time
+ * @param bufferSize the number of inner {@code ObservableSource} expected to be buffered
+ * @return the new {@code Observable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code maxConcurrency} or {@code bufferSize} is non-positive
+ * @since 3.0.0
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public static Observable concatEagerDelayError(@NonNull ObservableSource extends ObservableSource extends T>> sources, int maxConcurrency, int bufferSize) {
+ return wrap(sources).concatMapEagerDelayError((Function)Functions.identity(), true, maxConcurrency, bufferSize);
}
/**
@@ -2923,7 +3041,7 @@ public static Observable just(@NonNull T item1, @NonNull T item2, @NonNul
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @throws IllegalArgumentException
- * if {@code maxConcurrent} or {@code bufferSize} is non-positive
+ * if {@code maxConcurrency} or {@code bufferSize} is non-positive
* @see ReactiveX operators documentation: Merge
* @see #mergeDelayError(Iterable, int, int)
*/
@@ -2971,7 +3089,7 @@ public static Observable merge(@NonNull Iterable<@NonNull ? extends Obser
* @return the new {@code Observable} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @throws IllegalArgumentException
- * if {@code maxConcurrent} or {@code bufferSize} is non-positive
+ * if {@code maxConcurrency} or {@code bufferSize} is non-positive
* @see ReactiveX operators documentation: Merge
* @see #mergeArrayDelayError(int, int, ObservableSource...)
*/
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java
index 667125b08b..09c67ef5a1 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Single.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Single.java
@@ -492,35 +492,6 @@ public static Flowable concatArrayEagerDelayError(@NonNull SingleSource
return Flowable.fromArray(sources).concatMapEagerDelayError(SingleInternalHelper.toFlowable(), true);
}
- /**
- * Concatenates a {@link Publisher} sequence of {@link SingleSource}s eagerly into a single stream of values.
- *
- *
- *
- * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
- * emitted source {@code SingleSource}s as they are observed. The operator buffers the values emitted by these
- * {@code SingleSource}s and then drains them in order, each one after the previous one succeeds.
- *
- * - Backpressure:
- * - Backpressure is honored towards the downstream and the outer {@code Publisher} is
- * expected to support backpressure. Violating this assumption, the operator will
- * signal {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException}.
- * - Scheduler:
- * - This method does not operate by default on a particular {@link Scheduler}.
- *
- * @param the value type
- * @param sources a sequence of {@code SingleSource}s that need to be eagerly concatenated
- * @return the new {@link Flowable} instance with the specified concatenation behavior
- * @throws NullPointerException if {@code sources} is {@code null}
- */
- @BackpressureSupport(BackpressureKind.FULL)
- @CheckReturnValue
- @NonNull
- @SchedulerSupport(SchedulerSupport.NONE)
- public static Flowable concatEager(@NonNull Publisher<@NonNull ? extends SingleSource extends T>> sources) {
- return Flowable.fromPublisher(sources).concatMapEager(SingleInternalHelper.toFlowable());
- }
-
/**
* Concatenates the {@link Iterable} sequence of {@link SingleSource}s into a single sequence by subscribing to each {@code SingleSource},
* one after the other, one at a time and delays any errors till the all inner {@code SingleSource}s terminate
@@ -631,7 +602,228 @@ public static Flowable concatDelayError(@NonNull Publisher<@NonNull ? ext
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Flowable concatEager(@NonNull Iterable<@NonNull ? extends SingleSource extends T>> sources) {
- return Flowable.fromIterable(sources).concatMapEager(SingleInternalHelper.toFlowable());
+ return Flowable.fromIterable(sources).concatMapEagerDelayError(SingleInternalHelper.toFlowable(), false);
+ }
+
+ /**
+ * Concatenates an {@link Iterable} sequence of {@link SingleSource}s eagerly into a single stream of values and
+ * runs a limited number of the inner sources at once.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * source {@code SingleSource}s. The operator buffers the values emitted by these {@code SingleSource}s and then drains them
+ * in order, each one after the previous one succeeds.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources an {@code Iterable} sequence of {@code SingleSource} that need to be eagerly concatenated
+ * @param maxConcurrency the maximum number of concurrently running inner {@code SingleSource}s; {@link Integer#MAX_VALUE}
+ * is interpreted as all inner {@code SingleSource}s can be active at the same time
+ * @return the new {@link Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
+ * @since 3.0.0
+ */
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Flowable concatEager(@NonNull Iterable<@NonNull ? extends SingleSource extends T>> sources, int maxConcurrency) {
+ return Flowable.fromIterable(sources).concatMapEagerDelayError(SingleInternalHelper.toFlowable(), false, maxConcurrency, 1);
+ }
+
+ /**
+ * Concatenates a {@link Publisher} sequence of {@link SingleSource}s eagerly into a single stream of values.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * emitted source {@code SingleSource}s as they are observed. The operator buffers the values emitted by these
+ * {@code SingleSource}s and then drains them in order, each one after the previous one succeeds.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream and the outer {@code Publisher} is
+ * expected to support backpressure. Violating this assumption, the operator will
+ * signal {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException}.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code SingleSource}s that need to be eagerly concatenated
+ * @return the new {@link Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ */
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Flowable concatEager(@NonNull Publisher<@NonNull ? extends SingleSource extends T>> sources) {
+ return Flowable.fromPublisher(sources).concatMapEager(SingleInternalHelper.toFlowable());
+ }
+
+ /**
+ * Concatenates a {@link Publisher} sequence of {@link SingleSource}s eagerly into a single stream of values and
+ * runs a limited number of those inner {@code SingleSource}s at once.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * emitted source {@code SingleSource}s as they are observed. The operator buffers the values emitted by these
+ * {@code SingleSource}s and then drains them in order, each one after the previous one succeeds.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream and the outer {@code Publisher} is
+ * expected to support backpressure. Violating this assumption, the operator will
+ * signal {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException}.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code SingleSource}s that need to be eagerly concatenated
+ * @param maxConcurrency the maximum number of concurrently running inner {@code SingleSource}s; {@link Integer#MAX_VALUE}
+ * is interpreted as all inner {@code SingleSource}s can be active at the same time
+ * @return the new {@link Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
+ * @since 3.0.0
+ */
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Flowable concatEager(@NonNull Publisher<@NonNull ? extends SingleSource extends T>> sources, int maxConcurrency) {
+ return Flowable.fromPublisher(sources).concatMapEager(SingleInternalHelper.toFlowable(), maxConcurrency, 1);
+ }
+
+ /**
+ * Concatenates an {@link Iterable} sequence of {@link SingleSource}s eagerly into a single stream of values,
+ * delaying errors until all the inner sources terminate.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * source {@code SingleSource}s. The operator buffers the values emitted by these {@code SingleSource}s and then drains them
+ * in order, each one after the previous one succeeds.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources an {@code Iterable} sequence of {@code SingleSource} that need to be eagerly concatenated
+ * @return the new {@link Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Flowable concatEagerDelayError(@NonNull Iterable<@NonNull ? extends SingleSource extends T>> sources) {
+ return Flowable.fromIterable(sources).concatMapEagerDelayError(SingleInternalHelper.toFlowable(), true);
+ }
+
+ /**
+ * Concatenates an {@link Iterable} sequence of {@link SingleSource}s eagerly into a single stream of values,
+ * delaying errors until all the inner sources terminate.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * source {@code SingleSource}s. The operator buffers the values emitted by these {@code SingleSource}s and then drains them
+ * in order, each one after the previous one succeeds.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources an {@code Iterable} sequence of {@code SingleSource} that need to be eagerly concatenated
+ * @param maxConcurrency the maximum number of concurrently running inner {@code SingleSource}s; {@link Integer#MAX_VALUE}
+ * is interpreted as all inner {@code SingleSource}s can be active at the same time
+ * @return the new {@link Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
+ * @since 3.0.0
+ */
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Flowable concatEagerDelayError(@NonNull Iterable<@NonNull ? extends SingleSource extends T>> sources, int maxConcurrency) {
+ return Flowable.fromIterable(sources).concatMapEagerDelayError(SingleInternalHelper.toFlowable(), true, maxConcurrency, 1);
+ }
+
+ /**
+ * Concatenates a {@link Publisher} sequence of {@link SingleSource}s eagerly into a single stream of values,
+ * delaying errors until all the inner and the outer sequence terminate.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * emitted source {@code SingleSource}s as they are observed. The operator buffers the values emitted by these
+ * {@code SingleSource}s and then drains them in order, each one after the previous one succeeds.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream and the outer {@code Publisher} is
+ * expected to support backpressure. Violating this assumption, the operator will
+ * signal {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException}.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code SingleSource}s that need to be eagerly concatenated
+ * @return the new {@link Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @since 3.0.0
+ */
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Flowable concatEagerDelayError(@NonNull Publisher<@NonNull ? extends SingleSource extends T>> sources) {
+ return Flowable.fromPublisher(sources).concatMapEagerDelayError(SingleInternalHelper.toFlowable(), true);
+ }
+
+ /**
+ * Concatenates a {@link Publisher} sequence of {@link SingleSource}s eagerly into a single stream of values,
+ * running at most the specified number of those inner {@code SingleSource}s at once and
+ * delaying errors until all the inner and the outer sequence terminate.
+ *
+ *
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * emitted source {@code SingleSource}s as they are observed. The operator buffers the values emitted by these
+ * {@code SingleSource}s and then drains them in order, each one after the previous one succeeds.
+ *
+ * - Backpressure:
+ * - Backpressure is honored towards the downstream and the outer {@code Publisher} is
+ * expected to support backpressure. Violating this assumption, the operator will
+ * signal {@link io.reactivex.rxjava3.exceptions.MissingBackpressureException}.
+ * - Scheduler:
+ * - This method does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the value type
+ * @param sources a sequence of {@code SingleSource}s that need to be eagerly concatenated
+ * @param maxConcurrency the number of inner {@code SingleSource}s to run at once
+ * @return the new {@link Flowable} instance with the specified concatenation behavior
+ * @throws NullPointerException if {@code sources} is {@code null}
+ * @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
+ * @since 3.0.0
+ */
+ @BackpressureSupport(BackpressureKind.FULL)
+ @CheckReturnValue
+ @NonNull
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Flowable concatEagerDelayError(@NonNull Publisher<@NonNull ? extends SingleSource extends T>> sources, int maxConcurrency) {
+ return Flowable.fromPublisher(sources).concatMapEagerDelayError(SingleInternalHelper.toFlowable(), true, maxConcurrency, 1);
}
/**
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapEagerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapEagerTest.java
index 0adfd655b0..b8c2eaaff3 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapEagerTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapEagerTest.java
@@ -744,7 +744,7 @@ public void Flowable() {
}
@Test
- public void ObservableCapacityHint() {
+ public void publisherCapacityHint() {
Flowable source = Flowable.just(1);
TestSubscriber ts = TestSubscriber.create();
@@ -1351,4 +1351,48 @@ public Flowable apply(Integer v) throws Throwable {
}
});
}
+
+ @Test
+ public void iterableDelayError() {
+ Flowable.concatEagerDelayError(Arrays.asList(
+ Flowable.range(1, 2),
+ Flowable.error(new TestException()),
+ Flowable.range(3, 3)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1, 2, 3, 4, 5);
+ }
+
+ @Test
+ public void iterableDelayErrorMaxConcurrency() {
+ Flowable.concatEagerDelayError(Arrays.asList(
+ Flowable.range(1, 2),
+ Flowable.error(new TestException()),
+ Flowable.range(3, 3)
+ ), 1, 1)
+ .test()
+ .assertFailure(TestException.class, 1, 2, 3, 4, 5);
+ }
+
+ @Test
+ public void publisherDelayError() {
+ Flowable.concatEagerDelayError(Flowable.fromArray(
+ Flowable.range(1, 2),
+ Flowable.error(new TestException()),
+ Flowable.range(3, 3)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1, 2, 3, 4, 5);
+ }
+
+ @Test
+ public void publisherDelayErrorMaxConcurrency() {
+ Flowable.concatEagerDelayError(Flowable.fromArray(
+ Flowable.range(1, 2),
+ Flowable.error(new TestException()),
+ Flowable.range(3, 3)
+ ), 1, 1)
+ .test()
+ .assertFailure(TestException.class, 1, 2, 3, 4, 5);
+ }
}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeConcatEagerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeConcatEagerTest.java
new file mode 100644
index 0000000000..5a1b42bb15
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeConcatEagerTest.java
@@ -0,0 +1,152 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.maybe;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.exceptions.TestException;
+
+public class MaybeConcatEagerTest {
+
+ @Test
+ public void iterableNormal() {
+ Maybe.concatEager(Arrays.asList(
+ Maybe.just(1),
+ Maybe.empty(),
+ Maybe.just(2)
+ ))
+ .test()
+ .assertResult(1, 2);
+ }
+
+ @Test
+ public void iterableNormalMaxConcurrency() {
+ Maybe.concatEager(Arrays.asList(
+ Maybe.just(1),
+ Maybe.empty(),
+ Maybe.just(2)
+ ), 1)
+ .test()
+ .assertResult(1, 2);
+ }
+
+ @Test
+ public void iterableError() {
+ Maybe.concatEager(Arrays.asList(
+ Maybe.just(1),
+ Maybe.error(new TestException()),
+ Maybe.empty(),
+ Maybe.just(2)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1);
+ }
+
+ @Test
+ public void iterableErrorMaxConcurrency() {
+ Maybe.concatEager(Arrays.asList(
+ Maybe.just(1),
+ Maybe.error(new TestException()),
+ Maybe.empty(),
+ Maybe.just(2)
+ ), 1)
+ .test()
+ .assertFailure(TestException.class, 1);
+ }
+
+ @Test
+ public void publisherNormal() {
+ Maybe.concatEager(Flowable.fromArray(
+ Maybe.just(1),
+ Maybe.empty(),
+ Maybe.just(2)
+ ))
+ .test()
+ .assertResult(1, 2);
+ }
+
+ @Test
+ public void publisherNormalMaxConcurrency() {
+ Maybe.concatEager(Flowable.fromArray(
+ Maybe.just(1),
+ Maybe.empty(),
+ Maybe.just(2)
+ ), 1)
+ .test()
+ .assertResult(1, 2);
+ }
+
+ @Test
+ public void publisherError() {
+ Maybe.concatEager(Flowable.fromArray(
+ Maybe.just(1),
+ Maybe.error(new TestException()),
+ Maybe.empty(),
+ Maybe.just(2)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1);
+ }
+
+ @Test
+ public void iterableDelayError() {
+ Maybe.concatEagerDelayError(Arrays.asList(
+ Maybe.just(1),
+ Maybe.error(new TestException()),
+ Maybe.empty(),
+ Maybe.just(2)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+
+ @Test
+ public void iterableDelayErrorMaxConcurrency() {
+ Maybe.concatEagerDelayError(Arrays.asList(
+ Maybe.just(1),
+ Maybe.error(new TestException()),
+ Maybe.empty(),
+ Maybe.just(2)
+ ), 1)
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+
+ @Test
+ public void publisherDelayError() {
+ Maybe.concatEagerDelayError(Flowable.fromArray(
+ Maybe.just(1),
+ Maybe.error(new TestException()),
+ Maybe.empty(),
+ Maybe.just(2)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+
+ @Test
+ public void publisherDelayErrorMaxConcurrency() {
+ Maybe.concatEagerDelayError(Flowable.fromArray(
+ Maybe.just(1),
+ Maybe.error(new TestException()),
+ Maybe.empty(),
+ Maybe.just(2)
+ ), 1)
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapEagerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapEagerTest.java
index e13124bf5d..054ab91d8f 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapEagerTest.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapEagerTest.java
@@ -1034,4 +1034,48 @@ public Observable apply(Integer v) throws Throwable {
}
});
}
+
+ @Test
+ public void iterableDelayError() {
+ Observable.concatEagerDelayError(Arrays.asList(
+ Observable.range(1, 2),
+ Observable.error(new TestException()),
+ Observable.range(3, 3)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1, 2, 3, 4, 5);
+ }
+
+ @Test
+ public void iterableDelayErrorMaxConcurrency() {
+ Observable.concatEagerDelayError(Arrays.asList(
+ Observable.range(1, 2),
+ Observable.error(new TestException()),
+ Observable.range(3, 3)
+ ), 1, 1)
+ .test()
+ .assertFailure(TestException.class, 1, 2, 3, 4, 5);
+ }
+
+ @Test
+ public void observerDelayError() {
+ Observable.concatEagerDelayError(Observable.fromArray(
+ Observable.range(1, 2),
+ Observable.error(new TestException()),
+ Observable.range(3, 3)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1, 2, 3, 4, 5);
+ }
+
+ @Test
+ public void observerDelayErrorMaxConcurrency() {
+ Observable.concatEagerDelayError(Observable.fromArray(
+ Observable.range(1, 2),
+ Observable.error(new TestException()),
+ Observable.range(3, 3)
+ ), 1, 1)
+ .test()
+ .assertFailure(TestException.class, 1, 2, 3, 4, 5);
+ }
}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatEagerTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatEagerTest.java
new file mode 100644
index 0000000000..c21c5347d4
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatEagerTest.java
@@ -0,0 +1,141 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.operators.single;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.exceptions.TestException;
+
+public class SingleConcatEagerTest {
+
+ @Test
+ public void iterableNormal() {
+ Single.concatEager(Arrays.asList(
+ Single.just(1),
+ Single.just(2)
+ ))
+ .test()
+ .assertResult(1, 2);
+ }
+
+ @Test
+ public void iterableNormalMaxConcurrency() {
+ Single.concatEager(Arrays.asList(
+ Single.just(1),
+ Single.just(2)
+ ), 1)
+ .test()
+ .assertResult(1, 2);
+ }
+
+ @Test
+ public void iterableError() {
+ Single.concatEager(Arrays.asList(
+ Single.just(1),
+ Single.error(new TestException()),
+ Single.just(2)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1);
+ }
+
+ @Test
+ public void iterableErrorMaxConcurrency() {
+ Single.concatEager(Arrays.asList(
+ Single.just(1),
+ Single.error(new TestException()),
+ Single.just(2)
+ ), 1)
+ .test()
+ .assertFailure(TestException.class, 1);
+ }
+
+ @Test
+ public void publisherNormal() {
+ Single.concatEager(Flowable.fromArray(
+ Single.just(1),
+ Single.just(2)
+ ))
+ .test()
+ .assertResult(1, 2);
+ }
+
+ @Test
+ public void publisherNormalMaxConcurrency() {
+ Single.concatEager(Flowable.fromArray(
+ Single.just(1),
+ Single.just(2)
+ ), 1)
+ .test()
+ .assertResult(1, 2);
+ }
+
+ @Test
+ public void publisherError() {
+ Single.concatEager(Flowable.fromArray(
+ Single.just(1),
+ Single.error(new TestException()),
+ Single.just(2)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1);
+ }
+
+ @Test
+ public void iterableDelayError() {
+ Single.concatEagerDelayError(Arrays.asList(
+ Single.just(1),
+ Single.error(new TestException()),
+ Single.just(2)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+
+ @Test
+ public void iterableDelayErrorMaxConcurrency() {
+ Single.concatEagerDelayError(Arrays.asList(
+ Single.just(1),
+ Single.error(new TestException()),
+ Single.just(2)
+ ), 1)
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+
+ @Test
+ public void publisherDelayError() {
+ Single.concatEagerDelayError(Flowable.fromArray(
+ Single.just(1),
+ Single.error(new TestException()),
+ Single.just(2)
+ ))
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+
+ @Test
+ public void publisherDelayErrorMaxConcurrency() {
+ Single.concatEagerDelayError(Flowable.fromArray(
+ Single.just(1),
+ Single.error(new TestException()),
+ Single.just(2)
+ ), 1)
+ .test()
+ .assertFailure(TestException.class, 1, 2);
+ }
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/util/MarbleDimensions.java b/src/test/java/io/reactivex/rxjava3/internal/util/MarbleDimensions.java
index 5c0886ea9d..aa388a684a 100644
--- a/src/test/java/io/reactivex/rxjava3/internal/util/MarbleDimensions.java
+++ b/src/test/java/io/reactivex/rxjava3/internal/util/MarbleDimensions.java
@@ -98,7 +98,7 @@ public static void main(String[] args) throws Throwable {
}
}
- static final int SLEEP_PER_IMAGE_MILLIS = 100;
+ static final int SLEEP_PER_IMAGE_MILLIS = 25;
static final Class>[] CLASSES = {
Flowable.class, Observable.class, Maybe.class, Single.class, Completable.class, ParallelFlowable.class
diff --git a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationNaming.java b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationNaming.java
index 2ce716e342..0445bfbe71 100644
--- a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationNaming.java
+++ b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationNaming.java
@@ -527,6 +527,7 @@ static final class ValidatorStrings {
new ValidatorStrings("itemDelayIndicator", "* @throws NullPointerException"),
new ValidatorStrings("future", "* @throws NullPointerException"),
+ new ValidatorStrings("maxConcurrency", "* @throws IllegalArgumentException"),
new ValidatorStrings("parallelism", "* @throws IllegalArgumentException"),
new ValidatorStrings("prefetch", "* @throws IllegalArgumentException"),
new ValidatorStrings("bufferSize", "* @throws IllegalArgumentException"),