Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: [Java 8] Add flattenStreamAsX to Maybe/Single #6805

Merged
merged 2 commits into from
Dec 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

import org.reactivestreams.*;

Expand Down Expand Up @@ -3152,6 +3153,7 @@ public final <U, R> Maybe<R> flatMap(@NonNull Function<? super T, ? extends Mayb
* source Maybe
* @return the new Flowable instance
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #flattenStreamAsFlowable(Function)
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
Expand Down Expand Up @@ -5004,4 +5006,87 @@ public final CompletionStage<T> toCompletionStage() {
public final CompletionStage<T> toCompletionStage(@Nullable T defaultItem) {
return subscribeWith(new CompletionStageConsumer<>(true, defaultItem));
}

/**
* Maps the upstream succecss value into a Java {@link Stream} and emits its
* items to the downstream consumer as a {@link Flowable}.
* <p>
* <img width="640" height="247" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flattenStreamAsFlowable.m.png" alt="">
* <p>
* The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when
* closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}.
* If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flattenAsFlowable(Function)}:
* <pre><code>
* source.flattenAsFlowable(item -&gt; createStream(item)::iterator);
* </code></pre>
* <p>
* Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}):
* <pre><code>
* source.flattenStreamAsFlowable(item -&gt; IntStream.rangeClosed(1, 10).boxed());
* </code></pre>
* <p>
* {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times
* from multiple threads can lead to undefined behavior.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and iterates the given {@code Stream}
* on demand (i.e., when requested).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flattenStreamAsFlowable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the element type of the {@code Stream} and the output {@code Flowable}
* @param mapper the function that receives the upstream success item and should
* return a {@code Stream} of values to emit.
* @return the new Flowable instance
* @since 3.0.0
* @see #flattenAsFlowable(Function)
* @see #flattenStreamAsObservable(Function)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
public final <R> Flowable<R> flattenStreamAsFlowable(@NonNull Function<? super T, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new MaybeFlattenStreamAsFlowable<>(this, mapper));
}

/**
* Maps the upstream succecss value into a Java {@link Stream} and emits its
* items to the downstream consumer as an {@link Observable}.
* <img width="640" height="247" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flattenStreamAsObservable.m.png" alt="">
* <p>
* The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when
* closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}.
* If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flattenAsObservable(Function)}:
* <pre><code>
* source.flattenAsObservable(item -&gt; createStream(item)::iterator);
* </code></pre>
* <p>
* Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}):
* <pre><code>
* source.flattenStreamAsObservable(item -&gt; IntStream.rangeClosed(1, 10).boxed());
* </code></pre>
* <p>
* {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times
* from multiple threads can lead to undefined behavior.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flattenStreamAsObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the element type of the {@code Stream} and the output {@code Observable}
* @param mapper the function that receives the upstream success item and should
* return a {@code Stream} of values to emit.
* @return the new Observable instance
* @since 3.0.0
* @see #flattenAsObservable(Function)
* @see #flattenStreamAsFlowable(Function)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <R> Observable<R> flattenStreamAsObservable(@NonNull Function<? super T, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new MaybeFlattenStreamAsObservable<>(this, mapper));
}
}
1 change: 1 addition & 0 deletions src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6517,6 +6517,7 @@ public final <R> Observable<R> concatMap(@NonNull Function<? super T, ? extends
* the scheduler where the {@code mapper} function will be executed
* @return an Observable that emits the result of applying the transformation function to each item emitted
* by the source ObservableSource and concatenating the ObservableSources obtained from this transformation
* @since 3.0.0
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@CheckReturnValue
Expand Down
87 changes: 87 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

import org.reactivestreams.Publisher;

import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
Expand Down Expand Up @@ -2799,6 +2801,7 @@ public final <R> Flowable<R> flatMapPublisher(@NonNull Function<? super T, ? ext
* source Single
* @return the new Flowable instance
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #flattenStreamAsFlowable(Function)
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
Expand Down Expand Up @@ -2826,6 +2829,7 @@ public final <U> Flowable<U> flattenAsFlowable(@NonNull Function<? super T, ? ex
* source Single
* @return the new Observable instance
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #flattenStreamAsObservable(Function)
*/
@CheckReturnValue
@NonNull
Expand Down Expand Up @@ -4308,4 +4312,87 @@ private static <T> Single<T> toSingle(@NonNull Flowable<T> source) {
public final CompletionStage<T> toCompletionStage() {
return subscribeWith(new CompletionStageConsumer<>(false, null));
}

/**
* Maps the upstream succecss value into a Java {@link Stream} and emits its
* items to the downstream consumer as a {@link Flowable}.
* <img width="640" height="247" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flattenStreamAsFlowable.s.png" alt="">
* <p>
* The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when
* closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}.
* If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flattenAsFlowable(Function)}:
* <pre><code>
* source.flattenAsFlowable(item -&gt; createStream(item)::iterator);
* </code></pre>
* <p>
* Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}):
* <pre><code>
* source.flattenStreamAsFlowable(item -&gt; IntStream.rangeClosed(1, 10).boxed());
* </code></pre>
* <p>
* {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times
* from multiple threads can lead to undefined behavior.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and iterates the given {@code Stream}
* on demand (i.e., when requested).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flattenStreamAsFlowable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the element type of the {@code Stream} and the output {@code Flowable}
* @param mapper the function that receives the upstream success item and should
* return a {@code Stream} of values to emit.
* @return the new Flowable instance
* @since 3.0.0
* @see #flattenAsFlowable(Function)
* @see #flattenStreamAsObservable(Function)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
@NonNull
public final <R> Flowable<R> flattenStreamAsFlowable(@NonNull Function<? super T, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlattenStreamAsFlowable<>(this, mapper));
}

/**
* Maps the upstream succecss value into a Java {@link Stream} and emits its
* items to the downstream consumer as an {@link Observable}.
* <p>
* <img width="640" height="247" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flattenStreamAsObservable.s.png" alt="">
* <p>
* The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when
* closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}.
* If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flattenAsFlowable(Function)}:
* <pre><code>
* source.flattenAsObservable(item -&gt; createStream(item)::iterator);
* </code></pre>
* <p>
* Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}):
* <pre><code>
* source.flattenStreamAsObservable(item -&gt; IntStream.rangeClosed(1, 10).boxed());
* </code></pre>
* <p>
* {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times
* from multiple threads can lead to undefined behavior.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flattenStreamAsObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the element type of the {@code Stream} and the output {@code Observable}
* @param mapper the function that receives the upstream success item and should
* return a {@code Stream} of values to emit.
* @return the new Observable instance
* @since 3.0.0
* @see #flattenAsObservable(Function)
* @see #flattenStreamAsFlowable(Function)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <R> Observable<R> flattenStreamAsObservable(@NonNull Function<? super T, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlattenStreamAsObservable<>(this, mapper));
}
}
Loading