From f4f0138d9a19948ba786a58ccc32b88f1c03752a Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 7 Jan 2020 09:45:59 +0100 Subject: [PATCH] 3.x: Remove fromFuture(..., Scheduler) overloads --- .../io/reactivex/rxjava3/core/Flowable.java | 89 ------------------- .../io/reactivex/rxjava3/core/Observable.java | 78 ---------------- .../io/reactivex/rxjava3/core/Single.java | 67 -------------- .../rxjava3/flowable/FlowableNullTests.java | 10 --- .../flowable/FlowableToFutureTest.java | 4 +- .../observable/ObservableFromTest.java | 3 +- .../observable/ObservableToFutureTest.java | 2 +- .../operators/single/SingleFromTest.java | 6 +- .../observable/ObservableNullTests.java | 11 --- .../rxjava3/single/SingleNullTests.java | 20 ----- .../ParamValidationCheckerTest.java | 3 - 11 files changed, 9 insertions(+), 284 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index c61ea17163..b5135bf356 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -2089,95 +2089,6 @@ public static Flowable error(@NonNull Throwable throwable) { return RxJavaPlugins.onAssembly(new FlowableFromFuture<>(future, timeout, unit)); } - /** - * Converts a {@link Future} into a {@link Publisher}, with a timeout on the {@code Future}. - *

- * - *

- * You can convert any object that supports the {@code Future} interface into a {@code Publisher} that emits the - * return value of the {@link Future#get} method of that object by passing the object into the {@code from} - * method. - *

- * Unlike 1.x, canceling the {@code Flowable} won't cancel the future. If necessary, one can use composition to achieve the - * cancellation effect: {@code futurePublisher.doOnCancel(() -> future.cancel(true));}. - *

- * Important note: This {@code Publisher} is blocking; you cannot cancel it. - *

- * Also note that this operator will consume a {@link CompletionStage}-based {@code Future} subclass (such as - * {@link CompletableFuture}) in a blocking manner as well. Use the {@link #fromCompletionStage(CompletionStage)} - * operator to convert and consume such sources in a non-blocking fashion instead. - *

- *
Backpressure:
- *
The operator honors backpressure from downstream.
- *
Scheduler:
- *
{@code fromFuture} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param future - * the source {@code Future} - * @param timeout - * the maximum time to wait before calling {@code get} - * @param unit - * the {@link TimeUnit} of the {@code timeout} argument - * @param scheduler - * the {@code Scheduler} to wait for the {@code Future} on. Use a {@code Scheduler} such as - * {@link Schedulers#io()} that can block and wait on the {@code Future} - * @param - * the type of object that the {@code Future} returns, and also the type of item to be emitted by - * the resulting {@code Publisher} - * @return a {@code Flowable} that emits the item from the source {@code Future} - * @see ReactiveX operators documentation: From - * @see #fromCompletionStage(CompletionStage) - */ - @SuppressWarnings({ "unchecked" }) - @CheckReturnValue - @NonNull - @BackpressureSupport(BackpressureKind.FULL) - @SchedulerSupport(SchedulerSupport.CUSTOM) - public static <@NonNull T> Flowable fromFuture(Future future, long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) { - Objects.requireNonNull(scheduler, "scheduler is null"); - return fromFuture((Future)future, timeout, unit).subscribeOn(scheduler); - } - - /** - * Converts a {@link Future}, operating on a specified {@link Scheduler}, into a {@link Publisher}. - *

- * - *

- * You can convert any object that supports the {@code Future} interface into a {@code Publisher} that emits the - * return value of the {@link Future#get} method of that object by passing the object into the {@code from} - * method. - *

- * Unlike 1.x, canceling the {@code Flowable} won't cancel the future. If necessary, one can use composition to achieve the - * cancellation effect: {@code futurePublisher.doOnCancel(() -> future.cancel(true));}. - *

- *
Backpressure:
- *
The operator honors backpressure from downstream.
- *
Scheduler:
- *
You specify which {@code Scheduler} this operator will use.
- *
- * - * @param future - * the source {@code Future} - * @param scheduler - * the {@code Scheduler} to wait for the {@code Future} on. Use a {@code Scheduler} such as - * {@link Schedulers#io()} that can block and wait on the {@code Future} - * @param - * the type of object that the {@code Future} returns, and also the type of item to be emitted by - * the resulting {@code Publisher} - * @return a {@code Flowable} that emits the item from the source {@code Future} - * @see ReactiveX operators documentation: From - */ - @SuppressWarnings({ "unchecked" }) - @CheckReturnValue - @NonNull - @BackpressureSupport(BackpressureKind.FULL) - @SchedulerSupport(SchedulerSupport.CUSTOM) - public static <@NonNull T> Flowable fromFuture(Future future, @NonNull Scheduler scheduler) { - Objects.requireNonNull(scheduler, "scheduler is null"); - return fromFuture((Future)future).subscribeOn(scheduler); - } - /** * Converts an {@link Iterable} sequence into a {@link Publisher} that emits the items in the sequence. *

diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index 7a8d808072..c8db1606e1 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -1839,84 +1839,6 @@ public static Observable fromFuture(@NonNull Future future, return RxJavaPlugins.onAssembly(new ObservableFromFuture<>(future, timeout, unit)); } - /** - * Converts a {@link Future} into an ObservableSource, with a timeout on the Future. - *

- * - *

- * You can convert any object that supports the {@link Future} interface into an ObservableSource that emits the - * return value of the {@link Future#get} method of that object, by passing the object into the {@code from} - * method. - *

- * Unlike 1.x, disposing the Observable won't cancel the future. If necessary, one can use composition to achieve the - * cancellation effect: {@code futureObservableSource.doOnDispose(() -> future.cancel(true));}. - *

- * Important note: This ObservableSource is blocking; you cannot dispose it. - *

- *
Scheduler:
- *
{@code fromFuture} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param future - * the source {@link Future} - * @param timeout - * the maximum time to wait before calling {@code get} - * @param unit - * the {@link TimeUnit} of the {@code timeout} argument - * @param scheduler - * the {@link Scheduler} to wait for the Future on. Use a Scheduler such as - * {@link Schedulers#io()} that can block and wait on the Future - * @param - * the type of object that the {@link Future} returns, and also the type of item to be emitted by - * the resulting ObservableSource - * @return an Observable that emits the item from the source {@link Future} - * @see ReactiveX operators documentation: From - */ - @CheckReturnValue - @NonNull - @SchedulerSupport(SchedulerSupport.CUSTOM) - public static Observable fromFuture(@NonNull Future future, long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) { - Objects.requireNonNull(scheduler, "scheduler is null"); - Observable o = fromFuture(future, timeout, unit); - return o.subscribeOn(scheduler); - } - - /** - * Converts a {@link Future}, operating on a specified {@link Scheduler}, into an ObservableSource. - *

- * - *

- * You can convert any object that supports the {@link Future} interface into an ObservableSource that emits the - * return value of the {@link Future#get} method of that object, by passing the object into the {@code from} - * method. - *

- * Unlike 1.x, disposing the Observable won't cancel the future. If necessary, one can use composition to achieve the - * cancellation effect: {@code futureObservableSource.doOnDispose(() -> future.cancel(true));}. - *

- *
Scheduler:
- *
You specify which {@link Scheduler} this operator will use.
- *
- * - * @param future - * the source {@link Future} - * @param scheduler - * the {@link Scheduler} to wait for the Future on. Use a Scheduler such as - * {@link Schedulers#io()} that can block and wait on the Future - * @param - * the type of object that the {@link Future} returns, and also the type of item to be emitted by - * the resulting ObservableSource - * @return an Observable that emits the item from the source {@link Future} - * @see ReactiveX operators documentation: From - */ - @CheckReturnValue - @NonNull - @SchedulerSupport(SchedulerSupport.CUSTOM) - public static Observable fromFuture(@NonNull Future future, @NonNull Scheduler scheduler) { - Objects.requireNonNull(scheduler, "scheduler is null"); - Observable o = fromFuture(future); - return o.subscribeOn(scheduler); - } - /** * Converts an {@link Iterable} sequence into an ObservableSource that emits the items in the sequence. *

diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index ed676e44fb..8da48508cd 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -722,73 +722,6 @@ public static Single error(@NonNull Throwable exception) { return toSingle(Flowable.fromFuture(future, timeout, unit)); } - /** - * Converts a {@link Future} into a {@code Single}, with a timeout on the {@code Future}. - *

- * - *

- * You can convert any object that supports the {@code Future} interface into a {@code Single} that emits - * the return value of the {@link Future#get} method of that object, by passing the object into the - * {@code from} method. - *

- * Important note: This {@code Single} is blocking; you cannot dispose it. - *

- *
Scheduler:
- *
You specify the {@link Scheduler} where the blocking wait will happen.
- *
- * - * @param future - * the source {@code Future} - * @param timeout - * the maximum time to wait before calling {@code get} - * @param unit - * the {@link TimeUnit} of the {@code timeout} argument - * @param scheduler - * the {@code Scheduler} to use for the blocking wait - * @param - * the type of object that the {@code Future} returns, and also the type of item to be emitted by - * the resulting {@code Single} - * @return the new {@code Single} that emits the item from the source {@code Future} - * @see ReactiveX operators documentation: From - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.CUSTOM) - @NonNull - public static <@NonNull T> Single fromFuture(@NonNull Future future, long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) { - return toSingle(Flowable.fromFuture(future, timeout, unit, scheduler)); - } - - /** - * Converts a {@link Future}, operating on a specified {@link Scheduler}, into a {@code Single}. - *

- * - *

- * You can convert any object that supports the {@code Future} interface into a {@code Single} that emits - * the return value of the {@link Future#get} method of that object, by passing the object into the - * {@code from} method. - *

- *
Scheduler:
- *
You specify which {@code Scheduler} this operator will use.
- *
- * - * @param future - * the source {@code Future} - * @param scheduler - * the {@code Scheduler} to wait for the {@code Future} on. Use a {@code Scheduler} such as - * {@link Schedulers#io()} that can block and wait on the {@code Future} - * @param - * the type of object that the {@code Future} returns, and also the type of item to be emitted by - * the resulting {@code Single} - * @return the new {@code Single} that emits the item from the source {@code Future} - * @see ReactiveX operators documentation: From - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.CUSTOM) - @NonNull - public static <@NonNull T> Single fromFuture(@NonNull Future future, @NonNull Scheduler scheduler) { - return toSingle(Flowable.fromFuture(future, scheduler)); - } - /** * Wraps a specific {@link Publisher} into a {@code Single} and signals its single element or error. *

diff --git a/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java index fd9712b64d..d76b90de70 100644 --- a/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java @@ -252,11 +252,6 @@ public void fromFutureTimedUnitNull() { Flowable.fromFuture(new FutureTask<>(Functions.EMPTY_RUNNABLE, null), 1, null); } - @Test(expected = NullPointerException.class) - public void fromFutureTimedSchedulerNull() { - Flowable.fromFuture(new FutureTask<>(Functions.EMPTY_RUNNABLE, null), 1, TimeUnit.SECONDS, null); - } - @Test(expected = NullPointerException.class) public void fromFutureTimedReturnsNull() { FutureTask f = new FutureTask<>(Functions.EMPTY_RUNNABLE, null); @@ -264,11 +259,6 @@ public void fromFutureTimedReturnsNull() { Flowable.fromFuture(f, 1, TimeUnit.SECONDS).blockingLast(); } - @Test(expected = NullPointerException.class) - public void fromFutureSchedulerNull() { - Flowable.fromFuture(new FutureTask<>(Functions.EMPTY_RUNNABLE, null), null); - } - @Test(expected = NullPointerException.class) public void fromIterableNull() { Flowable.fromIterable(null); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableToFutureTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableToFutureTest.java index 524a7216e1..4241dc8be9 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableToFutureTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableToFutureTest.java @@ -62,7 +62,7 @@ public void successOperatesOnSuppliedScheduler() throws Exception { TestScheduler scheduler = new TestScheduler(); TestSubscriber ts = new TestSubscriber<>(subscriber); - Flowable.fromFuture(future, scheduler).subscribe(ts); + Flowable.fromFuture(future).subscribeOn(scheduler).subscribe(ts); verify(subscriber, never()).onNext(value); @@ -234,7 +234,7 @@ public void run() { TestSubscriber ts = TestSubscriber.create(); - Flowable.fromFuture(task, Schedulers.computation()).subscribe(ts); + Flowable.fromFuture(task).subscribeOn(Schedulers.computation()).subscribe(ts); task.run(); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromTest.java index 0bc7a09997..e3f6f91954 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromTest.java @@ -30,7 +30,8 @@ public class ObservableFromTest extends RxJavaTest { @Test public void fromFutureTimeout() throws Exception { Observable.fromFuture(Observable.never() - .toFuture(), 100, TimeUnit.MILLISECONDS, Schedulers.io()) + .toFuture(), 100, TimeUnit.MILLISECONDS) + .subscribeOn(Schedulers.io()) .test() .awaitDone(5, TimeUnit.SECONDS) .assertFailure(TimeoutException.class); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableToFutureTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableToFutureTest.java index 164d7f6fd6..2631dfac4e 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableToFutureTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableToFutureTest.java @@ -61,7 +61,7 @@ public void successOperatesOnSuppliedScheduler() throws Exception { TestScheduler scheduler = new TestScheduler(); TestObserver to = new TestObserver<>(o); - Observable.fromFuture(future, scheduler).subscribe(to); + Observable.fromFuture(future).subscribeOn(scheduler).subscribe(to); verify(o, never()).onNext(value); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFromTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFromTest.java index 46847bedd3..5c7bdfd5d2 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFromTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFromTest.java @@ -24,7 +24,8 @@ public class SingleFromTest extends RxJavaTest { @Test public void fromFuture() throws Exception { - Single.fromFuture(Flowable.just(1).toFuture(), Schedulers.io()) + Single.fromFuture(Flowable.just(1).toFuture()) + .subscribeOn(Schedulers.io()) .test() .awaitDone(5, TimeUnit.SECONDS) .assertResult(1); @@ -32,7 +33,8 @@ public void fromFuture() throws Exception { @Test public void fromFutureTimeout() throws Exception { - Single.fromFuture(Flowable.never().toFuture(), 1, TimeUnit.SECONDS, Schedulers.io()) + Single.fromFuture(Flowable.never().toFuture(), 1, TimeUnit.SECONDS) + .subscribeOn(Schedulers.io()) .test() .awaitDone(5, TimeUnit.SECONDS) .assertFailure(TimeoutException.class); diff --git a/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java b/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java index 0fa7c22711..d4bcd85cc9 100644 --- a/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java +++ b/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java @@ -299,11 +299,6 @@ public void fromFutureTimedUnitNull() { Observable.fromFuture(new FutureTask<>(Functions.EMPTY_RUNNABLE, null), 1, null); } - @Test(expected = NullPointerException.class) - public void fromFutureTimedSchedulerNull() { - Observable.fromFuture(new FutureTask<>(Functions.EMPTY_RUNNABLE, null), 1, TimeUnit.SECONDS, null); - } - @Test(expected = NullPointerException.class) public void fromFutureTimedReturnsNull() { FutureTask f = new FutureTask<>(Functions.EMPTY_RUNNABLE, null); @@ -311,12 +306,6 @@ public void fromFutureTimedReturnsNull() { Observable.fromFuture(f, 1, TimeUnit.SECONDS).blockingLast(); } - @Test(expected = NullPointerException.class) - public void fromFutureSchedulerNull() { - FutureTask f = new FutureTask<>(Functions.EMPTY_RUNNABLE, null); - Observable.fromFuture(f, null); - } - @Test(expected = NullPointerException.class) public void fromIterableNull() { Observable.fromIterable(null); diff --git a/src/test/java/io/reactivex/rxjava3/single/SingleNullTests.java b/src/test/java/io/reactivex/rxjava3/single/SingleNullTests.java index 6ffdb2ebf8..712d93f1aa 100644 --- a/src/test/java/io/reactivex/rxjava3/single/SingleNullTests.java +++ b/src/test/java/io/reactivex/rxjava3/single/SingleNullTests.java @@ -194,16 +194,6 @@ public Object call() throws Exception { }), 1, null); } - @Test(expected = NullPointerException.class) - public void fromFutureTimedSchedulerNull() { - Single.fromFuture(new FutureTask<>(new Callable() { - @Override - public Object call() throws Exception { - return null; - } - }), 1, TimeUnit.SECONDS, null); - } - @Test(expected = NullPointerException.class) public void fromFutureTimedReturnsNull() { FutureTask f = new FutureTask<>(Functions.EMPTY_RUNNABLE, null); @@ -211,16 +201,6 @@ public void fromFutureTimedReturnsNull() { Single.fromFuture(f, 1, TimeUnit.SECONDS).blockingGet(); } - @Test(expected = NullPointerException.class) - public void fromFutureSchedulerNull() { - Single.fromFuture(new FutureTask<>(new Callable() { - @Override - public Object call() throws Exception { - return null; - } - }), null); - } - @Test(expected = NullPointerException.class) public void fromPublisherNull() { Single.fromPublisher(null); diff --git a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java index d8c3e32636..85a6c4b0bd 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java @@ -125,7 +125,6 @@ public void checkParallelFlowable() { // negative timeout is allowed addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "fromFuture", Future.class, Long.TYPE, TimeUnit.class)); - addOverride(new ParamOverride(Flowable.class, 1, ParamMode.ANY, "fromFuture", Future.class, Long.TYPE, TimeUnit.class, Scheduler.class)); // null default is allowed addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "blockingLast", Object.class)); @@ -325,7 +324,6 @@ public void checkParallelFlowable() { // negative timeout is allowed addOverride(new ParamOverride(Single.class, 1, ParamMode.ANY, "fromFuture", Future.class, Long.TYPE, TimeUnit.class)); - addOverride(new ParamOverride(Single.class, 1, ParamMode.ANY, "fromFuture", Future.class, Long.TYPE, TimeUnit.class, Scheduler.class)); // negative time is considered as zero time addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class)); @@ -381,7 +379,6 @@ public void checkParallelFlowable() { // negative timeout is allowed addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "fromFuture", Future.class, Long.TYPE, TimeUnit.class)); - addOverride(new ParamOverride(Observable.class, 1, ParamMode.ANY, "fromFuture", Future.class, Long.TYPE, TimeUnit.class, Scheduler.class)); // null default is allowed addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "blockingLast", Object.class));