From 68d3d5c0ef361a75055d841beb5db5540bcd8aa3 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 28 Dec 2019 15:43:11 +0100 Subject: [PATCH 1/2] 3.x: [Java 8] Add flattenStreamAsX to Maybe/Single --- .../java/io/reactivex/rxjava3/core/Maybe.java | 85 ++++ .../io/reactivex/rxjava3/core/Observable.java | 1 + .../io/reactivex/rxjava3/core/Single.java | 87 ++++ .../jdk8/MaybeFlattenStreamAsFlowable.java | 285 +++++++++++ .../jdk8/MaybeFlattenStreamAsObservable.java | 262 ++++++++++ .../jdk8/SingleFlattenStreamAsFlowable.java | 47 ++ .../jdk8/SingleFlattenStreamAsObservable.java | 45 ++ .../MaybeFlattenStreamAsFlowableTest.java | 453 ++++++++++++++++++ .../MaybeFlattenStreamAsObservableTest.java | 431 +++++++++++++++++ .../SingleFlattenStreamAsFlowableTest.java | 440 +++++++++++++++++ .../SingleFlattenStreamAsObservableTest.java | 418 ++++++++++++++++ .../validators/JavadocForAnnotations.java | 6 +- .../rxjava3/validators/JavadocWording.java | 14 +- 13 files changed, 2567 insertions(+), 7 deletions(-) create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsFlowable.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsObservable.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsFlowable.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsObservable.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsFlowableTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsObservableTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsFlowableTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsObservableTest.java diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index dfe8c4f0b4..84a088d5c7 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -15,6 +15,7 @@ import java.util.*; import java.util.concurrent.*; +import java.util.stream.*; import org.reactivestreams.*; @@ -3152,6 +3153,7 @@ public final Maybe flatMap(@NonNull FunctionReactiveX operators documentation: FlatMap + * @see #flattenStreamAsFlowable(Function) */ @BackpressureSupport(BackpressureKind.FULL) @CheckReturnValue @@ -5004,4 +5006,87 @@ public final CompletionStage toCompletionStage() { public final CompletionStage toCompletionStage(@Nullable T defaultItem) { return subscribeWith(new CompletionStageConsumer<>(true, defaultItem)); } + + /** + * Maps the upstream succecss value into a Java {@link Stream} and emits its + * items to the downstream consumer as a {@link Flowable}. + *

+ * + *

+ * The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when + * closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}. + * If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flattenAsFlowable(Function)}: + *


+     * source.flattenAsFlowable(item -> createStream(item)::iterator);
+     * 
+ *

+ * Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}): + *


+     * source.flattenStreamAsFlowable(item -> IntStream.rangeClosed(1, 10).boxed());
+     * 
+ *

+ * {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times + * from multiple threads can lead to undefined behavior. + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream and iterates the given {@code Stream} + * on demand (i.e., when requested).
+ *
Scheduler:
+ *
{@code flattenStreamAsFlowable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the element type of the {@code Stream} and the output {@code Flowable} + * @param mapper the function that receives the upstream success item and should + * return a {@code Stream} of values to emit. + * @return the new Flowable instance + * @since 3.0.0 + * @see #flattenAsFlowable(Function) + * @see #flattenStreamAsObservable(Function) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + @NonNull + public final Flowable flattenStreamAsFlowable(@NonNull Function> mapper) { + Objects.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new MaybeFlattenStreamAsFlowable<>(this, mapper)); + } + + /** + * Maps the upstream succecss value into a Java {@link Stream} and emits its + * items to the downstream consumer as an {@link Observable}. + * + *

+ * The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when + * closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}. + * If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flattenAsObservable(Function)}: + *


+     * source.flattenAsObservable(item -> createStream(item)::iterator);
+     * 
+ *

+ * Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}): + *


+     * source.flattenStreamAsObservable(item -> IntStream.rangeClosed(1, 10).boxed());
+     * 
+ *

+ * {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times + * from multiple threads can lead to undefined behavior. + *

+ *
Scheduler:
+ *
{@code flattenStreamAsObservable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the element type of the {@code Stream} and the output {@code Observable} + * @param mapper the function that receives the upstream success item and should + * return a {@code Stream} of values to emit. + * @return the new Observable instance + * @since 3.0.0 + * @see #flattenAsObservable(Function) + * @see #flattenStreamAsFlowable(Function) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Observable flattenStreamAsObservable(@NonNull Function> mapper) { + Objects.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new MaybeFlattenStreamAsObservable<>(this, mapper)); + } } diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index 373a576888..7c2558036e 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -6517,6 +6517,7 @@ public final Observable concatMap(@NonNull FunctionReactiveX operators documentation: FlatMap */ @CheckReturnValue diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index f0022aef17..28177cc9e3 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -15,10 +15,12 @@ import java.util.*; import java.util.concurrent.*; +import java.util.stream.*; import org.reactivestreams.Publisher; import io.reactivex.rxjava3.annotations.*; +import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.functions.*; @@ -2799,6 +2801,7 @@ public final Flowable flatMapPublisher(@NonNull FunctionReactiveX operators documentation: FlatMap + * @see #flattenStreamAsFlowable(Function) */ @BackpressureSupport(BackpressureKind.FULL) @CheckReturnValue @@ -2826,6 +2829,7 @@ public final Flowable flattenAsFlowable(@NonNull FunctionReactiveX operators documentation: FlatMap + * @see #flattenStreamAsObservable(Function) */ @CheckReturnValue @NonNull @@ -4308,4 +4312,87 @@ private static Single toSingle(@NonNull Flowable source) { public final CompletionStage toCompletionStage() { return subscribeWith(new CompletionStageConsumer<>(false, null)); } + + /** + * Maps the upstream succecss value into a Java {@link Stream} and emits its + * items to the downstream consumer as a {@link Flowable}. + * + *

+ * The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when + * closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}. + * If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flattenAsFlowable(Function)}: + *


+     * source.flattenAsFlowable(item -> createStream(item)::iterator);
+     * 
+ *

+ * Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}): + *


+     * source.flattenStreamAsFlowable(item -> IntStream.rangeClosed(1, 10).boxed());
+     * 
+ *

+ * {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times + * from multiple threads can lead to undefined behavior. + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream and iterates the given {@code Stream} + * on demand (i.e., when requested).
+ *
Scheduler:
+ *
{@code flattenStreamAsFlowable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the element type of the {@code Stream} and the output {@code Flowable} + * @param mapper the function that receives the upstream success item and should + * return a {@code Stream} of values to emit. + * @return the new Flowable instance + * @since 3.0.0 + * @see #flattenAsFlowable(Function) + * @see #flattenStreamAsObservable(Function) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.FULL) + @NonNull + public final Flowable flattenStreamAsFlowable(@NonNull Function> mapper) { + Objects.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new SingleFlattenStreamAsFlowable<>(this, mapper)); + } + + /** + * Maps the upstream succecss value into a Java {@link Stream} and emits its + * items to the downstream consumer as an {@link Observable}. + *

+ * + *

+ * The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when + * closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}. + * If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #flattenAsFlowable(Function)}: + *


+     * source.flattenAsObservable(item -> createStream(item)::iterator);
+     * 
+ *

+ * Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}): + *


+     * source.flattenStreamAsObservable(item -> IntStream.rangeClosed(1, 10).boxed());
+     * 
+ *

+ * {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times + * from multiple threads can lead to undefined behavior. + *

+ *
Scheduler:
+ *
{@code flattenStreamAsObservable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the element type of the {@code Stream} and the output {@code Observable} + * @param mapper the function that receives the upstream success item and should + * return a {@code Stream} of values to emit. + * @return the new Observable instance + * @since 3.0.0 + * @see #flattenAsObservable(Function) + * @see #flattenStreamAsFlowable(Function) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Observable flattenStreamAsObservable(@NonNull Function> mapper) { + Objects.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new SingleFlattenStreamAsObservable<>(this, mapper)); + } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsFlowable.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsFlowable.java new file mode 100644 index 0000000000..0ac724bdf4 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsFlowable.java @@ -0,0 +1,285 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.annotations.*; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; +import io.reactivex.rxjava3.internal.subscriptions.*; +import io.reactivex.rxjava3.internal.util.BackpressureHelper; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Map the success value into a Java {@link Stream} and emits its values. + * + * @param the source value type + * @param the output value type + * @since 3.0.0 + */ +public final class MaybeFlattenStreamAsFlowable extends Flowable { + + final Maybe source; + + final Function> mapper; + + public MaybeFlattenStreamAsFlowable(Maybe source, Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(@NonNull Subscriber s) { + source.subscribe(new FlattenStreamMultiObserver<>(s, mapper)); + } + + static final class FlattenStreamMultiObserver + extends BasicIntQueueSubscription + implements MaybeObserver, SingleObserver { + + private static final long serialVersionUID = 7363336003027148283L; + + final Subscriber downstream; + + final Function> mapper; + + final AtomicLong requested; + + Disposable upstream; + + volatile Iterator iterator; + + AutoCloseable close; + + boolean once; + + volatile boolean cancelled; + + boolean outputFused; + + long emitted; + + FlattenStreamMultiObserver(Subscriber downstream, Function> mapper) { + this.downstream = downstream; + this.mapper = mapper; + this.requested = new AtomicLong(); + } + + @Override + public void onSubscribe(@NonNull Disposable d) { + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + + downstream.onSubscribe(this); + } + } + + @Override + public void onSuccess(@NonNull T t) { + try { + Stream stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream"); + Iterator iterator = stream.iterator(); + AutoCloseable c = stream; + + if (!iterator.hasNext()) { + downstream.onComplete(); + close(c); + return; + } + this.iterator = iterator; + this.close = stream; + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + return; + } + drain(); + } + + @Override + public void onError(@NonNull Throwable e) { + downstream.onError(e); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); + } + } + + @Override + public void cancel() { + cancelled = true; + upstream.dispose(); + if (!outputFused) { + drain(); + } + } + + @Override + public int requestFusion(int mode) { + if ((mode & ASYNC) != 0) { + outputFused = true; + return ASYNC; + } + return NONE; + } + + @Override + public @Nullable R poll() throws Throwable { + Iterator it = iterator; + if (it != null) { + if (once) { + if (!it.hasNext()) { + clear(); + return null; + } + } else { + once = true; + } + return it.next(); + } + return null; + } + + @Override + public boolean isEmpty() { + Iterator it = iterator; + if (it != null) { + if (!once) { + return false; + } + if (it.hasNext()) { + return false; + } + clear(); + } + return true; + } + + @Override + public void clear() { + iterator = null; + AutoCloseable close = this.close; + this.close = null; + close(close); + } + + void close(AutoCloseable c) { + try { + if (c != null) { + c.close(); + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + Subscriber downstream = this.downstream; + long emitted = this.emitted; + long requested = this.requested.get(); + Iterator it = iterator; + + for (;;) { + + if (cancelled) { + clear(); + } else { + if (outputFused) { + if (it != null) { + downstream.onNext(null); + downstream.onComplete(); + } + } else { + if (it != null && emitted != requested) { + R item; + try { + item = it.next(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + cancelled = true; + continue; + } + + if (cancelled) { + continue; + } + + downstream.onNext(item); + emitted++; + + if (cancelled) { + continue; + } + + boolean has; + try { + has = it.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + cancelled = true; + continue; + } + + if (cancelled) { + continue; + } + + if (!has) { + downstream.onComplete(); + cancelled = true; + } + continue; + } + } + } + + this.emitted = emitted; + missed = addAndGet(-missed); + if (missed == 0) { + return; + } + + requested = this.requested.get(); + if (it == null) { + it = iterator; + } + } + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsObservable.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsObservable.java new file mode 100644 index 0000000000..e7a74f0593 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsObservable.java @@ -0,0 +1,262 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.*; +import java.util.stream.Stream; + +import io.reactivex.rxjava3.annotations.*; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; +import io.reactivex.rxjava3.internal.observers.BasicIntQueueDisposable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Map the success value into a Java {@link Stream} and emits its values. + * + * @param the source value type + * @param the output value type + * @since 3.0.0 + */ +public final class MaybeFlattenStreamAsObservable extends Observable { + + final Maybe source; + + final Function> mapper; + + public MaybeFlattenStreamAsObservable(Maybe source, Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(@NonNull Observer s) { + source.subscribe(new FlattenStreamMultiObserver<>(s, mapper)); + } + + static final class FlattenStreamMultiObserver + extends BasicIntQueueDisposable + implements MaybeObserver, SingleObserver { + + private static final long serialVersionUID = 7363336003027148283L; + + final Observer downstream; + + final Function> mapper; + + Disposable upstream; + + volatile Iterator iterator; + + AutoCloseable close; + + boolean once; + + volatile boolean disposed; + + boolean outputFused; + + FlattenStreamMultiObserver(Observer downstream, Function> mapper) { + this.downstream = downstream; + this.mapper = mapper; + } + + @Override + public void onSubscribe(@NonNull Disposable d) { + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + + downstream.onSubscribe(this); + } + } + + @Override + public void onSuccess(@NonNull T t) { + try { + Stream stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream"); + Iterator iterator = stream.iterator(); + AutoCloseable c = stream; + + if (!iterator.hasNext()) { + downstream.onComplete(); + close(c); + return; + } + this.iterator = iterator; + this.close = stream; + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + return; + } + drain(); + } + + @Override + public void onError(@NonNull Throwable e) { + downstream.onError(e); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public void dispose() { + disposed = true; + upstream.dispose(); + if (!outputFused) { + drain(); + } + } + + @Override + public boolean isDisposed() { + return disposed; + } + + @Override + public int requestFusion(int mode) { + if ((mode & ASYNC) != 0) { + outputFused = true; + return ASYNC; + } + return NONE; + } + + @Override + public @Nullable R poll() throws Throwable { + Iterator it = iterator; + if (it != null) { + if (once) { + if (!it.hasNext()) { + clear(); + return null; + } + } else { + once = true; + } + return it.next(); + } + return null; + } + + @Override + public boolean isEmpty() { + Iterator it = iterator; + if (it != null) { + if (!once) { + return false; + } + if (it.hasNext()) { + return false; + } + clear(); + } + return true; + } + + @Override + public void clear() { + iterator = null; + AutoCloseable close = this.close; + this.close = null; + close(close); + } + + void close(AutoCloseable c) { + try { + if (c != null) { + c.close(); + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + + void drain() { + if (getAndIncrement() != 0) { + return; + } + + int missed = 1; + Observer downstream = this.downstream; + Iterator it = iterator; + + for (;;) { + + if (disposed) { + clear(); + } else { + if (outputFused) { + downstream.onNext(null); + downstream.onComplete(); + } else { + R item; + try { + item = it.next(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + disposed = true; + continue; + } + + if (disposed) { + continue; + } + + downstream.onNext(item); + + if (disposed) { + continue; + } + + boolean has; + try { + has = it.hasNext(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + disposed = true; + continue; + } + + if (disposed) { + continue; + } + + if (!has) { + downstream.onComplete(); + disposed = true; + } + continue; + } + } + + missed = addAndGet(-missed); + if (missed == 0) { + return; + } + } + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsFlowable.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsFlowable.java new file mode 100644 index 0000000000..d7286d5932 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsFlowable.java @@ -0,0 +1,47 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.stream.Stream; + +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.jdk8.MaybeFlattenStreamAsFlowable.FlattenStreamMultiObserver; + +/** + * Map the success value into a Java {@link Stream} and emits its values. + * + * @param the source value type + * @param the output value type + * @since 3.0.0 + */ +public final class SingleFlattenStreamAsFlowable extends Flowable { + + final Single source; + + final Function> mapper; + + public SingleFlattenStreamAsFlowable(Single source, Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(@NonNull Subscriber s) { + source.subscribe(new FlattenStreamMultiObserver<>(s, mapper)); + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsObservable.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsObservable.java new file mode 100644 index 0000000000..81b2d5fd7b --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsObservable.java @@ -0,0 +1,45 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.stream.Stream; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.jdk8.MaybeFlattenStreamAsObservable.FlattenStreamMultiObserver; + +/** + * Map the success value into a Java {@link Stream} and emits its values. + * + * @param the source value type + * @param the output value type + * @since 3.0.0 + */ +public final class SingleFlattenStreamAsObservable extends Observable { + + final Single source; + + final Function> mapper; + + public SingleFlattenStreamAsObservable(Single source, Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(@NonNull Observer s) { + source.subscribe(new FlattenStreamMultiObserver<>(s, mapper)); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsFlowableTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsFlowableTest.java new file mode 100644 index 0000000000..8e1e75b8e0 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsFlowableTest.java @@ -0,0 +1,453 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.*; + +import org.junit.Test; +import org.reactivestreams.Subscription; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; +import io.reactivex.rxjava3.subjects.MaybeSubject; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import io.reactivex.rxjava3.testsupport.*; + +public class MaybeFlattenStreamAsFlowableTest extends RxJavaTest { + + @Test + public void successJust() { + Maybe.just(1) + .flattenStreamAsFlowable(Stream::of) + .test() + .assertResult(1); + } + + @Test + public void successEmpty() { + Maybe.just(1) + .flattenStreamAsFlowable(v -> Stream.of()) + .test() + .assertResult(); + } + + @Test + public void successMany() { + Maybe.just(1) + .flattenStreamAsFlowable(v -> Stream.of(2, 3, 4, 5, 6)) + .test() + .assertResult(2, 3, 4, 5, 6); + } + + @Test + public void successManyTake() { + Maybe.just(1) + .flattenStreamAsFlowable(v -> Stream.of(2, 3, 4, 5, 6)) + .take(3) + .test() + .assertResult(2, 3, 4); + } + + @Test + public void empty() throws Throwable { + @SuppressWarnings("unchecked") + Function> f = mock(Function.class); + + Maybe.empty() + .flattenStreamAsFlowable(f) + .test() + .assertResult(); + + verify(f, never()).apply(any()); + } + + @Test + public void error() throws Throwable { + @SuppressWarnings("unchecked") + Function> f = mock(Function.class); + + Maybe.error(new TestException()) + .flattenStreamAsFlowable(f) + .test() + .assertFailure(TestException.class); + + verify(f, never()).apply(any()); + } + + @Test + public void mapperCrash() { + Maybe.just(1) + .flattenStreamAsFlowable(v -> { throw new TestException(); }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.never().flattenStreamAsFlowable(Stream::of)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybeToFlowable(m -> m.flattenStreamAsFlowable(Stream::of)); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(MaybeSubject.create().flattenStreamAsFlowable(Stream::of)); + } + + @Test + public void fusedEmpty() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ANY); + + Maybe.just(1) + .flattenStreamAsFlowable(v -> Stream.of()) + .subscribe(ts); + + ts.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(); + } + + @Test + public void fusedJust() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ANY); + + Maybe.just(1) + .flattenStreamAsFlowable(v -> Stream.of(v)) + .subscribe(ts); + + ts.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1); + } + + @Test + public void fusedMany() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ANY); + + Maybe.just(1) + .flattenStreamAsFlowable(v -> Stream.of(v, v + 1, v + 2)) + .subscribe(ts); + + ts.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1, 2, 3); + } + + @Test + public void fusedManyRejected() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.SYNC); + + Maybe.just(1) + .flattenStreamAsFlowable(v -> Stream.of(v, v + 1, v + 2)) + .subscribe(ts); + + ts.assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(1, 2, 3); + } + + @Test + public void manyBackpressured() { + Maybe.just(1) + .flattenStreamAsFlowable(v -> IntStream.rangeClosed(1, 5).boxed()) + .test(0L) + .assertEmpty() + .requestMore(2) + .assertValuesOnly(1, 2) + .requestMore(2) + .assertValuesOnly(1, 2, 3, 4) + .requestMore(1) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void manyBackpressured2() { + Maybe.just(1) + .flattenStreamAsFlowable(v -> IntStream.rangeClosed(1, 5).boxed()) + .rebatchRequests(1) + .test(0L) + .assertEmpty() + .requestMore(2) + .assertValuesOnly(1, 2) + .requestMore(2) + .assertValuesOnly(1, 2, 3, 4) + .requestMore(1) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void fusedStreamAvailableLater() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ANY); + + MaybeSubject ms = MaybeSubject.create(); + + ms + .flattenStreamAsFlowable(v -> Stream.of(v, v + 1, v + 2)) + .subscribe(ts); + + ts.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertEmpty(); + + ms.onSuccess(1); + + ts + .assertResult(1, 2, 3); + } + + @Test + public void fused() throws Throwable { + AtomicReference> qsr = new AtomicReference<>(); + + MaybeSubject ms = MaybeSubject.create(); + + ms + .flattenStreamAsFlowable(Stream::of) + .subscribe(new FlowableSubscriber() { + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + @SuppressWarnings("unchecked") + public void onSubscribe(@NonNull Subscription s) { + qsr.set((QueueSubscription)s); + } + }); + + QueueSubscription qs = qsr.get(); + + assertEquals(QueueFuseable.ASYNC, qs.requestFusion(QueueFuseable.ASYNC)); + + assertTrue(qs.isEmpty()); + assertNull(qs.poll()); + + ms.onSuccess(1); + + assertFalse(qs.isEmpty()); + assertEquals(1, qs.poll().intValue()); + + assertTrue(qs.isEmpty()); + assertNull(qs.poll()); + + qs.cancel(); + + assertTrue(qs.isEmpty()); + assertNull(qs.poll()); + } + + @Test + public void requestOneByOne() { + TestSubscriber ts = new TestSubscriber<>(); + + Maybe.just(1) + .flattenStreamAsFlowable(v -> Stream.of(1, 2, 3, 4, 5)) + .subscribe(new FlowableSubscriber() { + + Subscription upstream; + + @Override + public void onSubscribe(@NonNull Subscription s) { + ts.onSubscribe(new BooleanSubscription()); + upstream = s; + s.request(1); + } + + @Override + public void onNext(Integer t) { + ts.onNext(t); + upstream.request(1); + } + + @Override + public void onError(Throwable t) { + ts.onError(t); + } + + @Override + public void onComplete() { + ts.onComplete(); + } + }); + + ts.assertResult(1, 2, 3, 4, 5); + } + + @Test + public void streamCloseCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Maybe.just(1) + .flattenStreamAsFlowable(v -> Stream.of(v).onClose(() -> { throw new TestException(); })) + .test() + .assertResult(1); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void hasNextThrowsInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + when(stream.iterator()).thenReturn(new Iterator() { + + int count; + + @Override + public boolean hasNext() { + if (count++ > 0) { + throw new TestException(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + }); + + Maybe.just(1) + .flattenStreamAsFlowable(v -> stream) + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void nextThrowsInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + when(stream.iterator()).thenReturn(new Iterator() { + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + throw new TestException(); + } + }); + + Maybe.just(1) + .flattenStreamAsFlowable(v -> stream) + .test() + .assertFailure(TestException.class); + } + + @Test + public void cancelAfterHasNextInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + + TestSubscriber ts = new TestSubscriber<>(); + + when(stream.iterator()).thenReturn(new Iterator() { + + int count; + + @Override + public boolean hasNext() { + if (count++ > 0) { + ts.cancel(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + }); + + Maybe.just(1) + .flattenStreamAsFlowable(v -> stream) + .subscribeWith(ts) + .assertValuesOnly(1); + } + + @Test + public void cancelAfterNextInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + + TestSubscriber ts = new TestSubscriber<>(); + + when(stream.iterator()).thenReturn(new Iterator() { + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + ts.cancel(); + return 1; + } + }); + + Maybe.just(1) + .flattenStreamAsFlowable(v -> stream) + .subscribeWith(ts) + .assertEmpty(); + } + + @Test + public void requestSuccessRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + MaybeSubject ms = MaybeSubject.create(); + + TestSubscriber ts = new TestSubscriber<>(0L); + + ms.flattenStreamAsFlowable(Stream::of) + .subscribe(ts); + + Runnable r1 = () -> ms.onSuccess(1); + Runnable r2 = () -> ts.request(1); + + TestHelper.race(r1, r2); + + ts.assertResult(1); + } + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsObservableTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsObservableTest.java new file mode 100644 index 0000000000..dd0fb5c2ed --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsObservableTest.java @@ -0,0 +1,431 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.subjects.MaybeSubject; +import io.reactivex.rxjava3.testsupport.*; + +public class MaybeFlattenStreamAsObservableTest extends RxJavaTest { + + @Test + public void successJust() { + Maybe.just(1) + .flattenStreamAsObservable(Stream::of) + .test() + .assertResult(1); + } + + @Test + public void successEmpty() { + Maybe.just(1) + .flattenStreamAsObservable(v -> Stream.of()) + .test() + .assertResult(); + } + + @Test + public void successMany() { + Maybe.just(1) + .flattenStreamAsObservable(v -> Stream.of(2, 3, 4, 5, 6)) + .test() + .assertResult(2, 3, 4, 5, 6); + } + + @Test + public void successManyTake() { + Maybe.just(1) + .flattenStreamAsObservable(v -> Stream.of(2, 3, 4, 5, 6)) + .take(3) + .test() + .assertResult(2, 3, 4); + } + + @Test + public void empty() throws Throwable { + @SuppressWarnings("unchecked") + Function> f = mock(Function.class); + + Maybe.empty() + .flattenStreamAsObservable(f) + .test() + .assertResult(); + + verify(f, never()).apply(any()); + } + + @Test + public void error() throws Throwable { + @SuppressWarnings("unchecked") + Function> f = mock(Function.class); + + Maybe.error(new TestException()) + .flattenStreamAsObservable(f) + .test() + .assertFailure(TestException.class); + + verify(f, never()).apply(any()); + } + + @Test + public void mapperCrash() { + Maybe.just(1) + .flattenStreamAsObservable(v -> { throw new TestException(); }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Maybe.never().flattenStreamAsObservable(Stream::of)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybeToObservable(m -> m.flattenStreamAsObservable(Stream::of)); + } + + @Test + public void fusedEmpty() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.ANY); + + Maybe.just(1) + .flattenStreamAsObservable(v -> Stream.of()) + .subscribe(to); + + to.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(); + } + + @Test + public void fusedJust() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.ANY); + + Maybe.just(1) + .flattenStreamAsObservable(v -> Stream.of(v)) + .subscribe(to); + + to.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1); + } + + @Test + public void fusedMany() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.ANY); + + Maybe.just(1) + .flattenStreamAsObservable(v -> Stream.of(v, v + 1, v + 2)) + .subscribe(to); + + to.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1, 2, 3); + } + + @Test + public void fusedManyRejected() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.SYNC); + + Maybe.just(1) + .flattenStreamAsObservable(v -> Stream.of(v, v + 1, v + 2)) + .subscribe(to); + + to.assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(1, 2, 3); + } + + @Test + public void fusedStreamAvailableLater() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.ANY); + + MaybeSubject ms = MaybeSubject.create(); + + ms + .flattenStreamAsObservable(v -> Stream.of(v, v + 1, v + 2)) + .subscribe(to); + + to.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertEmpty(); + + ms.onSuccess(1); + + to + .assertResult(1, 2, 3); + } + + @Test + public void fused() throws Throwable { + AtomicReference> qdr = new AtomicReference<>(); + + MaybeSubject ms = MaybeSubject.create(); + + ms + .flattenStreamAsObservable(Stream::of) + .subscribe(new Observer() { + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + @SuppressWarnings("unchecked") + public void onSubscribe(Disposable d) { + qdr.set((QueueDisposable)d); + } + }); + + QueueDisposable qd = qdr.get(); + + assertEquals(QueueFuseable.ASYNC, qd.requestFusion(QueueFuseable.ASYNC)); + + assertTrue(qd.isEmpty()); + assertNull(qd.poll()); + + ms.onSuccess(1); + + assertFalse(qd.isEmpty()); + assertEquals(1, qd.poll().intValue()); + + assertTrue(qd.isEmpty()); + assertNull(qd.poll()); + + qd.dispose(); + + assertTrue(qd.isEmpty()); + assertNull(qd.poll()); + } + + @Test + public void fused2() throws Throwable { + AtomicReference> qdr = new AtomicReference<>(); + + MaybeSubject ms = MaybeSubject.create(); + + ms + .flattenStreamAsObservable(v -> Stream.of(v, v + 1)) + .subscribe(new Observer() { + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + @SuppressWarnings("unchecked") + public void onSubscribe(Disposable d) { + qdr.set((QueueDisposable)d); + } + }); + + QueueDisposable qd = qdr.get(); + + assertEquals(QueueFuseable.ASYNC, qd.requestFusion(QueueFuseable.ASYNC)); + + assertTrue(qd.isEmpty()); + assertNull(qd.poll()); + + ms.onSuccess(1); + + assertFalse(qd.isEmpty()); + assertEquals(1, qd.poll().intValue()); + + assertFalse(qd.isEmpty()); + assertEquals(2, qd.poll().intValue()); + + assertTrue(qd.isEmpty()); + assertNull(qd.poll()); + + qd.dispose(); + + assertTrue(qd.isEmpty()); + assertNull(qd.poll()); + } + + @Test + public void streamCloseCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Maybe.just(1) + .flattenStreamAsObservable(v -> Stream.of(v).onClose(() -> { throw new TestException(); })) + .test() + .assertResult(1); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void hasNextThrowsInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + when(stream.iterator()).thenReturn(new Iterator() { + + int count; + + @Override + public boolean hasNext() { + if (count++ > 0) { + throw new TestException(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + }); + + Maybe.just(1) + .flattenStreamAsObservable(v -> stream) + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void nextThrowsInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + when(stream.iterator()).thenReturn(new Iterator() { + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + throw new TestException(); + } + }); + + Maybe.just(1) + .flattenStreamAsObservable(v -> stream) + .test() + .assertFailure(TestException.class); + } + + @Test + public void cancelAfterHasNextInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + + TestObserver to = new TestObserver<>(); + + when(stream.iterator()).thenReturn(new Iterator() { + + int count; + + @Override + public boolean hasNext() { + if (count++ > 0) { + to.dispose(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + }); + + Maybe.just(1) + .flattenStreamAsObservable(v -> stream) + .subscribeWith(to) + .assertValuesOnly(1); + } + + @Test + public void cancelAfterNextInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + + TestObserver to = new TestObserver<>(); + + when(stream.iterator()).thenReturn(new Iterator() { + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + to.dispose(); + return 1; + } + }); + + Maybe.just(1) + .flattenStreamAsObservable(v -> stream) + .subscribeWith(to) + .assertEmpty(); + } + + @Test + public void cancelSuccessRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + MaybeSubject ms = MaybeSubject.create(); + + TestObserver to = new TestObserver<>(); + + ms.flattenStreamAsObservable(Stream::of) + .subscribe(to); + + Runnable r1 = () -> ms.onSuccess(1); + Runnable r2 = () -> to.dispose(); + + TestHelper.race(r1, r2); + } + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsFlowableTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsFlowableTest.java new file mode 100644 index 0000000000..72dc8fb239 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsFlowableTest.java @@ -0,0 +1,440 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.*; + +import org.junit.Test; +import org.reactivestreams.Subscription; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; +import io.reactivex.rxjava3.subjects.SingleSubject; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import io.reactivex.rxjava3.testsupport.*; + +public class SingleFlattenStreamAsFlowableTest extends RxJavaTest { + + @Test + public void successJust() { + Single.just(1) + .flattenStreamAsFlowable(Stream::of) + .test() + .assertResult(1); + } + + @Test + public void successEmpty() { + Single.just(1) + .flattenStreamAsFlowable(v -> Stream.of()) + .test() + .assertResult(); + } + + @Test + public void successMany() { + Single.just(1) + .flattenStreamAsFlowable(v -> Stream.of(2, 3, 4, 5, 6)) + .test() + .assertResult(2, 3, 4, 5, 6); + } + + @Test + public void successManyTake() { + Single.just(1) + .flattenStreamAsFlowable(v -> Stream.of(2, 3, 4, 5, 6)) + .take(3) + .test() + .assertResult(2, 3, 4); + } + + @Test + public void error() throws Throwable { + @SuppressWarnings("unchecked") + Function> f = mock(Function.class); + + Single.error(new TestException()) + .flattenStreamAsFlowable(f) + .test() + .assertFailure(TestException.class); + + verify(f, never()).apply(any()); + } + + @Test + public void mapperCrash() { + Single.just(1) + .flattenStreamAsFlowable(v -> { throw new TestException(); }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Single.never().flattenStreamAsFlowable(Stream::of)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingleToFlowable(m -> m.flattenStreamAsFlowable(Stream::of)); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(SingleSubject.create().flattenStreamAsFlowable(Stream::of)); + } + + @Test + public void fusedEmpty() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ANY); + + Single.just(1) + .flattenStreamAsFlowable(v -> Stream.of()) + .subscribe(ts); + + ts.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(); + } + + @Test + public void fusedJust() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ANY); + + Single.just(1) + .flattenStreamAsFlowable(v -> Stream.of(v)) + .subscribe(ts); + + ts.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1); + } + + @Test + public void fusedMany() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ANY); + + Single.just(1) + .flattenStreamAsFlowable(v -> Stream.of(v, v + 1, v + 2)) + .subscribe(ts); + + ts.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1, 2, 3); + } + + @Test + public void fusedManyRejected() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.SYNC); + + Single.just(1) + .flattenStreamAsFlowable(v -> Stream.of(v, v + 1, v + 2)) + .subscribe(ts); + + ts.assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(1, 2, 3); + } + + @Test + public void manyBackpressured() { + Single.just(1) + .flattenStreamAsFlowable(v -> IntStream.rangeClosed(1, 5).boxed()) + .test(0L) + .assertEmpty() + .requestMore(2) + .assertValuesOnly(1, 2) + .requestMore(2) + .assertValuesOnly(1, 2, 3, 4) + .requestMore(1) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void manyBackpressured2() { + Single.just(1) + .flattenStreamAsFlowable(v -> IntStream.rangeClosed(1, 5).boxed()) + .rebatchRequests(1) + .test(0L) + .assertEmpty() + .requestMore(2) + .assertValuesOnly(1, 2) + .requestMore(2) + .assertValuesOnly(1, 2, 3, 4) + .requestMore(1) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void fusedStreamAvailableLater() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ANY); + + SingleSubject ss = SingleSubject.create(); + + ss + .flattenStreamAsFlowable(v -> Stream.of(v, v + 1, v + 2)) + .subscribe(ts); + + ts.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertEmpty(); + + ss.onSuccess(1); + + ts + .assertResult(1, 2, 3); + } + + @Test + public void fused() throws Throwable { + AtomicReference> qsr = new AtomicReference<>(); + + SingleSubject ss = SingleSubject.create(); + + ss + .flattenStreamAsFlowable(Stream::of) + .subscribe(new FlowableSubscriber() { + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + @SuppressWarnings("unchecked") + public void onSubscribe(@NonNull Subscription s) { + qsr.set((QueueSubscription)s); + } + }); + + QueueSubscription qs = qsr.get(); + + assertEquals(QueueFuseable.ASYNC, qs.requestFusion(QueueFuseable.ASYNC)); + + assertTrue(qs.isEmpty()); + assertNull(qs.poll()); + + ss.onSuccess(1); + + assertFalse(qs.isEmpty()); + assertEquals(1, qs.poll().intValue()); + + assertTrue(qs.isEmpty()); + assertNull(qs.poll()); + + qs.cancel(); + + assertTrue(qs.isEmpty()); + assertNull(qs.poll()); + } + + @Test + public void requestOneByOne() { + TestSubscriber ts = new TestSubscriber<>(); + + Single.just(1) + .flattenStreamAsFlowable(v -> Stream.of(1, 2, 3, 4, 5)) + .subscribe(new FlowableSubscriber() { + + Subscription upstream; + + @Override + public void onSubscribe(@NonNull Subscription s) { + ts.onSubscribe(new BooleanSubscription()); + upstream = s; + s.request(1); + } + + @Override + public void onNext(Integer t) { + ts.onNext(t); + upstream.request(1); + } + + @Override + public void onError(Throwable t) { + ts.onError(t); + } + + @Override + public void onComplete() { + ts.onComplete(); + } + }); + + ts.assertResult(1, 2, 3, 4, 5); + } + + @Test + public void streamCloseCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Single.just(1) + .flattenStreamAsFlowable(v -> Stream.of(v).onClose(() -> { throw new TestException(); })) + .test() + .assertResult(1); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void hasNextThrowsInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + when(stream.iterator()).thenReturn(new Iterator() { + + int count; + + @Override + public boolean hasNext() { + if (count++ > 0) { + throw new TestException(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + }); + + Single.just(1) + .flattenStreamAsFlowable(v -> stream) + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void nextThrowsInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + when(stream.iterator()).thenReturn(new Iterator() { + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + throw new TestException(); + } + }); + + Single.just(1) + .flattenStreamAsFlowable(v -> stream) + .test() + .assertFailure(TestException.class); + } + + @Test + public void cancelAfterHasNextInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + + TestSubscriber ts = new TestSubscriber<>(); + + when(stream.iterator()).thenReturn(new Iterator() { + + int count; + + @Override + public boolean hasNext() { + if (count++ > 0) { + ts.cancel(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + }); + + Single.just(1) + .flattenStreamAsFlowable(v -> stream) + .subscribeWith(ts) + .assertValuesOnly(1); + } + + @Test + public void cancelAfterNextInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + + TestSubscriber ts = new TestSubscriber<>(); + + when(stream.iterator()).thenReturn(new Iterator() { + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + ts.cancel(); + return 1; + } + }); + + Single.just(1) + .flattenStreamAsFlowable(v -> stream) + .subscribeWith(ts) + .assertEmpty(); + } + + @Test + public void requestSuccessRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + SingleSubject ss = SingleSubject.create(); + + TestSubscriber ts = new TestSubscriber<>(0L); + + ss.flattenStreamAsFlowable(Stream::of) + .subscribe(ts); + + Runnable r1 = () -> ss.onSuccess(1); + Runnable r2 = () -> ts.request(1); + + TestHelper.race(r1, r2); + + ts.assertResult(1); + } + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsObservableTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsObservableTest.java new file mode 100644 index 0000000000..2b3a624d50 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsObservableTest.java @@ -0,0 +1,418 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.subjects.SingleSubject; +import io.reactivex.rxjava3.testsupport.*; + +public class SingleFlattenStreamAsObservableTest extends RxJavaTest { + + @Test + public void successJust() { + Single.just(1) + .flattenStreamAsObservable(Stream::of) + .test() + .assertResult(1); + } + + @Test + public void successEmpty() { + Single.just(1) + .flattenStreamAsObservable(v -> Stream.of()) + .test() + .assertResult(); + } + + @Test + public void successMany() { + Single.just(1) + .flattenStreamAsObservable(v -> Stream.of(2, 3, 4, 5, 6)) + .test() + .assertResult(2, 3, 4, 5, 6); + } + + @Test + public void successManyTake() { + Single.just(1) + .flattenStreamAsObservable(v -> Stream.of(2, 3, 4, 5, 6)) + .take(3) + .test() + .assertResult(2, 3, 4); + } + + @Test + public void error() throws Throwable { + @SuppressWarnings("unchecked") + Function> f = mock(Function.class); + + Single.error(new TestException()) + .flattenStreamAsObservable(f) + .test() + .assertFailure(TestException.class); + + verify(f, never()).apply(any()); + } + + @Test + public void mapperCrash() { + Single.just(1) + .flattenStreamAsObservable(v -> { throw new TestException(); }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Single.never().flattenStreamAsObservable(Stream::of)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingleToObservable(m -> m.flattenStreamAsObservable(Stream::of)); + } + + @Test + public void fusedEmpty() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.ANY); + + Single.just(1) + .flattenStreamAsObservable(v -> Stream.of()) + .subscribe(to); + + to.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(); + } + + @Test + public void fusedJust() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.ANY); + + Single.just(1) + .flattenStreamAsObservable(v -> Stream.of(v)) + .subscribe(to); + + to.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1); + } + + @Test + public void fusedMany() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.ANY); + + Single.just(1) + .flattenStreamAsObservable(v -> Stream.of(v, v + 1, v + 2)) + .subscribe(to); + + to.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1, 2, 3); + } + + @Test + public void fusedManyRejected() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.SYNC); + + Single.just(1) + .flattenStreamAsObservable(v -> Stream.of(v, v + 1, v + 2)) + .subscribe(to); + + to.assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(1, 2, 3); + } + + @Test + public void fusedStreamAvailableLater() { + TestObserverEx to = new TestObserverEx<>(); + to.setInitialFusionMode(QueueFuseable.ANY); + + SingleSubject ss = SingleSubject.create(); + + ss + .flattenStreamAsObservable(v -> Stream.of(v, v + 1, v + 2)) + .subscribe(to); + + to.assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertEmpty(); + + ss.onSuccess(1); + + to + .assertResult(1, 2, 3); + } + + @Test + public void fused() throws Throwable { + AtomicReference> qdr = new AtomicReference<>(); + + SingleSubject ss = SingleSubject.create(); + + ss + .flattenStreamAsObservable(Stream::of) + .subscribe(new Observer() { + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + @SuppressWarnings("unchecked") + public void onSubscribe(Disposable d) { + qdr.set((QueueDisposable)d); + } + }); + + QueueDisposable qd = qdr.get(); + + assertEquals(QueueFuseable.ASYNC, qd.requestFusion(QueueFuseable.ASYNC)); + + assertTrue(qd.isEmpty()); + assertNull(qd.poll()); + + ss.onSuccess(1); + + assertFalse(qd.isEmpty()); + assertEquals(1, qd.poll().intValue()); + + assertTrue(qd.isEmpty()); + assertNull(qd.poll()); + + qd.dispose(); + + assertTrue(qd.isEmpty()); + assertNull(qd.poll()); + } + + @Test + public void fused2() throws Throwable { + AtomicReference> qdr = new AtomicReference<>(); + + SingleSubject ss = SingleSubject.create(); + + ss + .flattenStreamAsObservable(v -> Stream.of(v, v + 1)) + .subscribe(new Observer() { + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + @SuppressWarnings("unchecked") + public void onSubscribe(Disposable d) { + qdr.set((QueueDisposable)d); + } + }); + + QueueDisposable qd = qdr.get(); + + assertEquals(QueueFuseable.ASYNC, qd.requestFusion(QueueFuseable.ASYNC)); + + assertTrue(qd.isEmpty()); + assertNull(qd.poll()); + + ss.onSuccess(1); + + assertFalse(qd.isEmpty()); + assertEquals(1, qd.poll().intValue()); + + assertFalse(qd.isEmpty()); + assertEquals(2, qd.poll().intValue()); + + assertTrue(qd.isEmpty()); + assertNull(qd.poll()); + + qd.dispose(); + + assertTrue(qd.isEmpty()); + assertNull(qd.poll()); + } + + @Test + public void streamCloseCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Single.just(1) + .flattenStreamAsObservable(v -> Stream.of(v).onClose(() -> { throw new TestException(); })) + .test() + .assertResult(1); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void hasNextThrowsInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + when(stream.iterator()).thenReturn(new Iterator() { + + int count; + + @Override + public boolean hasNext() { + if (count++ > 0) { + throw new TestException(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + }); + + Single.just(1) + .flattenStreamAsObservable(v -> stream) + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void nextThrowsInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + when(stream.iterator()).thenReturn(new Iterator() { + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + throw new TestException(); + } + }); + + Single.just(1) + .flattenStreamAsObservable(v -> stream) + .test() + .assertFailure(TestException.class); + } + + @Test + public void cancelAfterHasNextInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + + TestObserver to = new TestObserver<>(); + + when(stream.iterator()).thenReturn(new Iterator() { + + int count; + + @Override + public boolean hasNext() { + if (count++ > 0) { + to.dispose(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + }); + + Single.just(1) + .flattenStreamAsObservable(v -> stream) + .subscribeWith(to) + .assertValuesOnly(1); + } + + @Test + public void cancelAfterNextInDrain() { + @SuppressWarnings("unchecked") + Stream stream = mock(Stream.class); + + TestObserver to = new TestObserver<>(); + + when(stream.iterator()).thenReturn(new Iterator() { + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Integer next() { + to.dispose(); + return 1; + } + }); + + Single.just(1) + .flattenStreamAsObservable(v -> stream) + .subscribeWith(to) + .assertEmpty(); + } + + @Test + public void cancelSuccessRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + SingleSubject ss = SingleSubject.create(); + + TestObserver to = new TestObserver<>(); + + ss.flattenStreamAsObservable(Stream::of) + .subscribe(to); + + Runnable r1 = () -> ss.onSuccess(1); + Runnable r2 = () -> to.dispose(); + + TestHelper.race(r1, r2); + } + } +} diff --git a/src/test/java/io/reactivex/rxjava3/validators/JavadocForAnnotations.java b/src/test/java/io/reactivex/rxjava3/validators/JavadocForAnnotations.java index 2f16d05c14..1411295126 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/JavadocForAnnotations.java +++ b/src/test/java/io/reactivex/rxjava3/validators/JavadocForAnnotations.java @@ -156,9 +156,9 @@ static final void scanForBadMethod(StringBuilder sourceCode, String annotation, ; int lc = lineNumber(sourceCode, idx); - e.append(" at io.reactivex.").append(baseClassName) - .append(" (").append(baseClassName).append(".java:") - .append(lc).append(")").append("\r\n\r\n"); + e.append(" at io.reactivex.rxjava3.core.").append(baseClassName) + .append(".method(").append(baseClassName).append(".java:") + .append(lc).append(")").append("\r\n"); } } } diff --git a/src/test/java/io/reactivex/rxjava3/validators/JavadocWording.java b/src/test/java/io/reactivex/rxjava3/validators/JavadocWording.java index 4a4baaf8ae..fc0d1db9cf 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/JavadocWording.java +++ b/src/test/java/io/reactivex/rxjava3/validators/JavadocWording.java @@ -139,8 +139,11 @@ public void maybeDocRefersToMaybeTypes() throws Exception { int idx = m.javadoc.indexOf("Flowable", jdx); if (idx >= 0) { if (!m.signature.contains("Flowable")) { - e.append("java.lang.RuntimeException: Maybe doc mentions Flowable but not in the signature\r\n at io.reactivex.rxjava3.core.") - .append("Maybe.method(Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + Pattern p = Pattern.compile("@see\\s+#[A-Za-z0-9 _.,()]*Flowable"); + if (!p.matcher(m.javadoc).find()) { + e.append("java.lang.RuntimeException: Maybe doc mentions Flowable but not in the signature\r\n at io.reactivex.rxjava3.core.") + .append("Maybe.method(Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } } jdx = idx + 6; } else { @@ -180,8 +183,11 @@ public void maybeDocRefersToMaybeTypes() throws Exception { int idx = m.javadoc.indexOf("Observable", jdx); if (idx >= 0) { if (!m.signature.contains("Observable")) { - e.append("java.lang.RuntimeException: Maybe doc mentions Observable but not in the signature\r\n at io.reactivex.rxjava3.core.") - .append("Maybe.method(Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + Pattern p = Pattern.compile("@see\\s+#[A-Za-z0-9 _.,()]*Observable"); + if (!p.matcher(m.javadoc).find()) { + e.append("java.lang.RuntimeException: Maybe doc mentions Observable but not in the signature\r\n at io.reactivex.rxjava3.core.") + .append("Maybe.method(Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } } jdx = idx + 6; } else { From f455cd232b9f0157e2486f86f40d55ac57aaf9cd Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 28 Dec 2019 16:25:27 +0100 Subject: [PATCH 2/2] Add RS TCK tests for flattenStreamAsFlowable --- .../MaybeFlattenStreamAsFlowableTckTest.java | 40 +++++++++++++++++++ .../SingleFlattenStreamAsFlowableTckTest.java | 40 +++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsFlowableTckTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsFlowableTckTest.java diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsFlowableTckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsFlowableTckTest.java new file mode 100644 index 0000000000..94906fdd2f --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MaybeFlattenStreamAsFlowableTckTest.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.stream.*; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.tck.BaseTck; + +@Test +public class MaybeFlattenStreamAsFlowableTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Maybe.just(1).flattenStreamAsFlowable(v -> IntStream.range(0, (int)elements).boxed()) + ; + } + + @Override + public Publisher createFailedPublisher() { + Stream stream = Stream.of(1); + stream.forEach(v -> { }); + return Maybe.just(1).flattenStreamAsFlowable(v -> stream); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsFlowableTckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsFlowableTckTest.java new file mode 100644 index 0000000000..509429d4af --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/SingleFlattenStreamAsFlowableTckTest.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.stream.*; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.tck.BaseTck; + +@Test +public class SingleFlattenStreamAsFlowableTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Single.just(1).flattenStreamAsFlowable(v -> IntStream.range(0, (int)elements).boxed()) + ; + } + + @Override + public Publisher createFailedPublisher() { + Stream stream = Stream.of(1); + stream.forEach(v -> { }); + return Single.just(1).flattenStreamAsFlowable(v -> stream); + } +}