Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add concatEagerDelayError across #6899

Merged
merged 2 commits into from
Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 129 additions & 129 deletions docs/Operator-Matrix.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,7 @@ public static Completable mergeDelayError(@NonNull Publisher<@NonNull ? extends
* at a time to the inner {@code CompletableSource}s
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down
171 changes: 162 additions & 9 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1648,9 +1648,80 @@ public static <T> Flowable<T> concatDelayError(@NonNull Publisher<@NonNull ? ext
return fromPublisher(sources).concatMapDelayError((Function)Functions.identity(), tillTheEnd, prefetch);
}

/**
* Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values.
* <p>
* <img width="640" height="422" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEager.i.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
* in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Backpressure is honored towards the downstream and the inner {@code Publisher}s are
* expected to support backpressure. Violating this assumption, the operator will
* signal {@link MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @since 2.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
return concatEager(sources, bufferSize(), bufferSize());
}

/**
* Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values and
* runs a limited number of inner sequences at once.
* <p>
* <img width="640" height="375" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEager.in.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
* in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Backpressure is honored towards the downstream and both the outer and inner {@code Publisher}s are
* expected to support backpressure. Violating this assumption, the operator will
* signal {@link MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
* @param maxConcurrency the maximum number of concurrently running inner {@code Publisher}s; {@link Integer#MAX_VALUE}
* is interpreted as all inner {@code Publisher}s can be active at the same time
* @param prefetch the number of elements to prefetch from each inner {@code Publisher} source
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @throws IllegalArgumentException if {@code maxConcurrency} or {@code prefetch} is non-positive
* @since 2.0
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
Objects.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.BOUNDARY));
}

/**
* Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values.
* <p>
* <img width="640" height="490" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEager.p.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
* {@code Publisher}s and then drains them in order, each one after the previous one completes.
Expand All @@ -1677,7 +1748,10 @@ public static <T> Flowable<T> concatEager(@NonNull Publisher<@NonNull ? extends
}

/**
* Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values.
* Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values and
* runs a limited number of inner sequences at once.
* <p>
* <img width="640" height="421" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEager.pn.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
Expand Down Expand Up @@ -1713,7 +1787,10 @@ public static <T> Flowable<T> concatEager(@NonNull Publisher<@NonNull ? extends
}

/**
* Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values.
* Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values,
* delaying errors until all the inner sequences terminate.
* <p>
* <img width="640" height="428" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEagerDelayError.i.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
Expand All @@ -1730,18 +1807,22 @@ public static <T> Flowable<T> concatEager(@NonNull Publisher<@NonNull ? extends
* @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @since 2.0
* @since 3.0.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
return concatEager(sources, bufferSize(), bufferSize());
public static <T> Flowable<T> concatEagerDelayError(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
return concatEagerDelayError(sources, bufferSize(), bufferSize());
}

/**
* Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values.
* Concatenates a sequence of {@link Publisher}s eagerly into a single stream of values,
* delaying errors until all the inner sequences terminate and runs a limited number
* of inner sequences at once.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEagerDelayError.in.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* source {@code Publisher}s. The operator buffers the values emitted by these {@code Publisher}s and then drains them
Expand All @@ -1762,18 +1843,89 @@ public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends P
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @throws IllegalArgumentException if {@code maxConcurrency} or {@code prefetch} is non-positive
* @since 2.0
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
public static <T> Flowable<T> concatEagerDelayError(@NonNull Iterable<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
Objects.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.IMMEDIATE));
return RxJavaPlugins.onAssembly(new FlowableConcatMapEager(new FlowableFromIterable(sources), Functions.identity(), maxConcurrency, prefetch, ErrorMode.END));
}

/**
* Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values,
* delaying errors until all the inner and the outer sequences terminate.
* <p>
* <img width="640" height="496" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEagerDelayError.p.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
* {@code Publisher}s and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Backpressure is honored towards the downstream and both the outer and inner {@code Publisher}s are
* expected to support backpressure. Violating this assumption, the operator will
* signal {@link MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @since 3.0.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Flowable<T> concatEagerDelayError(@NonNull Publisher<@NonNull ? extends Publisher<@NonNull ? extends T>> sources) {
return concatEagerDelayError(sources, bufferSize(), bufferSize());
}

/**
* Concatenates a {@link Publisher} sequence of {@code Publisher}s eagerly into a single stream of values,
* delaying errors until all the inner and outer sequences terminate and runs a limited number of inner
* sequences at once.
* <p>
* <img width="640" height="421" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Flowable.concatEagerDelayError.pn.png" alt="">
* <p>
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
* emitted source {@code Publisher}s as they are observed. The operator buffers the values emitted by these
* {@code Publisher}s and then drains them in order, each one after the previous one completes.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>Backpressure is honored towards the downstream and both the outer and inner {@code Publisher}s are
* expected to support backpressure. Violating this assumption, the operator will
* signal {@link MissingBackpressureException}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources a sequence of {@code Publisher}s that need to be eagerly concatenated
* @param maxConcurrency the maximum number of concurrently running inner {@code Publisher}s; {@link Integer#MAX_VALUE}
* is interpreted as all inner {@code Publisher}s can be active at the same time
* @param prefetch the number of elements to prefetch from each inner {@code Publisher} source
* @return the new {@code Flowable} instance with the specified concatenation behavior
* @throws NullPointerException if {@code sources} is {@code null}
* @throws IllegalArgumentException if {@code maxConcurrency} or {@code prefetch} is non-positive
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Flowable<T> concatEagerDelayError(@NonNull Publisher<@NonNull ? extends Publisher<@NonNull ? extends T>> sources, int maxConcurrency, int prefetch) {
Objects.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMapEagerPublisher(sources, Functions.identity(), maxConcurrency, prefetch, ErrorMode.END));
}

/**
Expand Down Expand Up @@ -10134,6 +10286,7 @@ public final <R> Flowable<R> flatMap(@NonNull Function<? super T, ? extends Publ
* if {@code false}, the first one signaling an exception will terminate the whole sequence immediately
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code mapper} is {@code null}
* @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 2.0
*/
Expand Down
Loading