From ead65c3aee20e19dd3209e4070533cdc47b4bb6a Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 17 Dec 2019 15:11:47 +0100 Subject: [PATCH 1/3] 3.x: [Java 8] Upgrade to Java 8, add Flowable.fromX operators --- build.gradle | 13 +- .../rxjava3/annotations/NonNull.java | 4 +- .../io/reactivex/rxjava3/core/Flowable.java | 172 +++++- .../jdk8/FlowableFromCompletionStage.java | 94 ++++ .../internal/jdk8/FlowableFromStream.java | 299 ++++++++++ .../rxjava3/observers/BaseTestConsumer.java | 5 +- .../io/reactivex/rxjava3/flowable/Burst.java | 2 +- .../jdk8/FlowableFromCompletionStageTest.java | 65 +++ .../jdk8/FlowableFromOptionalTest.java | 38 ++ .../internal/jdk8/FlowableFromStreamTest.java | 515 ++++++++++++++++++ .../jdk8/FromCompletionStageTckTest.java | 48 ++ .../internal/jdk8/FromOptional0TckTest.java | 42 ++ .../internal/jdk8/FromOptional1TckTest.java | 42 ++ .../internal/jdk8/FromStreamTckTest.java | 44 ++ .../internal/operators/observable/Burst.java | 2 +- .../rxjava3/testsupport/TestHelper.java | 106 ++-- .../ParamValidationCheckerTest.java | 10 + 17 files changed, 1416 insertions(+), 85 deletions(-) create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromCompletionStage.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromStream.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromCompletionStageTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromOptionalTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromStreamTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/FromCompletionStageTckTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/FromOptional0TckTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/FromOptional1TckTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/FromStreamTckTest.java diff --git a/build.gradle b/build.gradle index 439a0c5680..4dcead40c4 100644 --- a/build.gradle +++ b/build.gradle @@ -66,15 +66,15 @@ apply plugin: "com.jfrog.bintray" apply plugin: "com.jfrog.artifactory" apply plugin: "eclipse" -sourceCompatibility = JavaVersion.VERSION_1_6 -targetCompatibility = JavaVersion.VERSION_1_6 +sourceCompatibility = JavaVersion.VERSION_1_8 +targetCompatibility = JavaVersion.VERSION_1_8 repositories { mavenCentral() } dependencies { - signature "org.codehaus.mojo.signature:java16:1.1@signature" + signature "org.codehaus.mojo.signature:java18:1.0@signature" api "org.reactivestreams:reactive-streams:$reactiveStreamsVersion" jmh "org.reactivestreams:reactive-streams:$reactiveStreamsVersion" @@ -103,14 +103,9 @@ javadoc { options.stylesheetFile = new File(projectDir, "gradle/stylesheet.css"); options.links( - "https://docs.oracle.com/javase/7/docs/api/", + "https://docs.oracle.com/javase/8/docs/api/", "http://www.reactive-streams.org/reactive-streams-${reactiveStreamsVersion}-javadoc/" ) - - if (JavaVersion.current().isJava7()) { - // "./gradle/stylesheet.css" only supports Java 7 - options.addStringOption("stylesheetfile", rootProject.file("./gradle/stylesheet.css").toString()) - } } animalsniffer { diff --git a/src/main/java/io/reactivex/rxjava3/annotations/NonNull.java b/src/main/java/io/reactivex/rxjava3/annotations/NonNull.java index 06f1d4ee8b..03e5180588 100644 --- a/src/main/java/io/reactivex/rxjava3/annotations/NonNull.java +++ b/src/main/java/io/reactivex/rxjava3/annotations/NonNull.java @@ -19,10 +19,10 @@ import java.lang.annotation.*; /** - * Indicates that a field/parameter/variable/return type is never null. + * Indicates that a field/parameter/variable/type parameter/return type is never null. */ @Documented -@Target(value = {FIELD, METHOD, PARAMETER, LOCAL_VARIABLE}) +@Target(value = {FIELD, METHOD, PARAMETER, LOCAL_VARIABLE, TYPE_PARAMETER, TYPE_USE}) @Retention(value = CLASS) public @interface NonNull { } diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 23e666896d..b8cd707d57 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -14,6 +14,7 @@ import java.util.*; import java.util.concurrent.*; +import java.util.stream.*; import org.reactivestreams.*; @@ -24,6 +25,7 @@ import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.*; import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier; +import io.reactivex.rxjava3.internal.jdk8.*; import io.reactivex.rxjava3.internal.operators.flowable.*; import io.reactivex.rxjava3.internal.operators.mixed.*; import io.reactivex.rxjava3.internal.operators.observable.ObservableFromPublisher; @@ -211,6 +213,7 @@ public static Flowable amb(Iterable> sou @NonNull @BackpressureSupport(BackpressureKind.PASS_THROUGH) @SchedulerSupport(SchedulerSupport.NONE) + @SafeVarargs public static Flowable ambArray(Publisher... sources) { ObjectHelper.requireNonNull(sources, "sources is null"); int len = sources.length; @@ -1224,7 +1227,6 @@ public static Flowable concat(Publisher> * without interleaving them * @see ReactiveX operators documentation: Concat */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -1261,7 +1263,6 @@ public static Flowable concat(Publisher source1, PublisherReactiveX operators documentation: Concat */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -1303,7 +1304,6 @@ public static Flowable concat( * without interleaving them * @see ReactiveX operators documentation: Concat */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -1341,6 +1341,7 @@ public static Flowable concat( @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + @SafeVarargs public static Flowable concatArray(Publisher... sources) { if (sources.length == 0) { return empty(); @@ -1373,6 +1374,7 @@ public static Flowable concatArray(Publisher... sources) { @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + @SafeVarargs public static Flowable concatArrayDelayError(Publisher... sources) { if (sources.length == 0) { return empty(); @@ -1408,6 +1410,7 @@ public static Flowable concatArrayDelayError(Publisher... so @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + @SafeVarargs public static Flowable concatArrayEager(Publisher... sources) { return concatArrayEager(bufferSize(), bufferSize(), sources); } @@ -1475,6 +1478,7 @@ public static Flowable concatArrayEager(int maxConcurrency, int prefetch, @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @BackpressureSupport(BackpressureKind.FULL) + @SafeVarargs public static Flowable concatArrayEagerDelayError(Publisher... sources) { return concatArrayEagerDelayError(bufferSize(), bufferSize(), sources); } @@ -1914,6 +1918,7 @@ public static Flowable error(final Throwable throwable) { @NonNull @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + @SafeVarargs public static Flowable fromArray(T... items) { ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { @@ -1977,6 +1982,10 @@ public static Flowable fromCallable(Callable supplier) { *

* Important note: This Publisher is blocking on the thread it gets subscribed on; you cannot cancel it. *

+ * Also note that this operator will consume a {@link CompletionStage}-based {@code Future} subclass (such as + * {@link CompletableFuture}) in a blocking manner as well. Use the {@link #fromCompletionStage(CompletionStage)} + * operator to convert and consume such sources in a non-blocking fashion instead. + *

* Unlike 1.x, canceling the Flowable won't cancel the future. If necessary, one can use composition to achieve the * cancellation effect: {@code futurePublisher.doOnCancel(() -> future.cancel(true));}. *

@@ -1993,6 +2002,7 @@ public static Flowable fromCallable(Callable supplier) { * the resulting Publisher * @return a Flowable that emits the item from the source {@link Future} * @see ReactiveX operators documentation: From + * @see #fromCompletionStage(CompletionStage) */ @CheckReturnValue @NonNull @@ -2016,6 +2026,10 @@ public static Flowable fromFuture(Future future) { * cancellation effect: {@code futurePublisher.doOnCancel(() -> future.cancel(true));}. *

* Important note: This Publisher is blocking on the thread it gets subscribed on; you cannot cancel it. + *

+ * Also note that this operator will consume a {@link CompletionStage}-based {@code Future} subclass (such as + * {@link CompletableFuture}) in a blocking manner as well. Use the {@link #fromCompletionStage(CompletionStage)} + * operator to convert and consume such sources in a non-blocking fashion instead. *

*
Backpressure:
*
The operator honors backpressure from downstream.
@@ -2034,6 +2048,7 @@ public static Flowable fromFuture(Future future) { * the resulting Publisher * @return a Flowable that emits the item from the source {@link Future} * @see ReactiveX operators documentation: From + * @see #fromCompletionStage(CompletionStage) */ @CheckReturnValue @NonNull @@ -2058,6 +2073,10 @@ public static Flowable fromFuture(Future future, long timeou * cancellation effect: {@code futurePublisher.doOnCancel(() -> future.cancel(true));}. *

* Important note: This Publisher is blocking; you cannot cancel it. + *

+ * Also note that this operator will consume a {@link CompletionStage}-based {@code Future} subclass (such as + * {@link CompletableFuture}) in a blocking manner as well. Use the {@link #fromCompletionStage(CompletionStage)} + * operator to convert and consume such sources in a non-blocking fashion instead. *

*
Backpressure:
*
The operator honors backpressure from downstream.
@@ -2079,8 +2098,9 @@ public static Flowable fromFuture(Future future, long timeou * the resulting Publisher * @return a Flowable that emits the item from the source {@link Future} * @see ReactiveX operators documentation: From + * @see #fromCompletionStage(CompletionStage) */ - @SuppressWarnings({ "unchecked", "cast" }) + @SuppressWarnings({ "unchecked" }) @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -2119,7 +2139,7 @@ public static Flowable fromFuture(Future future, long timeou * @return a Flowable that emits the item from the source {@link Future} * @see ReactiveX operators documentation: From */ - @SuppressWarnings({ "cast", "unchecked" }) + @SuppressWarnings({ "unchecked" }) @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -2148,6 +2168,7 @@ public static Flowable fromFuture(Future future, Scheduler s * resulting Publisher * @return a Flowable that emits each item in the source {@link Iterable} sequence * @see ReactiveX operators documentation: From + * @see #fromStream(Stream) */ @CheckReturnValue @NonNull @@ -2656,7 +2677,6 @@ public static Flowable just(T item) { * @return a Flowable that emits each item * @see ReactiveX operators documentation: Just */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -2690,7 +2710,6 @@ public static Flowable just(T item1, T item2) { * @return a Flowable that emits each item * @see ReactiveX operators documentation: Just */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -2727,7 +2746,6 @@ public static Flowable just(T item1, T item2, T item3) { * @return a Flowable that emits each item * @see ReactiveX operators documentation: Just */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -2767,7 +2785,6 @@ public static Flowable just(T item1, T item2, T item3, T item4) { * @return a Flowable that emits each item * @see ReactiveX operators documentation: Just */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -2810,7 +2827,6 @@ public static Flowable just(T item1, T item2, T item3, T item4, T item5) * @return a Flowable that emits each item * @see ReactiveX operators documentation: Just */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -2856,7 +2872,6 @@ public static Flowable just(T item1, T item2, T item3, T item4, T item5, * @return a Flowable that emits each item * @see ReactiveX operators documentation: Just */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -2905,7 +2920,6 @@ public static Flowable just(T item1, T item2, T item3, T item4, T item5, * @return a Flowable that emits each item * @see ReactiveX operators documentation: Just */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -2957,7 +2971,6 @@ public static Flowable just(T item1, T item2, T item3, T item4, T item5, * @return a Flowable that emits each item * @see ReactiveX operators documentation: Just */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -3012,7 +3025,6 @@ public static Flowable just(T item1, T item2, T item3, T item4, T item5, * @return a Flowable that emits each item * @see ReactiveX operators documentation: Just */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -4655,7 +4667,6 @@ public static Flowable zip(Iterable> * @return a Flowable that emits the zipped results * @see ReactiveX operators documentation: Zip */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -4717,7 +4728,6 @@ public static Flowable zip( * @return a Flowable that emits the zipped results * @see ReactiveX operators documentation: Zip */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -4780,7 +4790,6 @@ public static Flowable zip( * @return a Flowable that emits the zipped results * @see ReactiveX operators documentation: Zip */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -4845,7 +4854,6 @@ public static Flowable zip( * @return a Flowable that emits the zipped results * @see ReactiveX operators documentation: Zip */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -4914,7 +4922,6 @@ public static Flowable zip( * @return a Flowable that emits the zipped results * @see ReactiveX operators documentation: Zip */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -4988,7 +4995,6 @@ public static Flowable zip( * @return a Flowable that emits the zipped results * @see ReactiveX operators documentation: Zip */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -5065,7 +5071,6 @@ public static Flowable zip( * @return a Flowable that emits the zipped results * @see ReactiveX operators documentation: Zip */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -5146,7 +5151,6 @@ public static Flowable zip( * @return a Flowable that emits the zipped results * @see ReactiveX operators documentation: Zip */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -5232,7 +5236,6 @@ public static Flowable zip( * @return a Flowable that emits the zipped results * @see ReactiveX operators documentation: Zip */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -5322,7 +5325,6 @@ public static Flowable zip( * @return a Flowable that emits the zipped results * @see ReactiveX operators documentation: Zip */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -5398,6 +5400,7 @@ public static Flowable zip( @NonNull @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + @SafeVarargs public static Flowable zipArray(Function zipper, boolean delayError, int bufferSize, Publisher... sources) { if (sources.length == 0) { @@ -5460,7 +5463,6 @@ public final Single all(Predicate predicate) { * emitted an item or sent a termination notification * @see ReactiveX operators documentation: Amb */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -14548,7 +14550,6 @@ public final Flowable sorted(Comparator sortFunction) { * @see #startWithItem(Object) * @since 3.0.0 */ - @SuppressWarnings("unchecked") @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @@ -14576,7 +14577,6 @@ public final Flowable startWithIterable(Iterable items) { * emitted by the source Publisher * @see ReactiveX operators documentation: StartWith */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -14609,7 +14609,6 @@ public final Flowable startWith(Publisher other) { * @see #startWithIterable(Iterable) * @since 3.0.0 */ - @SuppressWarnings("unchecked") @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.FULL) @@ -18620,4 +18619,121 @@ public final TestSubscriber test(long initialRequest, boolean cancel) { // No return ts; } + // ------------------------------------------------------------------------- + // JDK 8 Support + // ------------------------------------------------------------------------- + + /** + * Converts the existing value of the provided optional into a {@link #just(Object)} + * or an empty optional into an {@link #empty()} {@code Flowable} instance. + *

+ * + *

+ * Note that the operator takes an already instantiated optional reference and does not + * by any means create this original optional. If the optional is to be created per + * consumer upon subscription, use {@link #defer(Supplier)} around {@code fromOptional}: + *


+     * Flowable.defer(() -> Flowable.fromOptional(createOptional()));
+     * 
+ *
+ *
Backpressure:
+ *
The returned {@code Flowable} supports backpressure.
+ *
Scheduler:
+ *
{@code fromOptional} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the element type of the optional value + * @param optional the optional value to convert into a {@code Flowable} + * @return the new Flowable instance + * @see #just(Object) + * @see #empty() + * @since 3.0.0 + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public static Flowable fromOptional(@NonNull Optional optional) { + ObjectHelper.requireNonNull(optional, "optional is null"); + return optional.map(Flowable::just).orElseGet(Flowable::empty); + } + + /** + * Signals the completion value or error of the given (hot) {@link CompletionStage}-based asynchronous calculation. + *

+ * + *

+ * Note that the operator takes an already instantiated, running or terminated {@code CompletionStage}. + * If the optional is to be created per consumer upon subscription, use {@link #defer(Supplier)} + * around {@code fromCompletionStage}: + *


+     * Flowable.defer(() -> Flowable.fromCompletionStage(createCompletionStage()));
+     * 
+ *

+ * If the {@code CompletionStage} completes with {@code null}, a {@link NullPointerException} is signaled. + *

+ * Canceling the flow can't cancel the execution of the {@code CompletionStage} because {@code CompletionStage} + * itself doesn't support cancellation. Instead, the operator detaches from the {@code CompletionStage}. + *

+ *
Backpressure:
+ *
The returned {@code Flowable} supports backpressure and caches the completion value until the + * downstream is ready to receive it.
+ *
Scheduler:
+ *
{@code fromCompletionStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the element type of the CompletionStage + * @param stage the CompletionStage to convert to Flowable and signal its terminal value or error + * @return the new Flowable instance + * @since 3.0.0 + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public static Flowable fromCompletionStage(@NonNull CompletionStage stage) { + ObjectHelper.requireNonNull(stage, "stage is null"); + return RxJavaPlugins.onAssembly(new FlowableFromCompletionStage<>(stage)); + } + + /** + * Converts a {@link Stream} into a finite {@code Flowable} and emits its items in the sequence. + *

+ * + *

+ * The operator closes the {@code Stream} upon cancellation and when it terminates. Exceptions raised when + * closing a {@code Stream} are routed to the global error handler ({@link RxJavaPlugins#onError(Throwable)}. + * If a {@code Stream} should not be closed, turn it into an {@link Iterable} and use {@link #fromIterable(Iterable)}: + *


+     * Stream<T> stream = ...
+     * Flowable.fromIterable(stream::iterator);
+     * 
+ *

+ * Note that {@code Stream}s can be consumed only once; any subsequent attempt to consume a {@code Stream} + * will result in an {@link IllegalStateException}. + *

+ * Primitive streams are not supported and items have to be boxed manually (e.g., via {@link IntStream#boxed()}): + *


+     * IntStream intStream = IntStream.rangeClosed(1, 10);
+     * Flowable.fromStream(intStream.boxed());
+     * 
+ *

+ * {@code Stream} does not support concurrent usage so creating and/or consuming the same instance multiple times + * from multiple threads can lead to undefined behavior. + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream and iterates the given {@code Stream} + * on demand (i.e., when requested).
+ *
Scheduler:
+ *
{@code fromStream} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the element type of the source {@code Stream} + * @param stream the {@code Stream} of values to emit + * @return the new Flowable instance + * @since 3.0.0 + * @see #fromIterable(Iterable) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public static Flowable fromStream(@NonNull Stream stream) { + ObjectHelper.requireNonNull(stream, "stream is null"); + return RxJavaPlugins.onAssembly(new FlowableFromStream<>(stream)); + } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromCompletionStage.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromCompletionStage.java new file mode 100644 index 0000000000..f864cdd96d --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromCompletionStage.java @@ -0,0 +1,94 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; + +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.internal.subscriptions.DeferredScalarSubscription; + +/** + * Wrap a CompletionStage and signal its outcome. + * @param the element type + * @since 3.0.0 + */ +public final class FlowableFromCompletionStage extends Flowable { + + final CompletionStage stage; + + public FlowableFromCompletionStage(CompletionStage stage) { + this.stage = stage; + } + + @Override + protected void subscribeActual(Subscriber s) { + // We need an indirection because one can't detach from a whenComplete + // and cancellation should not hold onto the stage. + BiConsumerAtomicReference whenReference = new BiConsumerAtomicReference<>(); + CompletionStageHandler handler = new CompletionStageHandler<>(s, whenReference); + whenReference.lazySet(handler); + + s.onSubscribe(handler); + stage.whenComplete(whenReference); + } + + static final class CompletionStageHandler + extends DeferredScalarSubscription + implements BiConsumer { + + private static final long serialVersionUID = 4665335664328839859L; + + final BiConsumerAtomicReference whenReference; + + CompletionStageHandler(Subscriber downstream, BiConsumerAtomicReference whenReference) { + super(downstream); + this.whenReference = whenReference; + } + + @Override + public void accept(T item, Throwable error) { + if (error != null) { + downstream.onError(error); + } + else if (item != null) { + complete(item); + } else { + downstream.onError(new NullPointerException("The CompletionStage terminated with null.")); + } + } + + @Override + public void cancel() { + super.cancel(); + whenReference.set(null); + } + } + + static final class BiConsumerAtomicReference extends AtomicReference> + implements BiConsumer { + + private static final long serialVersionUID = 45838553147237545L; + + @Override + public void accept(T t, Throwable u) { + BiConsumer biConsumer = get(); + if (biConsumer != null) { + biConsumer.accept(t, u); + } + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromStream.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromStream.java new file mode 100644 index 0000000000..db34238b7a --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromStream.java @@ -0,0 +1,299 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.annotations.*; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.internal.functions.ObjectHelper; +import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.internal.subscriptions.*; +import io.reactivex.rxjava3.internal.util.BackpressureHelper; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Wraps a {@link Stream} and emits its values as a Flowable sequence. + * @param the element type of the Stream + * @since 3.0.0 + */ +public final class FlowableFromStream extends Flowable { + + final Stream stream; + + public FlowableFromStream(Stream stream) { + this.stream = stream; + } + + @Override + protected void subscribeActual(Subscriber s) { + Iterator iterator; + try { + iterator = stream.iterator(); + + if (!iterator.hasNext()) { + EmptySubscription.complete(s); + closeSafely(stream); + return; + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + EmptySubscription.error(ex, s); + closeSafely(stream); + return; + } + + if (s instanceof ConditionalSubscriber) { + s.onSubscribe(new StreamConditionalSubscription((ConditionalSubscriber)s, iterator, stream)); + } else { + s.onSubscribe(new StreamSubscription<>(s, iterator, stream)); + } + } + + static void closeSafely(AutoCloseable c) { + try { + c.close(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + RxJavaPlugins.onError(ex); + } + } + + abstract static class AbstractStreamSubscription extends AtomicLong implements QueueSubscription { + + private static final long serialVersionUID = -9082954702547571853L; + + Iterator iterator; + + AutoCloseable closeable; + + volatile boolean cancelled; + + boolean once; + + AbstractStreamSubscription(Iterator iterator, AutoCloseable closeable) { + this.iterator = iterator; + this.closeable = closeable; + } + + @Override + public void request(long n) { + if (SubscriptionHelper.validate(n)) { + if (BackpressureHelper.add(this, n) == 0L) { + run(n); + } + } + } + + abstract void run(long n); + + @Override + public void cancel() { + cancelled = true; + request(1L); + } + + @Override + public int requestFusion(int mode) { + if ((mode & SYNC) != 0) { + lazySet(Long.MAX_VALUE); + return SYNC; + } + return NONE; + } + + @Override + public boolean offer(@NonNull T value) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean offer(@NonNull T v1, @NonNull T v2) { + throw new UnsupportedOperationException(); + } + + @Nullable + @Override + public T poll() { + if (iterator == null) { + return null; + } + if (!once) { + once = true; + } else { + if (!iterator.hasNext()) { + return null; + } + } + return ObjectHelper.requireNonNull(iterator.next(), "Iterator.next() returned a null value"); + } + + @Override + public boolean isEmpty() { + return iterator == null || !iterator.hasNext(); + } + + @Override + public void clear() { + iterator = null; + AutoCloseable c = closeable; + closeable = null; + if (c != null) { + closeSafely(c); + } + } + } + + static final class StreamSubscription extends AbstractStreamSubscription { + + private static final long serialVersionUID = -9082954702547571853L; + + final Subscriber downstream; + + StreamSubscription(Subscriber downstream, Iterator iterator, AutoCloseable closeable) { + super(iterator, closeable); + this.downstream = downstream; + } + + @Override + public void run(long n) { + long emitted = 0L; + Iterator iterator = this.iterator; + Subscriber downstream = this.downstream; + + for (;;) { + + if (cancelled) { + clear(); + break; + } else { + T next; + try { + next = ObjectHelper.requireNonNull(iterator.next(), "The Stream's Iterator returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + cancelled = true; + continue; + } + + downstream.onNext(next); + + if (cancelled) { + continue; + } + + try { + if (!iterator.hasNext()) { + downstream.onComplete(); + cancelled = true; + continue; + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + cancelled = true; + continue; + } + + if (++emitted != n) { + continue; + } + } + + n = get(); + if (emitted == n) { + if (compareAndSet(n, 0L)) { + break; + } + n = get(); + } + } + } + } + + static final class StreamConditionalSubscription extends AbstractStreamSubscription { + + private static final long serialVersionUID = -9082954702547571853L; + + final ConditionalSubscriber downstream; + + StreamConditionalSubscription(ConditionalSubscriber downstream, Iterator iterator, AutoCloseable closeable) { + super(iterator, closeable); + this.downstream = downstream; + } + + @Override + public void run(long n) { + long emitted = 0L; + Iterator iterator = this.iterator; + ConditionalSubscriber downstream = this.downstream; + + for (;;) { + + if (cancelled) { + clear(); + break; + } else { + T next; + try { + next = ObjectHelper.requireNonNull(iterator.next(), "The Stream's Iterator returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + cancelled = true; + continue; + } + + if (downstream.tryOnNext(next)) { + emitted++; + } + + if (cancelled) { + continue; + } + + try { + if (!iterator.hasNext()) { + downstream.onComplete(); + cancelled = true; + continue; + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + cancelled = true; + continue; + } + + if (emitted != n) { + continue; + } + } + + n = get(); + if (emitted == n) { + if (compareAndSet(n, 0L)) { + break; + } + n = get(); + } + } + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/observers/BaseTestConsumer.java b/src/main/java/io/reactivex/rxjava3/observers/BaseTestConsumer.java index ee2dcf1853..59aa62e796 100644 --- a/src/main/java/io/reactivex/rxjava3/observers/BaseTestConsumer.java +++ b/src/main/java/io/reactivex/rxjava3/observers/BaseTestConsumer.java @@ -228,7 +228,7 @@ public final U assertError(Throwable error) { * @param errorClass the error class to expect * @return this */ - @SuppressWarnings({ "unchecked", "rawtypes", "cast" }) + @SuppressWarnings({ "unchecked", "rawtypes" }) public final U assertError(Class errorClass) { return (U)assertError((Predicate)Functions.isInstanceOf(errorClass)); } @@ -435,6 +435,7 @@ public final U assertValues(T... values) { * @return this * @since 2.2 */ + @SafeVarargs public final U assertValuesOnly(T... values) { return assertSubscribed() .assertValues(values) @@ -493,6 +494,7 @@ public final U assertValueSequence(Iterable sequence) { * @return this * @see #assertFailure(Class, Object...) */ + @SafeVarargs public final U assertResult(T... values) { return assertSubscribed() .assertValues(values) @@ -507,6 +509,7 @@ public final U assertResult(T... values) { * @param values the expected values, asserted in order * @return this */ + @SafeVarargs public final U assertFailure(Class error, T... values) { return assertSubscribed() .assertValues(values) diff --git a/src/test/java/io/reactivex/rxjava3/flowable/Burst.java b/src/test/java/io/reactivex/rxjava3/flowable/Burst.java index 8acf477242..1e6c255a30 100644 --- a/src/test/java/io/reactivex/rxjava3/flowable/Burst.java +++ b/src/test/java/io/reactivex/rxjava3/flowable/Burst.java @@ -52,11 +52,11 @@ protected void subscribeActual(final Subscriber subscriber) { } - @SuppressWarnings("unchecked") public static Builder item(T item) { return items(item); } + @SafeVarargs public static Builder items(T... items) { return new Builder(Arrays.asList(items)); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromCompletionStageTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromCompletionStageTest.java new file mode 100644 index 0000000000..8a71bbd9b3 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromCompletionStageTest.java @@ -0,0 +1,65 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.concurrent.CompletableFuture; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.subscribers.TestSubscriber; + +public class FlowableFromCompletionStageTest extends RxJavaTest { + + @Test + public void syncSuccess() { + Flowable.fromCompletionStage(CompletableFuture.completedFuture(1)) + .test() + .assertResult(1); + } + + @Test + public void syncFailure() { + CompletableFuture cf = new CompletableFuture<>(); + cf.completeExceptionally(new TestException()); + + Flowable.fromCompletionStage(cf) + .test() + .assertFailure(TestException.class); + } + + @Test + public void syncNull() { + Flowable.fromCompletionStage(CompletableFuture.completedFuture(null)) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void cancel() { + CompletableFuture cf = new CompletableFuture<>(); + + TestSubscriber ts = Flowable.fromCompletionStage(cf) + .test(); + + ts.assertEmpty(); + + ts.cancel(); + + cf.complete(1); + + ts.assertEmpty(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromOptionalTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromOptionalTest.java new file mode 100644 index 0000000000..6676bcee61 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromOptionalTest.java @@ -0,0 +1,38 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.Optional; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; + +public class FlowableFromOptionalTest extends RxJavaTest { + + @Test + public void hasValue() { + Flowable.fromOptional(Optional.of(1)) + .test() + .assertResult(1); + } + + @Test + public void empty() { + Flowable.fromOptional(Optional.empty()) + .test() + .assertResult(); + } + +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromStreamTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromStreamTest.java new file mode 100644 index 0000000000..940ac95c4d --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromStreamTest.java @@ -0,0 +1,515 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.stream.*; + +import org.junit.Test; +import org.reactivestreams.Subscription; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.testsupport.*; + +public class FlowableFromStreamTest extends RxJavaTest { + + @Test + public void empty() { + Flowable.fromStream(Stream.of()) + .test() + .assertResult(); + } + + @Test + public void just() { + Flowable.fromStream(Stream.of(1)) + .test() + .assertResult(1); + } + + @Test + public void many() { + Flowable.fromStream(Stream.of(1, 2, 3, 4, 5)) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void manyBackpressured() { + Flowable.fromStream(Stream.of(1, 2, 3, 4, 5)) + .test(0L) + .assertEmpty() + .requestMore(1) + .assertValuesOnly(1) + .requestMore(2) + .assertValuesOnly(1, 2, 3) + .requestMore(2) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void noReuse() { + Flowable source = Flowable.fromStream(Stream.of(1, 2, 3, 4, 5)); + + source + .test() + .assertResult(1, 2, 3, 4, 5); + + source + .test() + .assertFailure(IllegalStateException.class); + } + + @Test + public void take() { + Flowable.fromStream(IntStream.rangeClosed(1, 10).boxed()) + .take(5) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void emptyConditional() { + Flowable.fromStream(Stream.of()) + .filter(v -> true) + .test() + .assertResult(); + } + + @Test + public void justConditional() { + Flowable.fromStream(Stream.of(1)) + .filter(v -> true) + .test() + .assertResult(1); + } + + @Test + public void manyConditional() { + Flowable.fromStream(Stream.of(1, 2, 3, 4, 5)) + .filter(v -> true) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void manyBackpressuredConditional() { + Flowable.fromStream(Stream.of(1, 2, 3, 4, 5)) + .filter(v -> true) + .test(0L) + .assertEmpty() + .requestMore(1) + .assertValuesOnly(1) + .requestMore(2) + .assertValuesOnly(1, 2, 3) + .requestMore(2) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void manyConditionalSkip() { + Flowable.fromStream(IntStream.rangeClosed(1, 10).boxed()) + .filter(v -> v % 2 == 0) + .test() + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void takeConditional() { + Flowable.fromStream(IntStream.rangeClosed(1, 10).boxed()) + .filter(v -> true) + .take(5) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void noOfferNoCrashAfterClear() throws Throwable { + AtomicReference> queue = new AtomicReference<>(); + + Flowable.fromStream(IntStream.rangeClosed(1, 10).boxed()) + .subscribe(new FlowableSubscriber() { + @Override + public void onSubscribe(@NonNull Subscription s) { + queue.set((SimpleQueue)s); + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + SimpleQueue q = queue.get(); + TestHelper.assertNoOffer(q); + + assertFalse(q.isEmpty()); + + q.clear(); + + assertNull(q.poll()); + + assertTrue(q.isEmpty()); + + q.clear(); + + assertNull(q.poll()); + + assertTrue(q.isEmpty()); + } + + @Test + public void streamOfNull() { + Flowable.fromStream(Stream.of((Integer)null)) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void streamOfNullConditional() { + Flowable.fromStream(Stream.of((Integer)null)) + .filter(v -> true) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void syncFusionSupport() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ANY); + + Flowable.fromStream(IntStream.rangeClosed(1, 10).boxed()) + .subscribeWith(ts) + .assertFuseable() + .assertFusionMode(QueueFuseable.SYNC) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void asyncFusionNotSupported() { + TestSubscriberEx ts = new TestSubscriberEx<>(); + ts.setInitialFusionMode(QueueFuseable.ASYNC); + + Flowable.fromStream(IntStream.rangeClosed(1, 10).boxed()) + .subscribeWith(ts) + .assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void fusedForParallel() { + Flowable.fromStream(IntStream.rangeClosed(1, 1000).boxed()) + .parallel() + .runOn(Schedulers.computation(), 1) + .map(v -> v + 1) + .sequential() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(1000) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void runToEndCloseCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Stream stream = Stream.of(1, 2, 3, 4, 5).onClose(() -> { throw new TestException(); }); + + Flowable.fromStream(stream) + .test() + .assertResult(1, 2, 3, 4, 5); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void takeCloseCrash() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Stream stream = Stream.of(1, 2, 3, 4, 5).onClose(() -> { throw new TestException(); }); + + Flowable.fromStream(stream) + .take(3) + .test() + .assertResult(1, 2, 3); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void hasNextCrash() { + AtomicInteger v = new AtomicInteger(); + Flowable.fromStream(Stream.generate(() -> { + int value = v.getAndIncrement(); + if (value == 1) { + throw new TestException(); + } + return value; + })) + .test() + .assertFailure(TestException.class, 0); + } + + @Test + public void hasNextCrashConditional() { + AtomicInteger counter = new AtomicInteger(); + Flowable.fromStream(Stream.generate(() -> { + int value = counter.getAndIncrement(); + if (value == 1) { + throw new TestException(); + } + return value; + })) + .filter(v -> true) + .test() + .assertFailure(TestException.class, 0); + } + + void requestOneByOneBase(boolean conditional) { + List list = new ArrayList<>(); + + Flowable source = Flowable.fromStream(IntStream.rangeClosed(1, 10).boxed()); + if (conditional) { + source = source.filter(v -> true); + } + + source.subscribe(new FlowableSubscriber() { + + @NonNull Subscription upstream; + + @Override + public void onSubscribe(@NonNull Subscription s) { + this.upstream = s; + s.request(1); + } + + @Override + public void onNext(Integer t) { + list.add(t); + upstream.request(1); + } + + @Override + public void onError(Throwable t) { + list.add(t); + } + + @Override + public void onComplete() { + list.add(100); + } + }); + + assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 100), list); + } + + @Test + public void requestOneByOne() { + requestOneByOneBase(false); + } + + @Test + public void requestOneByOneConditional() { + requestOneByOneBase(true); + } + + void requestRaceBase(boolean conditional) throws Exception { + ExecutorService exec = Executors.newCachedThreadPool(); + try { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + + AtomicInteger counter = new AtomicInteger(); + + int max = 100; + + Flowable source = Flowable.fromStream(IntStream.rangeClosed(1, max).boxed()); + if (conditional) { + source = source.filter(v -> true); + } + + CountDownLatch cdl = new CountDownLatch(1); + + source + .subscribe(new FlowableSubscriber() { + + @NonNull Subscription upstream; + + @Override + public void onSubscribe(@NonNull Subscription s) { + + this.upstream = s; + s.request(1); + + } + + @Override + public void onNext(Integer t) { + counter.getAndIncrement(); + + AtomicInteger sync = new AtomicInteger(2); + exec.submit(() -> { + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + upstream.request(1); + }); + + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + cdl.countDown(); + } + + @Override + public void onComplete() { + counter.getAndIncrement(); + cdl.countDown(); + } + }); + + assertTrue(cdl.await(60, TimeUnit.SECONDS)); + + assertEquals(max + 1, counter.get()); + } + } finally { + exec.shutdown(); + } + } + + @Test + public void requestRace() throws Exception { + requestRaceBase(false); + } + + @Test + public void requestRaceConditional() throws Exception { + requestRaceBase(true); + } + + @Test + public void closeCalledOnEmpty() { + AtomicInteger calls = new AtomicInteger(); + + Flowable.fromStream(Stream.of().onClose(() -> calls.getAndIncrement())) + .test() + .assertResult(); + + assertEquals(1, calls.get()); + } + + @Test + public void closeCalledAfterItems() { + AtomicInteger calls = new AtomicInteger(); + + Flowable.fromStream(Stream.of(1, 2, 3, 4, 5).onClose(() -> calls.getAndIncrement())) + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls.get()); + } + + @Test + public void closeCalledOnCancel() { + AtomicInteger calls = new AtomicInteger(); + + Flowable.fromStream(Stream.of(1, 2, 3, 4, 5).onClose(() -> calls.getAndIncrement())) + .take(3) + .test() + .assertResult(1, 2, 3); + + assertEquals(1, calls.get()); + } + + @Test + public void closeCalledOnItemCrash() { + AtomicInteger calls = new AtomicInteger(); + AtomicInteger counter = new AtomicInteger(); + Flowable.fromStream(Stream.generate(() -> { + int value = counter.getAndIncrement(); + if (value == 1) { + throw new TestException(); + } + return value; + }).onClose(() -> calls.getAndIncrement())) + .test() + .assertFailure(TestException.class, 0); + + assertEquals(1, calls.get()); + } + + @Test + public void closeCalledAfterItemsConditional() { + AtomicInteger calls = new AtomicInteger(); + + Flowable.fromStream(Stream.of(1, 2, 3, 4, 5).onClose(() -> calls.getAndIncrement())) + .filter(v -> true) + .test() + .assertResult(1, 2, 3, 4, 5); + + assertEquals(1, calls.get()); + } + + @Test + public void closeCalledOnCancelConditional() { + AtomicInteger calls = new AtomicInteger(); + + Flowable.fromStream(Stream.of(1, 2, 3, 4, 5).onClose(() -> calls.getAndIncrement())) + .filter(v -> true) + .take(3) + .test() + .assertResult(1, 2, 3); + + assertEquals(1, calls.get()); + } + + @Test + public void closeCalledOnItemCrashConditional() { + AtomicInteger calls = new AtomicInteger(); + AtomicInteger counter = new AtomicInteger(); + Flowable.fromStream(Stream.generate(() -> { + int value = counter.getAndIncrement(); + if (value == 1) { + throw new TestException(); + } + return value; + }).onClose(() -> calls.getAndIncrement())) + .filter(v -> true) + .test() + .assertFailure(TestException.class, 0); + + assertEquals(1, calls.get()); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FromCompletionStageTckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FromCompletionStageTckTest.java new file mode 100644 index 0000000000..2c48611cd9 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FromCompletionStageTckTest.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.concurrent.*; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.tck.BaseTck; + +@Test +public class FromCompletionStageTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Flowable.fromCompletionStage(CompletableFuture.completedFuture(1L)) + ; + } + + @Override + public Publisher createFailedPublisher() { + CompletableFuture cf = new CompletableFuture<>(); + cf.completeExceptionally(new TestException()); + return + Flowable.fromCompletionStage(cf) + ; + } + + @Override + public long maxElementsFromPublisher() { + return 1; + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FromOptional0TckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FromOptional0TckTest.java new file mode 100644 index 0000000000..76c1246eb3 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FromOptional0TckTest.java @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.Optional; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.tck.BaseTck; + +/** + * Test Optional.empty() wrapping. + * @see FromOptional1TckTest + */ +@Test +public class FromOptional0TckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Flowable.fromOptional(Optional.empty()) + ; + } + + @Override + public long maxElementsFromPublisher() { + return 0; + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FromOptional1TckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FromOptional1TckTest.java new file mode 100644 index 0000000000..27e596d708 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FromOptional1TckTest.java @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.Optional; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.tck.BaseTck; + +/** + * Test Optional.of wrapping. + * @see FromOptional0TckTest + */ +@Test +public class FromOptional1TckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Flowable.fromOptional(Optional.of(1L)) + ; + } + + @Override + public long maxElementsFromPublisher() { + return 1; + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FromStreamTckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FromStreamTckTest.java new file mode 100644 index 0000000000..06f3b02b97 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FromStreamTckTest.java @@ -0,0 +1,44 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.stream.*; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.tck.BaseTck; + +/** + * Test Optional.of wrapping. + * @see FromOptional0TckTest + */ +@Test +public class FromStreamTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Flowable.fromStream(IntStream.rangeClosed(1, (int)elements).boxed()) + ; + } + + @Override + public Publisher createFailedPublisher() { + Stream stream = Stream.of(1); + stream.forEach(v -> { }); + return Flowable.fromStream(stream); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/Burst.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/Burst.java index 4fa76d1628..a7aca59ada 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/Burst.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/Burst.java @@ -47,11 +47,11 @@ protected void subscribeActual(final Observer observer) { } } - @SuppressWarnings("unchecked") public static Builder item(T item) { return items(item); } + @SafeVarargs public static Builder items(T... items) { return new Builder(Arrays.asList(items)); } diff --git a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java index 9037d203f6..1f2a834650 100644 --- a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java +++ b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java @@ -624,7 +624,7 @@ public static void doubleOnSubscribe(MaybeObserver observer) { * @param source the source to test */ public static void checkDisposed(Flowable source) { - final TestSubscriber ts = new TestSubscriber(0L); + final TestSubscriber ts = new TestSubscriber<>(0L); source.subscribe(new FlowableSubscriber() { @Override public void onSubscribe(Subscription s) { @@ -2123,10 +2123,10 @@ protected void subscribeActual(CompletableObserver observer) { public static void checkDisposedMaybe(Function, ? extends MaybeSource> composer) { PublishProcessor pp = PublishProcessor.create(); - TestSubscriber ts = new TestSubscriber(); + TestSubscriber ts = new TestSubscriber<>(); try { - new MaybeToFlowable(composer.apply(pp.singleElement())).subscribe(ts); + new MaybeToFlowable<>(composer.apply(pp.singleElement())).subscribe(ts); } catch (Throwable ex) { throw ExceptionHelper.wrapOrThrow(ex); } @@ -2145,7 +2145,7 @@ public static void checkDisposedMaybe(Function, ? extends MaybeS public static void checkDisposedCompletable(Function composer) { PublishProcessor pp = PublishProcessor.create(); - TestSubscriber ts = new TestSubscriber(); + TestSubscriber ts = new TestSubscriber<>(); try { new CompletableToFlowable(composer.apply(pp.ignoreElements())).subscribe(ts); @@ -2169,10 +2169,10 @@ public static void checkDisposedCompletable(Function void checkDisposedMaybeToSingle(Function, ? extends SingleSource> composer) { PublishProcessor pp = PublishProcessor.create(); - TestSubscriber ts = new TestSubscriber(); + TestSubscriber ts = new TestSubscriber<>(); try { - new SingleToFlowable(composer.apply(pp.singleElement())).subscribe(ts); + new SingleToFlowable<>(composer.apply(pp.singleElement())).subscribe(ts); } catch (Throwable ex) { throw ExceptionHelper.wrapOrThrow(ex); } @@ -2190,6 +2190,7 @@ public static void checkDisposedMaybeToSingle(Function, ? extend * @param ts the TestSubscriber instance * @param classes the array of expected Throwables inside the Composite */ + @SafeVarargs public static void assertCompositeExceptions(TestSubscriberEx ts, Class... classes) { ts .assertSubscribed() @@ -2234,6 +2235,7 @@ public static void assertCompositeExceptions(TestSubscriberEx ts, Object... c * @param to the TestSubscriber instance * @param classes the array of expected Throwables inside the Composite */ + @SafeVarargs public static void assertCompositeExceptions(TestObserverEx to, Class... classes) { to .assertSubscribed() @@ -2278,6 +2280,7 @@ public static void assertCompositeExceptions(TestObserverEx to, Object... cla * @param p the target processor * @param values the values to emit */ + @SafeVarargs public static void emit(Processor p, T... values) { for (T v : values) { p.onNext(v); @@ -2291,6 +2294,7 @@ public static void emit(Processor p, T... values) { * @param p the target subject * @param values the values to emit */ + @SafeVarargs public static void emit(Subject p, T... values) { for (T v : values) { p.onNext(v); @@ -2495,7 +2499,7 @@ protected void subscribeActual(Observer observer) { if (o instanceof ObservableSource) { ObservableSource os = (ObservableSource) o; - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); os.subscribe(to); @@ -2517,7 +2521,7 @@ protected void subscribeActual(Observer observer) { if (o instanceof Publisher) { Publisher os = (Publisher) o; - TestSubscriberEx ts = new TestSubscriberEx(); + TestSubscriberEx ts = new TestSubscriberEx<>(); os.subscribe(ts); @@ -2539,7 +2543,7 @@ protected void subscribeActual(Observer observer) { if (o instanceof SingleSource) { SingleSource os = (SingleSource) o; - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); os.subscribe(to); @@ -2561,7 +2565,7 @@ protected void subscribeActual(Observer observer) { if (o instanceof MaybeSource) { MaybeSource os = (MaybeSource) o; - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); os.subscribe(to); @@ -2583,7 +2587,7 @@ protected void subscribeActual(Observer observer) { if (o instanceof CompletableSource) { CompletableSource os = (CompletableSource) o; - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); os.subscribe(to); @@ -2654,7 +2658,7 @@ protected void subscribeActual(Subscriber subscriber) { if (o instanceof ObservableSource) { ObservableSource os = (ObservableSource) o; - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); os.subscribe(to); @@ -2676,7 +2680,7 @@ protected void subscribeActual(Subscriber subscriber) { if (o instanceof Publisher) { Publisher os = (Publisher) o; - TestSubscriberEx ts = new TestSubscriberEx(); + TestSubscriberEx ts = new TestSubscriberEx<>(); os.subscribe(ts); @@ -2698,7 +2702,7 @@ protected void subscribeActual(Subscriber subscriber) { if (o instanceof SingleSource) { SingleSource os = (SingleSource) o; - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); os.subscribe(to); @@ -2720,7 +2724,7 @@ protected void subscribeActual(Subscriber subscriber) { if (o instanceof MaybeSource) { MaybeSource os = (MaybeSource) o; - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); os.subscribe(to); @@ -2742,7 +2746,7 @@ protected void subscribeActual(Subscriber subscriber) { if (o instanceof CompletableSource) { CompletableSource os = (CompletableSource) o; - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); os.subscribe(to); @@ -2778,7 +2782,7 @@ public static void checkInvalidParallelSubscribers(ParallelFlowable sourc @SuppressWarnings("unchecked") TestSubscriber[] tss = new TestSubscriber[n + 1]; for (int i = 0; i <= n; i++) { - tss[i] = new TestSubscriber().withTag("" + i); + tss[i] = new TestSubscriber<>().withTag("" + i); } source.subscribe(tss); @@ -2893,7 +2897,7 @@ static final class FlowableStripBoundary extends Flowable implements Flowa @Override public Flowable apply(Flowable upstream) { - return new FlowableStripBoundary(upstream); + return new FlowableStripBoundary<>(upstream); } @Override @@ -2985,7 +2989,7 @@ public void cancel() { } public static FlowableTransformer flowableStripBoundary() { - return new FlowableStripBoundary(null); + return new FlowableStripBoundary<>(null); } static final class ObservableStripBoundary extends Observable implements ObservableTransformer { @@ -2998,7 +3002,7 @@ static final class ObservableStripBoundary extends Observable implements O @Override public Observable apply(Observable upstream) { - return new ObservableStripBoundary(upstream); + return new ObservableStripBoundary<>(upstream); } @Override @@ -3090,23 +3094,23 @@ public boolean isDisposed() { } public static ObservableTransformer observableStripBoundary() { - return new ObservableStripBoundary(null); + return new ObservableStripBoundary<>(null); } public static TestConsumerExConverters testConsumer() { - return new TestConsumerExConverters(false, 0); + return new TestConsumerExConverters<>(false, 0); } public static TestConsumerExConverters testConsumer(boolean cancelled) { - return new TestConsumerExConverters(cancelled, 0); + return new TestConsumerExConverters<>(cancelled, 0); } public static TestConsumerExConverters testConsumer(final int fusionMode, final boolean cancelled) { - return new TestConsumerExConverters(cancelled, fusionMode); + return new TestConsumerExConverters<>(cancelled, fusionMode); } public static TestConsumerExConverters testConsumer(final boolean cancelled, final int fusionMode) { - return new TestConsumerExConverters(cancelled, fusionMode); + return new TestConsumerExConverters<>(cancelled, fusionMode); } public static FlowableConverter> testSubscriber(final long initialRequest) { @@ -3125,7 +3129,7 @@ public static FlowableConverter> testSubscriber(final return new FlowableConverter>() { @Override public TestSubscriberEx apply(Flowable f) { - TestSubscriberEx tse = new TestSubscriberEx(initialRequest); + TestSubscriberEx tse = new TestSubscriberEx<>(initialRequest); if (cancelled) { tse.cancel(); } @@ -3153,7 +3157,7 @@ public static final class TestConsumerExConverters implements @Override public TestObserverEx apply(Completable upstream) { - TestObserverEx toe = new TestObserverEx(); + TestObserverEx toe = new TestObserverEx<>(); if (cancelled) { toe.dispose(); } @@ -3163,7 +3167,7 @@ public TestObserverEx apply(Completable upstream) { @Override public TestObserverEx apply(Maybe upstream) { - TestObserverEx toe = new TestObserverEx(); + TestObserverEx toe = new TestObserverEx<>(); if (cancelled) { toe.dispose(); } @@ -3173,7 +3177,7 @@ public TestObserverEx apply(Maybe upstream) { @Override public TestObserverEx apply(Single upstream) { - TestObserverEx toe = new TestObserverEx(); + TestObserverEx toe = new TestObserverEx<>(); if (cancelled) { toe.dispose(); } @@ -3183,7 +3187,7 @@ public TestObserverEx apply(Single upstream) { @Override public TestObserverEx apply(Observable upstream) { - TestObserverEx toe = new TestObserverEx(); + TestObserverEx toe = new TestObserverEx<>(); if (cancelled) { toe.dispose(); } @@ -3193,7 +3197,7 @@ public TestObserverEx apply(Observable upstream) { @Override public TestSubscriberEx apply(Flowable upstream) { - TestSubscriberEx tse = new TestSubscriberEx(); + TestSubscriberEx tse = new TestSubscriberEx<>(); if (cancelled) { tse.dispose(); } @@ -3202,8 +3206,9 @@ public TestSubscriberEx apply(Flowable upstream) { } } + @SafeVarargs public static TestSubscriberEx assertValueSet(TestSubscriberEx ts, T... values) { - Set expectedSet = new HashSet(Arrays.asList(values)); + Set expectedSet = new HashSet<>(Arrays.asList(values)); for (T t : ts.values()) { if (!expectedSet.contains(t)) { throw ts.failWith("Item not in the set: " + BaseTestConsumer.valueAndClass(t)); @@ -3212,8 +3217,9 @@ public static TestSubscriberEx assertValueSet(TestSubscriberEx ts, T.. return ts; } + @SafeVarargs public static TestObserverEx assertValueSet(TestObserverEx to, T... values) { - Set expectedSet = new HashSet(Arrays.asList(values)); + Set expectedSet = new HashSet<>(Arrays.asList(values)); for (T t : to.values()) { if (!expectedSet.contains(t)) { throw to.failWith("Item not in the set: " + BaseTestConsumer.valueAndClass(t)); @@ -3280,35 +3286,35 @@ public Integer apply(Integer v) throws Throwable { .to(transform); if (result instanceof MaybeSource) { - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); disposable.set(to); ((MaybeSource)result) .subscribe(to); to.assertEmpty(); } else if (result instanceof SingleSource) { - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); disposable.set(to); ((SingleSource)result) .subscribe(to); to.assertEmpty(); } else if (result instanceof CompletableSource) { - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); disposable.set(to); ((CompletableSource)result) .subscribe(to); to.assertEmpty(); } else if (result instanceof ObservableSource) { - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); disposable.set(to); ((ObservableSource)result) .subscribe(to); to.assertEmpty(); } else if (result instanceof Publisher) { - TestSubscriberEx ts = new TestSubscriberEx(); + TestSubscriberEx ts = new TestSubscriberEx<>(); disposable.set(Disposables.fromSubscription(ts)); ((Publisher)result) @@ -3348,35 +3354,35 @@ public Integer apply(Integer v) throws Throwable { .to(transform); if (result instanceof MaybeSource) { - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); disposable.set(to); ((MaybeSource)result) .subscribe(to); to.assertEmpty(); } else if (result instanceof SingleSource) { - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); disposable.set(to); ((SingleSource)result) .subscribe(to); to.assertEmpty(); } else if (result instanceof CompletableSource) { - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); disposable.set(to); ((CompletableSource)result) .subscribe(to); to.assertEmpty(); } else if (result instanceof ObservableSource) { - TestObserverEx to = new TestObserverEx(); + TestObserverEx to = new TestObserverEx<>(); disposable.set(to); ((ObservableSource)result) .subscribe(to); to.assertEmpty(); } else if (result instanceof Publisher) { - TestSubscriberEx ts = new TestSubscriberEx(); + TestSubscriberEx ts = new TestSubscriberEx<>(); disposable.set(Disposables.fromSubscription(ts)); ((Publisher)result) @@ -3419,4 +3425,18 @@ public static long awaitGC(long oneSleep, int maxLoop, long expectedMemoryUsage) } return bean.getHeapMemoryUsage().getUsed(); } + + /** + * Enable thracking of the global errors for the duration of the action. + * @param action the action to run with a list of errors encountered + * @throws Throwable the exception rethrown from the action + */ + public static void withErrorTracking(Consumer> action) throws Throwable { + List errors = trackPluginErrors(); + try { + action.accept(errors); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java index d8b5d14f7e..2d4387d543 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java @@ -14,8 +14,10 @@ package io.reactivex.rxjava3.validators; import java.lang.reflect.*; +import java.time.Duration; import java.util.*; import java.util.concurrent.*; +import java.util.stream.*; import org.junit.Test; import org.reactivestreams.*; @@ -584,6 +586,14 @@ public void checkParallelFlowable() { defaultValues.put(ParallelFailureHandling.class, ParallelFailureHandling.ERROR); + // JDK 8 types + + defaultValues.put(Optional.class, Optional.of(1)); + defaultValues.put(CompletionStage.class, CompletableFuture.completedFuture(1)); + defaultValues.put(Stream.class, Stream.of(1, 2, 3)); + defaultValues.put(Duration.class, Duration.ofSeconds(1)); + defaultValues.put(Collector.class, Collectors.toList()); + @SuppressWarnings("rawtypes") class MixedConverters implements FlowableConverter, ObservableConverter, SingleConverter, MaybeConverter, CompletableConverter, ParallelFlowableConverter { From 19be540911af0188e8a50b041029e65dc1d86410 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 17 Dec 2019 15:38:02 +0100 Subject: [PATCH 2/3] Add NonNull annotation to the new fromX methods --- src/main/java/io/reactivex/rxjava3/core/Flowable.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index b8cd707d57..db2131688f 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -18651,6 +18651,7 @@ public final TestSubscriber test(long initialRequest, boolean cancel) { // No @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + @NonNull public static Flowable fromOptional(@NonNull Optional optional) { ObjectHelper.requireNonNull(optional, "optional is null"); return optional.map(Flowable::just).orElseGet(Flowable::empty); @@ -18687,6 +18688,7 @@ public static Flowable fromOptional(@NonNull Optional optional) { @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + @NonNull public static Flowable fromCompletionStage(@NonNull CompletionStage stage) { ObjectHelper.requireNonNull(stage, "stage is null"); return RxJavaPlugins.onAssembly(new FlowableFromCompletionStage<>(stage)); @@ -18732,6 +18734,7 @@ public static Flowable fromCompletionStage(@NonNull CompletionStage st @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + @NonNull public static Flowable fromStream(@NonNull Stream stream) { ObjectHelper.requireNonNull(stream, "stream is null"); return RxJavaPlugins.onAssembly(new FlowableFromStream<>(stream)); From 3c19f8d93ae7a5a837b173dad8efdaa5dd9240f5 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 17 Dec 2019 16:03:12 +0100 Subject: [PATCH 3/3] Annotate return type argument to Flowable<@NonNull T> --- src/main/java/io/reactivex/rxjava3/core/Flowable.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index db2131688f..3fed4742e2 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -18652,7 +18652,7 @@ public final TestSubscriber test(long initialRequest, boolean cancel) { // No @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @NonNull - public static Flowable fromOptional(@NonNull Optional optional) { + public static Flowable<@NonNull T> fromOptional(@NonNull Optional optional) { ObjectHelper.requireNonNull(optional, "optional is null"); return optional.map(Flowable::just).orElseGet(Flowable::empty); } @@ -18689,7 +18689,7 @@ public static Flowable fromOptional(@NonNull Optional optional) { @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @NonNull - public static Flowable fromCompletionStage(@NonNull CompletionStage stage) { + public static Flowable<@NonNull T> fromCompletionStage(@NonNull CompletionStage stage) { ObjectHelper.requireNonNull(stage, "stage is null"); return RxJavaPlugins.onAssembly(new FlowableFromCompletionStage<>(stage)); } @@ -18735,7 +18735,7 @@ public static Flowable fromCompletionStage(@NonNull CompletionStage st @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) @NonNull - public static Flowable fromStream(@NonNull Stream stream) { + public static Flowable<@NonNull T> fromStream(@NonNull Stream stream) { ObjectHelper.requireNonNull(stream, "stream is null"); return RxJavaPlugins.onAssembly(new FlowableFromStream<>(stream)); }