From f51146933ebf7f06eeaff55457dac81a13506962 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 19 Jun 2019 11:23:02 +0200 Subject: [PATCH] 3.x: Various small API changes and removals --- .../java/io/reactivex/BlockingGetPerf.java | 4 +- src/main/java/io/reactivex/Completable.java | 46 ------------ src/main/java/io/reactivex/Flowable.java | 50 ++----------- src/main/java/io/reactivex/Maybe.java | 33 ++------- src/main/java/io/reactivex/Observable.java | 43 ++--------- src/main/java/io/reactivex/Single.java | 23 ------ .../observers/BlockingMultiObserver.java | 43 ----------- .../completable/CompletableTest.java | 33 --------- .../reactivex/flowable/FlowableNullTests.java | 5 -- .../observers/BlockingMultiObserverTest.java | 71 ------------------- .../observers/LambdaObserverTest.java | 4 +- .../completable/CompletableAwaitTest.java | 25 ------- .../flowable/FlowableIgnoreElementsTest.java | 13 ++-- .../ObservableIgnoreElementsTest.java | 9 ++- .../operators/single/SingleMiscTest.java | 14 ---- .../subscribers/LambdaSubscriberTest.java | 4 +- .../java/io/reactivex/maybe/MaybeTest.java | 9 --- .../observable/ObservableNullTests.java | 11 --- .../schedulers/FailOnBlockingTest.java | 2 +- .../ParamValidationCheckerTest.java | 3 - 20 files changed, 30 insertions(+), 415 deletions(-) diff --git a/src/jmh/java/io/reactivex/BlockingGetPerf.java b/src/jmh/java/io/reactivex/BlockingGetPerf.java index e1f8204513..e9b2fb1bbe 100644 --- a/src/jmh/java/io/reactivex/BlockingGetPerf.java +++ b/src/jmh/java/io/reactivex/BlockingGetPerf.java @@ -78,7 +78,7 @@ public Object maybe() { } @Benchmark - public Object completable() { - return completable.blockingGet(); + public void completable() { + completable.blockingAwait(); } } diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 65c9e26918..fdeaaccf91 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -1232,52 +1232,6 @@ public final boolean blockingAwait(long timeout, TimeUnit unit) { return observer.blockingAwait(timeout, unit); } - /** - * Subscribes to this Completable instance and blocks until it terminates, then returns null or - * the emitted exception if any. - *

- * - *

- *
Scheduler:
- *
{@code blockingGet} does not operate by default on a particular {@link Scheduler}.
- *
- * @return the throwable if this terminated with an error, null otherwise - * @throws RuntimeException that wraps an InterruptedException if the wait is interrupted - */ - @Nullable - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - public final Throwable blockingGet() { - BlockingMultiObserver observer = new BlockingMultiObserver(); - subscribe(observer); - return observer.blockingGetError(); - } - - /** - * Subscribes to this Completable instance and blocks until it terminates or the specified timeout - * elapses, then returns null for normal termination or the emitted exception if any. - *

- * - *

- *
Scheduler:
- *
{@code blockingGet} does not operate by default on a particular {@link Scheduler}.
- *
- * @param timeout the timeout value - * @param unit the time unit - * @return the throwable if this terminated with an error, null otherwise - * @throws RuntimeException that wraps an InterruptedException if the wait is interrupted or - * TimeoutException if the specified timeout elapsed before it - */ - @Nullable - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - public final Throwable blockingGet(long timeout, TimeUnit unit) { - ObjectHelper.requireNonNull(unit, "unit is null"); - BlockingMultiObserver observer = new BlockingMultiObserver(); - subscribe(observer); - return observer.blockingGetError(timeout, unit); - } - /** * Subscribes to this Completable only once, when the first CompletableObserver * subscribes to the result Completable, caches its terminal event diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index b6b0a21628..4ae2322d09 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -14686,8 +14686,7 @@ public final Flowable startWithArray(T... items) { @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe() { - return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, - Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE); + return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION); } /** @@ -14716,8 +14715,7 @@ public final Disposable subscribe() { @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer onNext) { - return subscribe(onNext, Functions.ON_ERROR_MISSING, - Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE); + return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION); } /** @@ -14747,7 +14745,7 @@ public final Disposable subscribe(Consumer onNext) { @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer onNext, Consumer onError) { - return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE); + return subscribe(onNext, onError, Functions.EMPTY_ACTION); } /** @@ -14782,51 +14780,11 @@ public final Disposable subscribe(Consumer onNext, Consumer onNext, Consumer onError, Action onComplete) { - return subscribe(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE); - } - - /** - * Subscribes to a Publisher and provides callbacks to handle the items it emits and any error or - * completion notification it issues. - *
- *
Backpressure:
- *
The operator consumes the source {@code Publisher} in an unbounded manner (i.e., no - * backpressure is applied to it).
- *
Scheduler:
- *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param onNext - * the {@code Consumer} you have designed to accept emissions from the Publisher - * @param onError - * the {@code Consumer} you have designed to accept any error notification from the - * Publisher - * @param onComplete - * the {@code Action} you have designed to accept a completion notification from the - * Publisher - * @param onSubscribe - * the {@code Consumer} that receives the upstream's Subscription - * @return a {@link Disposable} reference with which the caller can stop receiving items before - * the Publisher has finished sending them - * @throws NullPointerException - * if {@code onNext} is null, or - * if {@code onError} is null, or - * if {@code onComplete} is null, or - * if {@code onSubscribe} is null - * @see ReactiveX operators documentation: Subscribe - */ - @CheckReturnValue - @NonNull - @BackpressureSupport(BackpressureKind.SPECIAL) - @SchedulerSupport(SchedulerSupport.NONE) - public final Disposable subscribe(Consumer onNext, Consumer onError, - Action onComplete, Consumer onSubscribe) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); - ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); - LambdaSubscriber ls = new LambdaSubscriber(onNext, onError, onComplete, onSubscribe); + LambdaSubscriber ls = new LambdaSubscriber(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE); subscribe(ls); diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 49017334c1..f5c3085259 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -2489,13 +2489,9 @@ public final Single count() { } /** - * Returns a Maybe that emits the item emitted by the source Maybe or a specified default item + * Returns a Single that emits the item emitted by the source Maybe or a specified default item * if the source Maybe is empty. *

- * Note that the result Maybe is semantically equivalent to a {@code Single}, since it's guaranteed - * to emit exactly one item or an error. See {@link #toSingle(Object)} for a method with equivalent - * behavior which returns a {@code Single}. - *

* *

*
Scheduler:
@@ -2504,16 +2500,16 @@ public final Single count() { * * @param defaultItem * the item to emit if the source Maybe emits no items - * @return a Maybe that emits either the specified default item if the source Maybe emits no - * items, or the items emitted by the source Maybe + * @return a Single that emits either the specified default item if the source Maybe emits no + * item, or the item emitted by the source Maybe * @see ReactiveX operators documentation: DefaultIfEmpty */ @CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.NONE) - public final Maybe defaultIfEmpty(T defaultItem) { + public final Single defaultIfEmpty(T defaultItem) { ObjectHelper.requireNonNull(defaultItem, "defaultItem is null"); - return switchIfEmpty(just(defaultItem)); + return RxJavaPlugins.onAssembly(new MaybeToSingle(this, defaultItem)); } /** @@ -3619,25 +3615,6 @@ public final Observable toObservable() { return RxJavaPlugins.onAssembly(new MaybeToObservable(this)); } - /** - * Converts this Maybe into a Single instance composing disposal - * through and turning an empty Maybe into a Single that emits the given - * value through onSuccess. - *
- *
Scheduler:
- *
{@code toSingle} does not operate by default on a particular {@link Scheduler}.
- *
- * @param defaultValue the default item to signal in Single if this Maybe is empty - * @return the new Single instance - */ - @CheckReturnValue - @NonNull - @SchedulerSupport(SchedulerSupport.NONE) - public final Single toSingle(T defaultValue) { - ObjectHelper.requireNonNull(defaultValue, "defaultValue is null"); - return RxJavaPlugins.onAssembly(new MaybeToSingle(this, defaultValue)); - } - /** * Converts this Maybe into a Single instance composing disposal * through and turning an empty Maybe into a signal of NoSuchElementException. diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 9a92fb7f65..fa24976de9 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -12106,7 +12106,7 @@ public final Observable startWithArray(T... items) { */ @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe() { - return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); + return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION); } /** @@ -12131,7 +12131,7 @@ public final Disposable subscribe() { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer onNext) { - return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); + return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION); } /** @@ -12157,7 +12157,7 @@ public final Disposable subscribe(Consumer onNext) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer onNext, Consumer onError) { - return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer()); + return subscribe(onNext, onError, Functions.EMPTY_ACTION); } /** @@ -12188,46 +12188,11 @@ public final Disposable subscribe(Consumer onNext, Consumer onNext, Consumer onError, Action onComplete) { - return subscribe(onNext, onError, onComplete, Functions.emptyConsumer()); - } - - /** - * Subscribes to an ObservableSource and provides callbacks to handle the items it emits and any error or - * completion notification it issues. - *
- *
Scheduler:
- *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param onNext - * the {@code Consumer} you have designed to accept emissions from the ObservableSource - * @param onError - * the {@code Consumer} you have designed to accept any error notification from the - * ObservableSource - * @param onComplete - * the {@code Action} you have designed to accept a completion notification from the - * ObservableSource - * @param onSubscribe - * the {@code Consumer} that receives the upstream's Disposable - * @return a {@link Disposable} reference with which the caller can stop receiving items before - * the ObservableSource has finished sending them - * @throws NullPointerException - * if {@code onNext} is null, or - * if {@code onError} is null, or - * if {@code onComplete} is null, or - * if {@code onSubscribe} is null - * @see ReactiveX operators documentation: Subscribe - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - public final Disposable subscribe(Consumer onNext, Consumer onError, - Action onComplete, Consumer onSubscribe) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); - ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); - LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe); + LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, Functions.emptyConsumer()); subscribe(ls); diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index bc2227829f..9de155b256 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -3850,29 +3850,6 @@ public final R to(@NonNull SingleConverter converter) { return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } - /** - * Returns a {@link Completable} that discards result of the {@link Single} - * and calls {@code onComplete} when this source {@link Single} calls - * {@code onSuccess}. Error terminal event is propagated. - *

- * - *

- *
Scheduler:
- *
{@code toCompletable} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @return a {@link Completable} that calls {@code onComplete} on it's subscriber when the source {@link Single} - * calls {@code onSuccess}. - * @since 2.0 - * @deprecated see {@link #ignoreElement()} instead, will be removed in 3.0 - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - @Deprecated - public final Completable toCompletable() { - return RxJavaPlugins.onAssembly(new CompletableFromSingle(this)); - } - /** * Returns a {@link Completable} that ignores the success value of this {@link Single} * and calls {@code onComplete} instead on the returned {@code Completable}. diff --git a/src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java b/src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java index 2b5f5603e0..a0dcb7bb9d 100644 --- a/src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java +++ b/src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java @@ -19,8 +19,6 @@ import io.reactivex.disposables.Disposable; import io.reactivex.internal.util.*; -import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage; - /** * A combined Observer that awaits the success or error signal via a CountDownLatch. * @param the value type @@ -119,47 +117,6 @@ public T blockingGet(T defaultValue) { return v != null ? v : defaultValue; } - /** - * Block until the latch is counted down and return the error received or null if no - * error happened. - * @return the error received or null - */ - public Throwable blockingGetError() { - if (getCount() != 0) { - try { - BlockingHelper.verifyNonBlocking(); - await(); - } catch (InterruptedException ex) { - dispose(); - return ex; - } - } - return error; - } - - /** - * Block until the latch is counted down and return the error received or - * when the wait is interrupted or times out, null otherwise. - * @param timeout the timeout value - * @param unit the time unit - * @return the error received or null - */ - public Throwable blockingGetError(long timeout, TimeUnit unit) { - if (getCount() != 0) { - try { - BlockingHelper.verifyNonBlocking(); - if (!await(timeout, unit)) { - dispose(); - throw ExceptionHelper.wrapOrThrow(new TimeoutException(timeoutMessage(timeout, unit))); - } - } catch (InterruptedException ex) { - dispose(); - throw ExceptionHelper.wrapOrThrow(ex); - } - } - return error; - } - /** * Block until the observer terminates and return true; return false if * the wait times out. diff --git a/src/test/java/io/reactivex/completable/CompletableTest.java b/src/test/java/io/reactivex/completable/CompletableTest.java index ef76bf9812..0898c6a362 100644 --- a/src/test/java/io/reactivex/completable/CompletableTest.java +++ b/src/test/java/io/reactivex/completable/CompletableTest.java @@ -1959,32 +1959,6 @@ public void run() { Assert.assertEquals(1, calls.get()); } - @Test(timeout = 5000) - public void getNormal() { - Assert.assertNull(normal.completable.blockingGet()); - } - - @Test(timeout = 5000) - public void getError() { - Assert.assertTrue(error.completable.blockingGet() instanceof TestException); - } - - @Test(timeout = 5000) - public void getTimeout() { - try { - Completable.never().blockingGet(100, TimeUnit.MILLISECONDS); - } catch (RuntimeException ex) { - if (!(ex.getCause() instanceof TimeoutException)) { - Assert.fail("Wrong exception cause: " + ex.getCause()); - } - } - } - - @Test(expected = NullPointerException.class) - public void getNullUnit() { - normal.completable.blockingGet(1, null); - } - @Test(expected = NullPointerException.class) public void liftNull() { normal.completable.lift(null); @@ -2762,13 +2736,6 @@ public void subscribe(CompletableObserver observer) { Assert.assertTrue(name.get().startsWith("RxComputation")); } - @Test(timeout = 5000) - public void timeoutEmitError() { - Throwable e = Completable.never().timeout(100, TimeUnit.MILLISECONDS).blockingGet(); - - Assert.assertTrue(e instanceof TimeoutException); - } - @Test(timeout = 5000) public void timeoutSwitchNormal() { Completable c = Completable.never().timeout(100, TimeUnit.MILLISECONDS, normal.completable); diff --git a/src/test/java/io/reactivex/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/flowable/FlowableNullTests.java index 159086048e..0a2c8c7a95 100644 --- a/src/test/java/io/reactivex/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableNullTests.java @@ -2155,11 +2155,6 @@ public void subscribeOnCompleteNull() { just1.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer(), null); } - @Test(expected = NullPointerException.class) - public void subscribeOnSubscribeNull() { - just1.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.EMPTY_ACTION, null); - } - @Test(expected = NullPointerException.class) public void subscribeNull() { just1.subscribe((Subscriber)null); diff --git a/src/test/java/io/reactivex/internal/observers/BlockingMultiObserverTest.java b/src/test/java/io/reactivex/internal/observers/BlockingMultiObserverTest.java index 793253504b..2e486fe67a 100644 --- a/src/test/java/io/reactivex/internal/observers/BlockingMultiObserverTest.java +++ b/src/test/java/io/reactivex/internal/observers/BlockingMultiObserverTest.java @@ -13,16 +13,13 @@ package io.reactivex.internal.observers; -import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage; import static org.junit.Assert.*; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.junit.Test; import io.reactivex.disposables.*; -import io.reactivex.exceptions.TestException; import io.reactivex.schedulers.Schedulers; public class BlockingMultiObserverTest { @@ -79,72 +76,4 @@ public void blockingGetDefaultInterrupt() { Thread.interrupted(); } } - - @Test - public void blockingGetErrorInterrupt() { - final BlockingMultiObserver bmo = new BlockingMultiObserver(); - - Thread.currentThread().interrupt(); - try { - assertTrue(bmo.blockingGetError() instanceof InterruptedException); - } finally { - Thread.interrupted(); - } - } - - @Test - public void blockingGetErrorTimeoutInterrupt() { - final BlockingMultiObserver bmo = new BlockingMultiObserver(); - - Thread.currentThread().interrupt(); - try { - bmo.blockingGetError(1, TimeUnit.MINUTES); - fail("Should have thrown"); - } catch (RuntimeException ex) { - assertTrue(ex.getCause() instanceof InterruptedException); - } finally { - Thread.interrupted(); - } - } - - @Test - public void blockingGetErrorDelayed() { - final BlockingMultiObserver bmo = new BlockingMultiObserver(); - - Schedulers.single().scheduleDirect(new Runnable() { - @Override - public void run() { - bmo.onError(new TestException()); - } - }, 100, TimeUnit.MILLISECONDS); - - assertTrue(bmo.blockingGetError() instanceof TestException); - } - - @Test - public void blockingGetErrorTimeoutDelayed() { - final BlockingMultiObserver bmo = new BlockingMultiObserver(); - - Schedulers.single().scheduleDirect(new Runnable() { - @Override - public void run() { - bmo.onError(new TestException()); - } - }, 100, TimeUnit.MILLISECONDS); - - assertTrue(bmo.blockingGetError(1, TimeUnit.MINUTES) instanceof TestException); - } - - @Test - public void blockingGetErrorTimedOut() { - final BlockingMultiObserver bmo = new BlockingMultiObserver(); - - try { - assertNull(bmo.blockingGetError(1, TimeUnit.NANOSECONDS)); - fail("Should have thrown"); - } catch (RuntimeException expected) { - assertEquals(TimeoutException.class, expected.getCause().getClass()); - assertEquals(timeoutMessage(1, TimeUnit.NANOSECONDS), expected.getCause().getMessage()); - } - } } diff --git a/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java b/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java index 81221cfc21..26c6e30bea 100644 --- a/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java +++ b/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java @@ -334,7 +334,7 @@ public void onSubscribeThrowsCancelsUpstream() { final List errors = new ArrayList(); - ps.subscribe(new Consumer() { + ps.subscribe(new LambdaObserver(new Consumer() { @Override public void accept(Integer v) throws Exception { } @@ -352,7 +352,7 @@ public void run() throws Exception { public void accept(Disposable d) throws Exception { throw new TestException(); } - }); + })); assertFalse("Has observers?!", ps.hasObservers()); assertFalse("No errors?!", errors.isEmpty()); diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableAwaitTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableAwaitTest.java index fb73306973..1a80afbebd 100644 --- a/src/test/java/io/reactivex/internal/operators/completable/CompletableAwaitTest.java +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableAwaitTest.java @@ -19,8 +19,6 @@ import org.junit.Test; -import io.reactivex.*; -import io.reactivex.exceptions.TestException; import io.reactivex.processors.PublishProcessor; public class CompletableAwaitTest { @@ -61,27 +59,4 @@ public void awaitTimeoutInterrupted() { public void awaitTimeout() { assertFalse(PublishProcessor.create().ignoreElements().blockingAwait(100, TimeUnit.MILLISECONDS)); } - - @Test - public void blockingGet() { - assertNull(Completable.complete().blockingGet()); - } - - @Test - public void blockingGetTimeout() { - assertNull(Completable.complete().blockingGet(1, TimeUnit.SECONDS)); - } - - @Test - public void blockingGetError() { - TestException ex = new TestException(); - assertSame(ex, Completable.error(ex).blockingGet()); - } - - @Test - public void blockingGetErrorTimeout() { - TestException ex = new TestException(); - assertSame(ex, Completable.error(ex).blockingGet(1, TimeUnit.SECONDS)); - } - } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElementsTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElementsTest.java index 4d2915fac5..6272ab2757 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElementsTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElementsTest.java @@ -144,21 +144,21 @@ public void onNext(Integer t) { assertEquals(0, count.get()); } - @Test + @Test(timeout = 5000) public void testWithEmpty() { - assertNull(Flowable.empty().ignoreElements().blockingGet()); + Flowable.empty().ignoreElements().blockingAwait(); } - @Test + @Test(timeout = 5000) public void testWithNonEmpty() { - assertNull(Flowable.just(1, 2, 3).ignoreElements().blockingGet()); + Flowable.just(1, 2, 3).ignoreElements().blockingAwait(); } @Test public void testUpstreamIsProcessedButIgnored() { final int num = 10; final AtomicInteger upstreamCount = new AtomicInteger(); - Object count = Flowable.range(1, num) + Flowable.range(1, num) .doOnNext(new Consumer() { @Override public void accept(Integer t) { @@ -166,9 +166,8 @@ public void accept(Integer t) { } }) .ignoreElements() - .blockingGet(); + .blockingAwait(); assertEquals(num, upstreamCount.get()); - assertNull(count); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableIgnoreElementsTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableIgnoreElementsTest.java index 7e75e12a8d..b930349d18 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableIgnoreElementsTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableIgnoreElementsTest.java @@ -97,19 +97,19 @@ public void run() { @Test public void testWithEmpty() { - assertNull(Observable.empty().ignoreElements().blockingGet()); + Observable.empty().ignoreElements().blockingAwait(); } @Test public void testWithNonEmpty() { - assertNull(Observable.just(1, 2, 3).ignoreElements().blockingGet()); + Observable.just(1, 2, 3).ignoreElements().blockingAwait(); } @Test public void testUpstreamIsProcessedButIgnored() { final int num = 10; final AtomicInteger upstreamCount = new AtomicInteger(); - Object count = Observable.range(1, num) + Observable.range(1, num) .doOnNext(new Consumer() { @Override public void accept(Integer t) { @@ -117,9 +117,8 @@ public void accept(Integer t) { } }) .ignoreElements() - .blockingGet(); + .blockingAwait(); assertEquals(num, upstreamCount.get()); - assertNull(count); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleMiscTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleMiscTest.java index 7b3a891680..290d581364 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleMiscTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleMiscTest.java @@ -259,20 +259,6 @@ public void timeoutOther() throws Exception { .assertResult(1); } - @Test - @SuppressWarnings("deprecation") - public void toCompletable() { - Single.just(1) - .toCompletable() - .test() - .assertResult(); - - Single.error(new TestException()) - .toCompletable() - .test() - .assertFailure(TestException.class); - } - @Test public void ignoreElement() { Single.just(1) diff --git a/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java b/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java index d61d1a4dfe..f8090a27a9 100644 --- a/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java +++ b/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java @@ -324,7 +324,7 @@ public void onSubscribeThrowsCancelsUpstream() { final List errors = new ArrayList(); - pp.subscribe(new Consumer() { + pp.subscribe(new LambdaSubscriber(new Consumer() { @Override public void accept(Integer v) throws Exception { } @@ -342,7 +342,7 @@ public void run() throws Exception { public void accept(Subscription s) throws Exception { throw new TestException(); } - }); + })); assertFalse("Has observers?!", pp.hasSubscribers()); assertFalse("No errors?!", errors.isEmpty()); diff --git a/src/test/java/io/reactivex/maybe/MaybeTest.java b/src/test/java/io/reactivex/maybe/MaybeTest.java index fbeba59f91..be114d4495 100644 --- a/src/test/java/io/reactivex/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/maybe/MaybeTest.java @@ -2761,15 +2761,6 @@ public void blockingGet() { } } - @Test - public void toSingleDefault() { - Maybe.just(1).toSingle(100) - .test().assertResult(1); - - Maybe.empty().toSingle(100) - .test().assertResult(100); - } - @Test public void flatMapContinuation() { Maybe.just(1).flatMapCompletable(new Function() { diff --git a/src/test/java/io/reactivex/observable/ObservableNullTests.java b/src/test/java/io/reactivex/observable/ObservableNullTests.java index 4cd96d7932..71719bdbab 100644 --- a/src/test/java/io/reactivex/observable/ObservableNullTests.java +++ b/src/test/java/io/reactivex/observable/ObservableNullTests.java @@ -2202,17 +2202,6 @@ public void accept(Throwable e) { } }, null); } - @Test(expected = NullPointerException.class) - public void subscribeOnSubscribeNull() { - just1.subscribe(new Consumer() { - @Override - public void accept(Integer e) { } - }, new Consumer() { - @Override - public void accept(Throwable e) { } - }, Functions.EMPTY_ACTION, null); - } - @Test(expected = NullPointerException.class) public void subscribeNull() { just1.subscribe((Observer)null); diff --git a/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java b/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java index aebcf6c307..a9465d4916 100644 --- a/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java +++ b/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java @@ -581,7 +581,7 @@ public void failSingleCompletableBlockingGet() { .doOnComplete(new Action() { @Override public void run() throws Exception { - Completable.complete().delay(10, TimeUnit.SECONDS).blockingGet(); + Completable.complete().delay(10, TimeUnit.SECONDS).blockingAwait(); } }) .test() diff --git a/src/test/java/io/reactivex/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/validators/ParamValidationCheckerTest.java index 3cc1605346..cb75c07366 100644 --- a/src/test/java/io/reactivex/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/validators/ParamValidationCheckerTest.java @@ -272,9 +272,6 @@ public void checkParallelFlowable() { addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE)); addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class)); - // negative time is considered as zero time - addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "blockingGet", Long.TYPE, TimeUnit.class)); - // negative time is considered as zero time addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "blockingAwait", Long.TYPE, TimeUnit.class));