diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 4177a2d9e6..09cb859b83 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -18997,4 +18997,289 @@ public final CompletionStage singleOrErrorStage() { public final CompletionStage lastOrErrorStage() { return subscribeWith(new FlowableLastStageSubscriber<>(false, null)); } + + /** + * Creates a sequential {@link Stream} to consume or process this {@code Flowable} in a blocking manner via + * the Java {@code Stream} API. + *

+ * + *

+ * Cancellation of the upstream is done via {@link Stream#close()}, therefore, it is strongly recommended the + * consumption is performed within a try-with-resources construct: + *


+     * Flowable<Integer> source = Flowable.range(1, 10)
+     *        .subscribeOn(Schedulers.computation());
+     *
+     * try (Stream<Integer> stream = source.blockingStream()) {
+     *     stream.limit(3).forEach(System.out::println);
+     * }
+     * 
+ *
+ *
Backpressure:
+ *
The operator requests {@link #bufferSize()} amount upfront and 75% of it after each 75% of the amount received.
+ *
Scheduler:
+ *
{@code blockingStream} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return the new Stream instance + * @since 3.0.0 + * @see #blockingStream(int) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Stream blockingStream() { + return blockingStream(bufferSize()); + } + + /** + * Creates a sequential {@link Stream} to consume or process this {@code Flowable} in a blocking manner via + * the Java {@code Stream} API. + *

+ * + *

+ * Cancellation of the upstream is done via {@link Stream#close()}, therefore, it is strongly recommended the + * consumption is performed within a try-with-resources construct: + *


+     * Flowable<Integer> source = Flowable.range(1, 10)
+     *        .subscribeOn(Schedulers.computation());
+     *
+     * try (Stream<Integer> stream = source.blockingStream(4)) {
+     *     stream.limit(3).forEach(System.out::println);
+     * }
+     * 
+ *
+ *
Backpressure:
+ *
The operator requests the given {@code prefetch} amount upfront and 75% of it after each 75% of the amount received.
+ *
Scheduler:
+ *
{@code blockingStream} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param prefetch the number of items to request from the upstream to limit the number of + * in-flight items and item generation. + * @return the new Stream instance + * @since 3.0.0 + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Stream blockingStream(int prefetch) { + Iterator iterator = blockingIterable(prefetch).iterator(); + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false) + .onClose(() -> ((Disposable)iterator).dispose()); + } + + /** + * Maps each upstream item into a {@link Stream} and emits the {@code Stream}'s items to the downstream in a sequential fashion. + *

+ * + *

+ * Due to the blocking and sequential nature of Java {@link Stream}s, the streams are mapped and consumed in a sequential fashion + * without interleaving (unlike a more general {@link #flatMap(Function)}). Therefore, {@code flatMapStream} and + * {@code concatMapStream} are identical operators and are provided as aliases. + *

+ * The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when + * closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}. + * If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #concatMapIterable(Function)}: + *


+     * source.concatMapIterable(v -> createStream(v)::iterator);
+     * 
+ *

+ * Note that {@code Stream}s can be consumed only once; any subsequent attempt to consume a {@code Stream} + * will result in an {@link IllegalStateException}. + *

+ * Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}): + *


+     * source.concatMapStream(v -> IntStream.rangeClosed(v + 1, v + 10).boxed());
+     * 
+ *

+ * {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times + * from multiple threads can lead to undefined behavior. + *

+ *
Backpressure:
+ *
The operator honors the downstream backpressure and consumes the inner stream only on demand. The operator + * prefetches {@link #bufferSize} items of the upstream (then 75% of it after the 75% received) + * and caches them until they are ready to be mapped into {@code Stream}s + * after the current {@code Stream} has been consumed.
+ *
Scheduler:
+ *
{@code concatMapStream} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the element type of the {@code Stream}s and the result + * @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements + * will be emitted to the downstream + * @return the new Flowable instance + * @see #concatMap(Function) + * @see #concatMapIterable(Function) + * @see #concatMapStream(Function, int) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final <@NonNull R> Flowable concatMapStream(@NonNull Function> mapper) { + return flatMapStream(mapper, bufferSize()); + } + + /** + * Maps each upstream item into a {@link Stream} and emits the {@code Stream}'s items to the downstream in a sequential fashion. + *

+ * + *

+ * Due to the blocking and sequential nature of Java {@link Stream}s, the streams are mapped and consumed in a sequential fashion + * without interleaving (unlike a more general {@link #flatMap(Function)}). Therefore, {@code flatMapStream} and + * {@code concatMapStream} are identical operators and are provided as aliases. + *

+ * The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when + * closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}. + * If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #concatMapIterable(Function, int)}: + *


+     * source.concatMapIterable(v -> createStream(v)::iterator, 32);
+     * 
+ *

+ * Note that {@code Stream}s can be consumed only once; any subsequent attempt to consume a {@code Stream} + * will result in an {@link IllegalStateException}. + *

+ * Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}): + *


+     * source.concatMapStream(v -> IntStream.rangeClosed(v + 1, v + 10).boxed(), 32);
+     * 
+ *

+ * {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times + * from multiple threads can lead to undefined behavior. + *

+ *
Backpressure:
+ *
The operator honors the downstream backpressure and consumes the inner stream only on demand. The operator + * prefetches the given amount of upstream items and caches them until they are ready to be mapped into {@code Stream}s + * after the current {@code Stream} has been consumed.
+ *
Scheduler:
+ *
{@code concatMapStream} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the element type of the {@code Stream}s and the result + * @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements + * will be emitted to the downstream + * @param prefetch the number of upstream items to request upfront, then 75% of this amount after each 75% upstream items received + * @return the new Flowable instance + * @see #concatMap(Function, int) + * @see #concatMapIterable(Function, int) + * @see #flatMapStream(Function, int) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final <@NonNull R> Flowable concatMapStream(@NonNull Function> mapper, int prefetch) { + Objects.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new FlowableFlatMapStream<>(this, mapper, prefetch)); + } + + /** + * Maps each upstream item into a {@link Stream} and emits the {@code Stream}'s items to the downstream in a sequential fashion. + *

+ * + *

+ * Due to the blocking and sequential nature of Java {@link Stream}s, the streams are mapped and consumed in a sequential fashion + * without interleaving (unlike a more general {@link #flatMap(Function)}). Therefore, {@code flatMapStream} and + * {@code concatMapStream} are identical operators and are provided as aliases. + *

+ * The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when + * closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}. + * If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flatMapIterable(Function)}: + *


+     * source.flatMapIterable(v -> createStream(v)::iterator);
+     * 
+ *

+ * Note that {@code Stream}s can be consumed only once; any subsequent attempt to consume a {@code Stream} + * will result in an {@link IllegalStateException}. + *

+ * Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}): + *


+     * source.flatMapStream(v -> IntStream.rangeClosed(v + 1, v + 10).boxed());
+     * 
+ *

+ * {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times + * from multiple threads can lead to undefined behavior. + *

+ *
Backpressure:
+ *
The operator honors the downstream backpressure and consumes the inner stream only on demand. The operator + * prefetches {@link #bufferSize} items of the upstream (then 75% of it after the 75% received) + * and caches them until they are ready to be mapped into {@code Stream}s + * after the current {@code Stream} has been consumed.
+ *
Scheduler:
+ *
{@code flatMapStream} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the element type of the {@code Stream}s and the result + * @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements + * will be emitted to the downstream + * @return the new Flowable instance + * @see #flatMap(Function) + * @see #flatMapIterable(Function) + * @see #flatMapStream(Function, int) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final <@NonNull R> Flowable flatMapStream(@NonNull Function> mapper) { + return flatMapStream(mapper, bufferSize()); + } + + /** + * Maps each upstream item into a {@link Stream} and emits the {@code Stream}'s items to the downstream in a sequential fashion. + *

+ * + *

+ * Due to the blocking and sequential nature of Java {@link Stream}s, the streams are mapped and consumed in a sequential fashion + * without interleaving (unlike a more general {@link #flatMap(Function)}). Therefore, {@code flatMapStream} and + * {@code concatMapStream} are identical operators and are provided as aliases. + *

+ * The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when + * closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}. + * If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flatMapIterable(Function, int)}: + *


+     * source.flatMapIterable(v -> createStream(v)::iterator, 32);
+     * 
+ *

+ * Note that {@code Stream}s can be consumed only once; any subsequent attempt to consume a {@code Stream} + * will result in an {@link IllegalStateException}. + *

+ * Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}): + *


+     * source.flatMapStream(v -> IntStream.rangeClosed(v + 1, v + 10).boxed(), 32);
+     * 
+ *

+ * {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times + * from multiple threads can lead to undefined behavior. + *

+ *
Backpressure:
+ *
The operator honors the downstream backpressure and consumes the inner stream only on demand. The operator + * prefetches the given amount of upstream items and caches them until they are ready to be mapped into {@code Stream}s + * after the current {@code Stream} has been consumed.
+ *
Scheduler:
+ *
{@code flatMapStream} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the element type of the {@code Stream}s and the result + * @param mapper the function that receives an upstream item and should return a {@code Stream} whose elements + * will be emitted to the downstream + * @param prefetch the number of upstream items to request upfront, then 75% of this amount after each 75% upstream items received + * @return the new Flowable instance + * @see #flatMap(Function, int) + * @see #flatMapIterable(Function, int) + * @see #concatMapStream(Function, int) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final <@NonNull R> Flowable flatMapStream(@NonNull Function> mapper, int prefetch) { + Objects.requireNonNull(mapper, "mapper is null"); + ObjectHelper.verifyPositive(prefetch, "prefetch"); + return RxJavaPlugins.onAssembly(new FlowableFlatMapStream<>(this, mapper, prefetch)); + } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java new file mode 100644 index 0000000000..ad1b3ad140 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java @@ -0,0 +1,332 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.*; +import java.util.concurrent.atomic.*; +import java.util.stream.Stream; + +import org.reactivestreams.*; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.MissingBackpressureException; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.internal.queue.SpscArrayQueue; +import io.reactivex.rxjava3.internal.subscriptions.*; +import io.reactivex.rxjava3.internal.util.*; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Maps the upstream values onto {@link Stream}s and emits their items in order to the downstream. + * + * @param the upstream element type + * @param the inner {@code Stream} and result element type + * @since 3.0.0 + */ +public final class FlowableFlatMapStream extends Flowable { + + final Flowable source; + + final Function> mapper; + + final int prefetch; + + public FlowableFlatMapStream(Flowable source, Function> mapper, int prefetch) { + this.source = source; + this.mapper = mapper; + this.prefetch = prefetch; + } + + @Override + protected void subscribeActual(Subscriber s) { + if (source instanceof Supplier) { + Stream stream = null; + try { + @SuppressWarnings("unchecked") + T t = ((Supplier)source).get(); + if (t != null) { + stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream"); + } + } catch (Throwable ex) { + EmptySubscription.error(ex, s); + return; + } + + if (stream != null) { + FlowableFromStream.subscribeStream(s, stream); + } else { + EmptySubscription.complete(s); + } + } else { + source.subscribe(new FlatMapStreamSubscriber<>(s, mapper, prefetch)); + } + } + + static final class FlatMapStreamSubscriber extends AtomicInteger + implements FlowableSubscriber, Subscription { + + private static final long serialVersionUID = -5127032662980523968L; + + final Subscriber downstream; + + final Function> mapper; + + final int prefetch; + + final AtomicLong requested; + + SimpleQueue queue; + + Subscription upstream; + + Iterator currentIterator; + + AutoCloseable currentCloseable; + + volatile boolean cancelled; + + volatile boolean upstreamDone; + final AtomicThrowable error; + + long emitted; + + int consumed; + + int sourceMode; + + FlatMapStreamSubscriber(Subscriber downstream, Function> mapper, int prefetch) { + this.downstream = downstream; + this.mapper = mapper; + this.prefetch = prefetch; + this.requested = new AtomicLong(); + this.error = new AtomicThrowable(); + } + + @Override + public void onSubscribe(@NonNull Subscription s) { + if (SubscriptionHelper.validate(this.upstream, s)) { + this.upstream = s; + + if (s instanceof QueueSubscription) { + + @SuppressWarnings("unchecked") + QueueSubscription qs = (QueueSubscription)s; + + int m = qs.requestFusion(QueueFuseable.ANY | QueueFuseable.BOUNDARY); + if (m == QueueFuseable.SYNC) { + sourceMode = m; + queue = qs; + upstreamDone = true; + + downstream.onSubscribe(this); + return; + } + else if (m == QueueFuseable.ASYNC) { + sourceMode = m; + queue = qs; + + downstream.onSubscribe(this); + + s.request(prefetch); + return; + } + } + + queue = new SpscArrayQueue<>(prefetch); + + downstream.onSubscribe(this); + + s.request(prefetch); + } + } + + @Override + public void onNext(T t) { + if (sourceMode != QueueFuseable.ASYNC) { + if (!queue.offer(t)) { + upstream.cancel(); + onError(new MissingBackpressureException("Queue full?!")); + return; + } + } + drain(); + } + + @Override + public void onError(Throwable t) { + if (error.compareAndSet(null, t)) { + upstreamDone = true; + drain(); + } else { + RxJavaPlugins.onError(t); + } + } + + @Override + public void onComplete() { + upstreamDone = true; + drain(); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); + } + } + + @Override + public void cancel() { + cancelled = true; + upstream.cancel(); + drain(); + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + + final Subscriber downstream = this.downstream; + final SimpleQueue queue = this.queue; + final AtomicThrowable error = this.error; + Iterator iterator = this.currentIterator; + long requested = this.requested.get(); + long emitted = this.emitted; + final int limit = prefetch - (prefetch >> 2); + boolean canRequest = sourceMode != QueueFuseable.SYNC; + + for (;;) { + if (cancelled) { + queue.clear(); + clearCurrentSuppressCloseError(); + } else { + boolean isDone = upstreamDone; + if (error.get() != null) { + downstream.onError(error.get()); + cancelled = true; + continue; + } + + if (iterator == null) { + T t; + + try { + t = queue.poll(); + } catch (Throwable ex) { + trySignalError(downstream, ex); + continue; + } + + boolean isEmpty = t == null; + + if (isDone && isEmpty) { + downstream.onComplete(); + cancelled = true; + } + else if (!isEmpty) { + if (canRequest && ++consumed == limit) { + consumed = 0; + upstream.request(limit); + } + + Stream stream; + try { + stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream"); + iterator = stream.iterator(); + + if (iterator.hasNext()) { + currentIterator = iterator; + currentCloseable = stream; + } else { + iterator = null; + } + } catch (Throwable ex) { + trySignalError(downstream, ex); + } + continue; + } + } + if (iterator != null && emitted != requested) { + R item; + + try { + item = Objects.requireNonNull(iterator.next(), "The Stream.Iterator returned a null value"); + } catch (Throwable ex) { + trySignalError(downstream, ex); + continue; + } + + if (!cancelled) { + downstream.onNext(item); + emitted++; + + if (!cancelled) { + try { + if (!iterator.hasNext()) { + iterator = null; + clearCurrentRethrowCloseError(); + } + } catch (Throwable ex) { + trySignalError(downstream, ex); + } + } + } + + continue; + } + } + + this.emitted = emitted; + missed = addAndGet(-missed); + if (missed == 0) { + break; + } + requested = this.requested.get(); + } + } + + void clearCurrentRethrowCloseError() throws Throwable { + currentIterator = null; + AutoCloseable ac = currentCloseable; + currentCloseable = null; + if (ac != null) { + ac.close(); + } + } + + void clearCurrentSuppressCloseError() { + try { + clearCurrentRethrowCloseError(); + } catch (Throwable ex) { + RxJavaPlugins.onError(ex); + } + } + + void trySignalError(Subscriber downstream, Throwable ex) { + if (error.compareAndSet(null, ex)) { + upstream.cancel(); + cancelled = true; + downstream.onError(ex); + } else { + RxJavaPlugins.onError(ex); + } + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromStream.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromStream.java index 0f4e4b16ba..ecc18fb154 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromStream.java +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromStream.java @@ -42,6 +42,16 @@ public FlowableFromStream(Stream stream) { @Override protected void subscribeActual(Subscriber s) { + subscribeStream(s, stream); + } + + /** + * Subscribes to the Stream by picking the normal or conditional stream Subscription implementation. + * @param the element type of the flow + * @param s the subscriber to drive + * @param stream the sequence to consume + */ + public static void subscribeStream(Subscriber s, Stream stream) { Iterator iterator; try { iterator = stream.iterator(); diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream0HTckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream0HTckTest.java new file mode 100644 index 0000000000..57365f5857 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream0HTckTest.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.stream.*; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.tck.BaseTck; + +@Test +public class FlatMapStream0HTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Flowable.just(1).hide().flatMapStream(v -> IntStream.range(0, (int)elements).boxed()) + ; + } + + @Override + public Publisher createFailedPublisher() { + Stream stream = Stream.of(1); + stream.forEach(v -> { }); + return Flowable.just(1).hide().flatMapStream(v -> stream); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream0TckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream0TckTest.java new file mode 100644 index 0000000000..232ded7321 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream0TckTest.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.stream.*; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.tck.BaseTck; + +@Test +public class FlatMapStream0TckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Flowable.just(1).flatMapStream(v -> IntStream.range(0, (int)elements).boxed()) + ; + } + + @Override + public Publisher createFailedPublisher() { + Stream stream = Stream.of(1); + stream.forEach(v -> { }); + return Flowable.just(1).flatMapStream(v -> stream); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream1HTckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream1HTckTest.java new file mode 100644 index 0000000000..aa65a19ca7 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream1HTckTest.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.stream.*; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.tck.BaseTck; + +@Test +public class FlatMapStream1HTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Flowable.range(1, (int)elements).hide().flatMapStream(v -> Stream.of(v)) + ; + } + + @Override + public Publisher createFailedPublisher() { + Stream stream = Stream.of(1); + stream.forEach(v -> { }); + return Flowable.just(1).hide().flatMapStream(v -> stream); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream1TckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream1TckTest.java new file mode 100644 index 0000000000..20e99b1ec1 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream1TckTest.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.stream.*; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.tck.BaseTck; + +@Test +public class FlatMapStream1TckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Flowable.range(1, (int)elements).flatMapStream(v -> Stream.of(v)) + ; + } + + @Override + public Publisher createFailedPublisher() { + Stream stream = Stream.of(1); + stream.forEach(v -> { }); + return Flowable.just(1).flatMapStream(v -> stream); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream2HTckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream2HTckTest.java new file mode 100644 index 0000000000..0c06e4de40 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream2HTckTest.java @@ -0,0 +1,47 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.stream.*; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.tck.BaseTck; + +@Test +public class FlatMapStream2HTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + if (elements % 2 == 0) { + return Flowable.range(0, (int)elements / 2).hide().flatMapStream(v -> Stream.of(v, v + 1)); + } + return + Flowable.range(-1, 1 + (int)elements / 2).hide().flatMapStream(v -> { + if (v != -1) { + return Stream.of(v, v + 1); + } + return Stream.of(v); + }); + } + + @Override + public Publisher createFailedPublisher() { + Stream stream = Stream.of(1); + stream.forEach(v -> { }); + return Flowable.just(1).hide().flatMapStream(v -> stream); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream2TckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream2TckTest.java new file mode 100644 index 0000000000..d799a720c9 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream2TckTest.java @@ -0,0 +1,47 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.stream.*; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.tck.BaseTck; + +@Test +public class FlatMapStream2TckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + if (elements % 2 == 0) { + return Flowable.range(0, (int)elements / 2).flatMapStream(v -> Stream.of(v, v + 1)); + } + return + Flowable.range(-1, 1 + (int)elements / 2).flatMapStream(v -> { + if (v != -1) { + return Stream.of(v, v + 1); + } + return Stream.of(v); + }); + } + + @Override + public Publisher createFailedPublisher() { + Stream stream = Stream.of(1); + stream.forEach(v -> { }); + return Flowable.just(1).flatMapStream(v -> stream); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableBlockingStreamTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableBlockingStreamTest.java new file mode 100644 index 0000000000..2cf932f40e --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableBlockingStreamTest.java @@ -0,0 +1,107 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.stream.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.processors.UnicastProcessor; +import io.reactivex.rxjava3.schedulers.Schedulers; + +public class FlowableBlockingStreamTest extends RxJavaTest { + + @Test + public void empty() { + try (Stream stream = Flowable.empty().blockingStream()) { + assertEquals(0, stream.toArray().length); + } + } + + @Test + public void just() { + try (Stream stream = Flowable.just(1).blockingStream()) { + assertArrayEquals(new Integer[] { 1 }, stream.toArray(Integer[]::new)); + } + } + + @Test + public void range() { + try (Stream stream = Flowable.range(1, 5).blockingStream()) { + assertArrayEquals(new Integer[] { 1, 2, 3, 4, 5 }, stream.toArray(Integer[]::new)); + } + } + + @Test + public void rangeBackpressured() { + try (Stream stream = Flowable.range(1, 5).blockingStream(1)) { + assertArrayEquals(new Integer[] { 1, 2, 3, 4, 5 }, stream.toArray(Integer[]::new)); + } + } + + @Test + public void rangeAsyncBackpressured() { + try (Stream stream = Flowable.range(1, 1000).subscribeOn(Schedulers.computation()).blockingStream()) { + List list = stream.collect(Collectors.toList()); + + assertEquals(1000, list.size()); + for (int i = 1; i <= 1000; i++) { + assertEquals(i, list.get(i - 1).intValue()); + } + } + } + + @Test + public void rangeAsyncBackpressured1() { + try (Stream stream = Flowable.range(1, 1000).subscribeOn(Schedulers.computation()).blockingStream(1)) { + List list = stream.collect(Collectors.toList()); + + assertEquals(1000, list.size()); + for (int i = 1; i <= 1000; i++) { + assertEquals(i, list.get(i - 1).intValue()); + } + } + } + + @Test + public void error() { + try (Stream stream = Flowable.error(new TestException()).blockingStream()) { + stream.toArray(Integer[]::new); + fail("Should have thrown!"); + } catch (TestException expected) { + // expected + } + } + + @Test + public void close() { + UnicastProcessor up = UnicastProcessor.create(); + up.onNext(1); + up.onNext(2); + up.onNext(3); + up.onNext(4); + up.onNext(5); + + try (Stream stream = up.blockingStream()) { + assertArrayEquals(new Integer[] { 1, 2, 3 }, stream.limit(3).toArray(Integer[]::new)); + } + + assertFalse(up.hasSubscribers()); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStreamTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStreamTest.java new file mode 100644 index 0000000000..548a66a709 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStreamTest.java @@ -0,0 +1,451 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.*; + +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; +import io.reactivex.rxjava3.processors.*; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class FlowableFlatMapStreamTest extends RxJavaTest { + + @Test + public void empty() { + Flowable.empty() + .flatMapStream(v -> Stream.of(1, 2, 3, 4, 5)) + .test() + .assertResult(); + } + + @Test + public void emptyHidden() { + Flowable.empty() + .hide() + .flatMapStream(v -> Stream.of(1, 2, 3, 4, 5)) + .test() + .assertResult(); + } + + @Test + public void just() { + Flowable.just(1) + .flatMapStream(v -> Stream.of(v + 1, v + 2, v + 3, v + 4, v + 5)) + .test() + .assertResult(2, 3, 4, 5, 6); + } + + @Test + public void justHidden() { + Flowable.just(1).hide() + .flatMapStream(v -> Stream.of(v + 1, v + 2, v + 3, v + 4, v + 5)) + .test() + .assertResult(2, 3, 4, 5, 6); + } + + @Test + public void error() { + Flowable.error(new TestException()) + .flatMapStream(v -> Stream.of(1, 2, 3, 4, 5)) + .test() + .assertFailure(TestException.class); + } + + @Test + public void supplierFusedError() { + Flowable.fromCallable(() -> { throw new TestException(); }) + .flatMapStream(v -> Stream.of(1, 2, 3, 4, 5)) + .test() + .assertFailure(TestException.class); + } + + @Test + public void errorHidden() { + Flowable.error(new TestException()) + .hide() + .flatMapStream(v -> Stream.of(1, 2, 3, 4, 5)) + .test() + .assertFailure(TestException.class); + } + + @Test + public void range() { + Flowable.range(1, 5) + .flatMapStream(v -> IntStream.range(v * 10, v * 10 + 5).boxed()) + .test() + .assertResult( + 10, 11, 12, 13, 14, + 20, 21, 22, 23, 24, + 30, 31, 32, 33, 34, + 40, 41, 42, 43, 44, + 50, 51, 52, 53, 54 + ); + } + + @Test + public void rangeHidden() { + Flowable.range(1, 5) + .hide() + .flatMapStream(v -> IntStream.range(v * 10, v * 10 + 5).boxed()) + .test() + .assertResult( + 10, 11, 12, 13, 14, + 20, 21, 22, 23, 24, + 30, 31, 32, 33, 34, + 40, 41, 42, 43, 44, + 50, 51, 52, 53, 54 + ); + } + + @Test + public void rangeToEmpty() { + Flowable.range(1, 5) + .flatMapStream(v -> Stream.of()) + .test() + .assertResult(); + } + + @Test + public void rangeTake() { + Flowable.range(1, 5) + .flatMapStream(v -> IntStream.range(v * 10, v * 10 + 5).boxed()) + .take(12) + .test() + .assertResult( + 10, 11, 12, 13, 14, + 20, 21, 22, 23, 24, + 30, 31 + ); + } + + @Test + public void rangeTakeHidden() { + Flowable.range(1, 5) + .hide() + .flatMapStream(v -> IntStream.range(v * 10, v * 10 + 5).boxed()) + .take(12) + .test() + .assertResult( + 10, 11, 12, 13, 14, + 20, 21, 22, 23, 24, + 30, 31 + ); + } + + @Test + public void upstreamCancelled() { + PublishProcessor pp = PublishProcessor.create(); + + AtomicInteger calls = new AtomicInteger(); + + TestSubscriber ts = pp + .flatMapStream(v -> Stream.of(v + 1, v + 2).onClose(() -> calls.getAndIncrement())) + .test(1); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts.assertValuesOnly(2); + ts.cancel(); + + assertFalse(pp.hasSubscribers()); + + assertEquals(1, calls.get()); + } + + @Test + public void upstreamCancelledCloseCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp + .flatMapStream(v -> Stream.of(v + 1, v + 2).onClose(() -> { throw new TestException(); })) + .test(1); + + assertTrue(pp.hasSubscribers()); + + pp.onNext(1); + + ts.assertValuesOnly(2); + ts.cancel(); + + assertFalse(pp.hasSubscribers()); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void crossMap() { + Flowable.range(1, 1000) + .flatMapStream(v -> IntStream.range(v * 1000, v * 1000 + 1000).boxed()) + .test() + .assertValueCount(1_000_000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void crossMapHidden() { + Flowable.range(1, 1000) + .hide() + .flatMapStream(v -> IntStream.range(v * 1000, v * 1000 + 1000).boxed()) + .test() + .assertValueCount(1_000_000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void crossMapBackpressured() { + for (int n = 1; n < 2048; n *= 2) { + Flowable.range(1, 1000) + .flatMapStream(v -> IntStream.range(v * 1000, v * 1000 + 1000).boxed()) + .rebatchRequests(n) + .test() + .withTag("rebatch: " + n) + .assertValueCount(1_000_000) + .assertNoErrors() + .assertComplete(); + } + } + + @Test + public void crossMapBackpressuredHidden() { + for (int n = 1; n < 2048; n *= 2) { + Flowable.range(1, 1000) + .hide() + .flatMapStream(v -> IntStream.range(v * 1000, v * 1000 + 1000).boxed()) + .rebatchRequests(n) + .test() + .withTag("rebatch: " + n) + .assertValueCount(1_000_000) + .assertNoErrors() + .assertComplete(); + } + } + + @Test + public void onSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(f -> f.flatMapStream(v -> Stream.of(1, 2))); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(UnicastProcessor.create().flatMapStream(v -> Stream.of(1, 2))); + } + + @Test + public void queueOverflow() throws Throwable { + TestHelper.withErrorTracking(errors -> { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + s.onNext(3); + s.onError(new TestException()); + } + } + .flatMapStream(v -> Stream.of(1, 2), 1) + .test(0) + .assertFailure(MissingBackpressureException.class); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void mapperThrows() { + Flowable.just(1).hide() + .concatMapStream(v -> { throw new TestException(); }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void mapperNull() { + Flowable.just(1).hide() + .concatMapStream(v -> null) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void streamNull() { + Flowable.just(1).hide() + .concatMapStream(v -> Stream.of(1, null)) + .test() + .assertFailure(NullPointerException.class, 1); + } + + @Test + public void hasNextThrows() { + Flowable.just(1).hide() + .concatMapStream(v -> Stream.generate(() -> { throw new TestException(); })) + .test() + .assertFailure(TestException.class); + } + + @Test + public void hasNextThrowsLater() { + AtomicInteger counter = new AtomicInteger(); + Flowable.just(1).hide() + .concatMapStream(v -> Stream.generate(() -> { + if (counter.getAndIncrement() == 0) { + return 1; + } + throw new TestException(); + })) + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void mapperThrowsWhenUpstreamErrors() throws Throwable { + TestHelper.withErrorTracking(errors -> { + PublishProcessor pp = PublishProcessor.create(); + + AtomicInteger counter = new AtomicInteger(); + + TestSubscriber ts = pp.hide() + .concatMapStream(v -> { + if (counter.getAndIncrement() == 0) { + return Stream.of(1, 2); + } + pp.onError(new IOException()); + throw new TestException(); + }) + .test(); + + pp.onNext(1); + pp.onNext(2); + + ts + .assertFailure(IOException.class, 1, 2); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void rangeBackpressured() { + Flowable.range(1, 5) + .hide() + .concatMapStream(v -> Stream.of(v), 1) + .test(0) + .assertEmpty() + .requestMore(5) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void cancelAfterIteratorNext() throws Exception { + TestSubscriber ts = new TestSubscriber<>(); + + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + when(stream.iterator()).thenReturn(new Iterator() { + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + ts.cancel(); + return 1; + } + }); + + Flowable.just(1) + .hide() + .concatMapStream(v -> stream) + .subscribe(ts); + + ts.assertEmpty(); + } + + @Test + public void asyncUpstreamFused() { + UnicastProcessor up = UnicastProcessor.create(); + + TestSubscriber ts = up.flatMapStream(v -> Stream.of(1, 2)) + .test(); + + assertTrue(up.hasSubscribers()); + + up.onNext(1); + + ts.assertValuesOnly(1, 2); + + up.onComplete(); + + ts.assertResult(1, 2); + } + + @Test + public void asyncUpstreamFusionBoundary() { + UnicastProcessor up = UnicastProcessor.create(); + + TestSubscriber ts = up + .map(v -> v + 1) + .flatMapStream(v -> Stream.of(1, 2)) + .test(); + + assertTrue(up.hasSubscribers()); + + up.onNext(1); + + ts.assertValuesOnly(1, 2); + + up.onComplete(); + + ts.assertResult(1, 2); + } + + @Test + public void fusedPollCrash() { + UnicastProcessor up = UnicastProcessor.create(); + + TestSubscriber ts = up + .map(v -> { throw new TestException(); }) + .compose(TestHelper.flowableStripBoundary()) + .flatMapStream(v -> Stream.of(1, 2)) + .test(); + + assertTrue(up.hasSubscribers()); + + up.onNext(1); + + assertFalse(up.hasSubscribers()); + + ts.assertFailure(TestException.class); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java index 865b31a082..f31475ae58 100644 --- a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java +++ b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java @@ -2987,6 +2987,17 @@ public void cancel() { } } + /** + * Strips the {@link QueueFuseable#BOUNDARY} mode flag when the downstream calls {@link QueueSubscription#requestFusion(int)}. + *

+ * By default, many operators use {@link QueueFuseable#BOUNDARY} to indicate upstream side-effects + * should not leak over a fused boundary. However, some tests want to verify if {@link QueueSubscription#poll()} crashes + * are handled correctly and the most convenient way is to crash {@link Flowable#map} that won't fuse with {@code BOUNDARY} + * flag. This transformer strips this flag and thus allows the function of {@code map} to be executed as part of the + * {@code poll()} chain. + * @param the element type of the flow + * @return the new Transformer instance + */ public static FlowableTransformer flowableStripBoundary() { return new FlowableStripBoundary<>(null); } @@ -3092,6 +3103,17 @@ public boolean isDisposed() { } } + /** + * Strips the {@link QueueFuseable#BOUNDARY} mode flag when the downstream calls {@link QueueDisposable#requestFusion(int)}. + *

+ * By default, many operators use {@link QueueFuseable#BOUNDARY} to indicate upstream side-effects + * should not leak over a fused boundary. However, some tests want to verify if {@link QueueDisposable#poll()} crashes + * are handled correctly and the most convenient way is to crash {@link Observable#map} that won't fuse with {@code BOUNDARY} + * flag. This transformer strips this flag and thus allows the function of {@code map} to be executed as part of the + * {@code poll()} chain. + * @param the element type of the flow + * @return the new Transformer instance + */ public static ObservableTransformer observableStripBoundary() { return new ObservableStripBoundary<>(null); }