Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Remove fromFuture(..., Scheduler) overloads #6814

Merged
merged 1 commit into from
Jan 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 0 additions & 89 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2089,95 +2089,6 @@ public static <T> Flowable<T> error(@NonNull Throwable throwable) {
return RxJavaPlugins.onAssembly(new FlowableFromFuture<>(future, timeout, unit));
}

/**
* Converts a {@link Future} into a {@link Publisher}, with a timeout on the {@code Future}.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/from.Future.png" alt="">
* <p>
* You can convert any object that supports the {@code Future} interface into a {@code Publisher} that emits the
* return value of the {@link Future#get} method of that object by passing the object into the {@code from}
* method.
* <p>
* Unlike 1.x, canceling the {@code Flowable} won't cancel the future. If necessary, one can use composition to achieve the
* cancellation effect: {@code futurePublisher.doOnCancel(() -> future.cancel(true));}.
* <p>
* <em>Important note:</em> This {@code Publisher} is blocking; you cannot cancel it.
* <p>
* Also note that this operator will consume a {@link CompletionStage}-based {@code Future} subclass (such as
* {@link CompletableFuture}) in a blocking manner as well. Use the {@link #fromCompletionStage(CompletionStage)}
* operator to convert and consume such sources in a non-blocking fashion instead.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromFuture} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param future
* the source {@code Future}
* @param timeout
* the maximum time to wait before calling {@code get}
* @param unit
* the {@link TimeUnit} of the {@code timeout} argument
* @param scheduler
* the {@code Scheduler} to wait for the {@code Future} on. Use a {@code Scheduler} such as
* {@link Schedulers#io()} that can block and wait on the {@code Future}
* @param <T>
* the type of object that the {@code Future} returns, and also the type of item to be emitted by
* the resulting {@code Publisher}
* @return a {@code Flowable} that emits the item from the source {@code Future}
* @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
* @see #fromCompletionStage(CompletionStage)
*/
@SuppressWarnings({ "unchecked" })
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static <@NonNull T> Flowable<T> fromFuture(Future<? extends T> future, long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return fromFuture((Future<T>)future, timeout, unit).subscribeOn(scheduler);
}

/**
* Converts a {@link Future}, operating on a specified {@link Scheduler}, into a {@link Publisher}.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/from.Future.s.png" alt="">
* <p>
* You can convert any object that supports the {@code Future} interface into a {@code Publisher} that emits the
* return value of the {@link Future#get} method of that object by passing the object into the {@code from}
* method.
* <p>
* Unlike 1.x, canceling the {@code Flowable} won't cancel the future. If necessary, one can use composition to achieve the
* cancellation effect: {@code futurePublisher.doOnCancel(() -> future.cancel(true));}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param future
* the source {@code Future}
* @param scheduler
* the {@code Scheduler} to wait for the {@code Future} on. Use a {@code Scheduler} such as
* {@link Schedulers#io()} that can block and wait on the {@code Future}
* @param <T>
* the type of object that the {@code Future} returns, and also the type of item to be emitted by
* the resulting {@code Publisher}
* @return a {@code Flowable} that emits the item from the source {@code Future}
* @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
*/
@SuppressWarnings({ "unchecked" })
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static <@NonNull T> Flowable<T> fromFuture(Future<? extends T> future, @NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return fromFuture((Future<T>)future).subscribeOn(scheduler);
}

/**
* Converts an {@link Iterable} sequence into a {@link Publisher} that emits the items in the sequence.
* <p>
Expand Down
78 changes: 0 additions & 78 deletions src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1839,84 +1839,6 @@ public static <T> Observable<T> fromFuture(@NonNull Future<? extends T> future,
return RxJavaPlugins.onAssembly(new ObservableFromFuture<>(future, timeout, unit));
}

/**
* Converts a {@link Future} into an ObservableSource, with a timeout on the Future.
* <p>
* <img width="640" height="287" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromFuture.timeout.scheduler.png" alt="">
* <p>
* You can convert any object that supports the {@link Future} interface into an ObservableSource that emits the
* return value of the {@link Future#get} method of that object, by passing the object into the {@code from}
* method.
* <p>
* Unlike 1.x, disposing the Observable won't cancel the future. If necessary, one can use composition to achieve the
* cancellation effect: {@code futureObservableSource.doOnDispose(() -> future.cancel(true));}.
* <p>
* <em>Important note:</em> This ObservableSource is blocking; you cannot dispose it.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromFuture} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param future
* the source {@link Future}
* @param timeout
* the maximum time to wait before calling {@code get}
* @param unit
* the {@link TimeUnit} of the {@code timeout} argument
* @param scheduler
* the {@link Scheduler} to wait for the Future on. Use a Scheduler such as
* {@link Schedulers#io()} that can block and wait on the Future
* @param <T>
* the type of object that the {@link Future} returns, and also the type of item to be emitted by
* the resulting ObservableSource
* @return an Observable that emits the item from the source {@link Future}
* @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static <T> Observable<T> fromFuture(@NonNull Future<? extends T> future, long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
Observable<T> o = fromFuture(future, timeout, unit);
return o.subscribeOn(scheduler);
}

/**
* Converts a {@link Future}, operating on a specified {@link Scheduler}, into an ObservableSource.
* <p>
* <img width="640" height="294" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/fromFuture.scheduler.png" alt="">
* <p>
* You can convert any object that supports the {@link Future} interface into an ObservableSource that emits the
* return value of the {@link Future#get} method of that object, by passing the object into the {@code from}
* method.
* <p>
* Unlike 1.x, disposing the Observable won't cancel the future. If necessary, one can use composition to achieve the
* cancellation effect: {@code futureObservableSource.doOnDispose(() -> future.cancel(true));}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
*
* @param future
* the source {@link Future}
* @param scheduler
* the {@link Scheduler} to wait for the Future on. Use a Scheduler such as
* {@link Schedulers#io()} that can block and wait on the Future
* @param <T>
* the type of object that the {@link Future} returns, and also the type of item to be emitted by
* the resulting ObservableSource
* @return an Observable that emits the item from the source {@link Future}
* @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static <T> Observable<T> fromFuture(@NonNull Future<? extends T> future, @NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
Observable<T> o = fromFuture(future);
return o.subscribeOn(scheduler);
}

/**
* Converts an {@link Iterable} sequence into an ObservableSource that emits the items in the sequence.
* <p>
Expand Down
67 changes: 0 additions & 67 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -722,73 +722,6 @@ public static <T> Single<T> error(@NonNull Throwable exception) {
return toSingle(Flowable.fromFuture(future, timeout, unit));
}

/**
* Converts a {@link Future} into a {@code Single}, with a timeout on the {@code Future}.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.from.Future.png" alt="">
* <p>
* You can convert any object that supports the {@code Future} interface into a {@code Single} that emits
* the return value of the {@link Future#get} method of that object, by passing the object into the
* {@code from} method.
* <p>
* <em>Important note:</em> This {@code Single} is blocking; you cannot dispose it.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify the {@link Scheduler} where the blocking wait will happen.</dd>
* </dl>
*
* @param future
* the source {@code Future}
* @param timeout
* the maximum time to wait before calling {@code get}
* @param unit
* the {@link TimeUnit} of the {@code timeout} argument
* @param scheduler
* the {@code Scheduler} to use for the blocking wait
* @param <T>
* the type of object that the {@code Future} returns, and also the type of item to be emitted by
* the resulting {@code Single}
* @return the new {@code Single} that emits the item from the source {@code Future}
* @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public static <@NonNull T> Single<T> fromFuture(@NonNull Future<? extends T> future, long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
return toSingle(Flowable.fromFuture(future, timeout, unit, scheduler));
}

/**
* Converts a {@link Future}, operating on a specified {@link Scheduler}, into a {@code Single}.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.from.Future.s.png" alt="">
* <p>
* You can convert any object that supports the {@code Future} interface into a {@code Single} that emits
* the return value of the {@link Future#get} method of that object, by passing the object into the
* {@code from} method.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
*
* @param future
* the source {@code Future}
* @param scheduler
* the {@code Scheduler} to wait for the {@code Future} on. Use a {@code Scheduler} such as
* {@link Schedulers#io()} that can block and wait on the {@code Future}
* @param <T>
* the type of object that the {@code Future} returns, and also the type of item to be emitted by
* the resulting {@code Single}
* @return the new {@code Single} that emits the item from the source {@code Future}
* @see <a href="http://reactivex.io/documentation/operators/from.html">ReactiveX operators documentation: From</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public static <@NonNull T> Single<T> fromFuture(@NonNull Future<? extends T> future, @NonNull Scheduler scheduler) {
return toSingle(Flowable.fromFuture(future, scheduler));
}

/**
* Wraps a specific {@link Publisher} into a {@code Single} and signals its single element or error.
* <p>
Expand Down
10 changes: 0 additions & 10 deletions src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,23 +252,13 @@ public void fromFutureTimedUnitNull() {
Flowable.fromFuture(new FutureTask<>(Functions.EMPTY_RUNNABLE, null), 1, null);
}

@Test(expected = NullPointerException.class)
public void fromFutureTimedSchedulerNull() {
Flowable.fromFuture(new FutureTask<>(Functions.EMPTY_RUNNABLE, null), 1, TimeUnit.SECONDS, null);
}

@Test(expected = NullPointerException.class)
public void fromFutureTimedReturnsNull() {
FutureTask<Object> f = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);
f.run();
Flowable.fromFuture(f, 1, TimeUnit.SECONDS).blockingLast();
}

@Test(expected = NullPointerException.class)
public void fromFutureSchedulerNull() {
Flowable.fromFuture(new FutureTask<>(Functions.EMPTY_RUNNABLE, null), null);
}

@Test(expected = NullPointerException.class)
public void fromIterableNull() {
Flowable.fromIterable(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void successOperatesOnSuppliedScheduler() throws Exception {
TestScheduler scheduler = new TestScheduler();
TestSubscriber<Object> ts = new TestSubscriber<>(subscriber);

Flowable.fromFuture(future, scheduler).subscribe(ts);
Flowable.fromFuture(future).subscribeOn(scheduler).subscribe(ts);

verify(subscriber, never()).onNext(value);

Expand Down Expand Up @@ -234,7 +234,7 @@ public void run() {

TestSubscriber<Integer> ts = TestSubscriber.create();

Flowable.fromFuture(task, Schedulers.computation()).subscribe(ts);
Flowable.fromFuture(task).subscribeOn(Schedulers.computation()).subscribe(ts);

task.run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public class ObservableFromTest extends RxJavaTest {
@Test
public void fromFutureTimeout() throws Exception {
Observable.fromFuture(Observable.never()
.toFuture(), 100, TimeUnit.MILLISECONDS, Schedulers.io())
.toFuture(), 100, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TimeoutException.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void successOperatesOnSuppliedScheduler() throws Exception {
TestScheduler scheduler = new TestScheduler();
TestObserver<Object> to = new TestObserver<>(o);

Observable.fromFuture(future, scheduler).subscribe(to);
Observable.fromFuture(future).subscribeOn(scheduler).subscribe(to);

verify(o, never()).onNext(value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ public class SingleFromTest extends RxJavaTest {

@Test
public void fromFuture() throws Exception {
Single.fromFuture(Flowable.just(1).toFuture(), Schedulers.io())
Single.fromFuture(Flowable.just(1).toFuture())
.subscribeOn(Schedulers.io())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
}

@Test
public void fromFutureTimeout() throws Exception {
Single.fromFuture(Flowable.never().toFuture(), 1, TimeUnit.SECONDS, Schedulers.io())
Single.fromFuture(Flowable.never().toFuture(), 1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TimeoutException.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,24 +299,13 @@ public void fromFutureTimedUnitNull() {
Observable.fromFuture(new FutureTask<>(Functions.EMPTY_RUNNABLE, null), 1, null);
}

@Test(expected = NullPointerException.class)
public void fromFutureTimedSchedulerNull() {
Observable.fromFuture(new FutureTask<>(Functions.EMPTY_RUNNABLE, null), 1, TimeUnit.SECONDS, null);
}

@Test(expected = NullPointerException.class)
public void fromFutureTimedReturnsNull() {
FutureTask<Object> f = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);
f.run();
Observable.fromFuture(f, 1, TimeUnit.SECONDS).blockingLast();
}

@Test(expected = NullPointerException.class)
public void fromFutureSchedulerNull() {
FutureTask<Object> f = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);
Observable.fromFuture(f, null);
}

@Test(expected = NullPointerException.class)
public void fromIterableNull() {
Observable.fromIterable(null);
Expand Down
20 changes: 0 additions & 20 deletions src/test/java/io/reactivex/rxjava3/single/SingleNullTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,33 +194,13 @@ public Object call() throws Exception {
}), 1, null);
}

@Test(expected = NullPointerException.class)
public void fromFutureTimedSchedulerNull() {
Single.fromFuture(new FutureTask<>(new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
}), 1, TimeUnit.SECONDS, null);
}

@Test(expected = NullPointerException.class)
public void fromFutureTimedReturnsNull() {
FutureTask<Object> f = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);
f.run();
Single.fromFuture(f, 1, TimeUnit.SECONDS).blockingGet();
}

@Test(expected = NullPointerException.class)
public void fromFutureSchedulerNull() {
Single.fromFuture(new FutureTask<>(new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
}), null);
}

@Test(expected = NullPointerException.class)
public void fromPublisherNull() {
Single.fromPublisher(null);
Expand Down
Loading