diff --git a/src/jmh/java/io/reactivex/BlockingGetPerf.java b/src/jmh/java/io/reactivex/BlockingGetPerf.java
index e1f8204513..e9b2fb1bbe 100644
--- a/src/jmh/java/io/reactivex/BlockingGetPerf.java
+++ b/src/jmh/java/io/reactivex/BlockingGetPerf.java
@@ -78,7 +78,7 @@ public Object maybe() {
}
@Benchmark
- public Object completable() {
- return completable.blockingGet();
+ public void completable() {
+ completable.blockingAwait();
}
}
diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java
index 65c9e26918..fdeaaccf91 100644
--- a/src/main/java/io/reactivex/Completable.java
+++ b/src/main/java/io/reactivex/Completable.java
@@ -1232,52 +1232,6 @@ public final boolean blockingAwait(long timeout, TimeUnit unit) {
return observer.blockingAwait(timeout, unit);
}
- /**
- * Subscribes to this Completable instance and blocks until it terminates, then returns null or
- * the emitted exception if any.
- *
- *
- *
- * - Scheduler:
- * - {@code blockingGet} does not operate by default on a particular {@link Scheduler}.
- *
- * @return the throwable if this terminated with an error, null otherwise
- * @throws RuntimeException that wraps an InterruptedException if the wait is interrupted
- */
- @Nullable
- @CheckReturnValue
- @SchedulerSupport(SchedulerSupport.NONE)
- public final Throwable blockingGet() {
- BlockingMultiObserver observer = new BlockingMultiObserver();
- subscribe(observer);
- return observer.blockingGetError();
- }
-
- /**
- * Subscribes to this Completable instance and blocks until it terminates or the specified timeout
- * elapses, then returns null for normal termination or the emitted exception if any.
- *
- *
- *
- * - Scheduler:
- * - {@code blockingGet} does not operate by default on a particular {@link Scheduler}.
- *
- * @param timeout the timeout value
- * @param unit the time unit
- * @return the throwable if this terminated with an error, null otherwise
- * @throws RuntimeException that wraps an InterruptedException if the wait is interrupted or
- * TimeoutException if the specified timeout elapsed before it
- */
- @Nullable
- @CheckReturnValue
- @SchedulerSupport(SchedulerSupport.NONE)
- public final Throwable blockingGet(long timeout, TimeUnit unit) {
- ObjectHelper.requireNonNull(unit, "unit is null");
- BlockingMultiObserver observer = new BlockingMultiObserver();
- subscribe(observer);
- return observer.blockingGetError(timeout, unit);
- }
-
/**
* Subscribes to this Completable only once, when the first CompletableObserver
* subscribes to the result Completable, caches its terminal event
diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java
index b6b0a21628..4ae2322d09 100644
--- a/src/main/java/io/reactivex/Flowable.java
+++ b/src/main/java/io/reactivex/Flowable.java
@@ -14686,8 +14686,7 @@ public final Flowable startWithArray(T... items) {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
- return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING,
- Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
+ return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
/**
@@ -14716,8 +14715,7 @@ public final Disposable subscribe() {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer super T> onNext) {
- return subscribe(onNext, Functions.ON_ERROR_MISSING,
- Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
+ return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
/**
@@ -14747,7 +14745,7 @@ public final Disposable subscribe(Consumer super T> onNext) {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError) {
- return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
+ return subscribe(onNext, onError, Functions.EMPTY_ACTION);
}
/**
@@ -14782,51 +14780,11 @@ public final Disposable subscribe(Consumer super T> onNext, Consumer super T
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError,
Action onComplete) {
- return subscribe(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
- }
-
- /**
- * Subscribes to a Publisher and provides callbacks to handle the items it emits and any error or
- * completion notification it issues.
- *
- * - Backpressure:
- * - The operator consumes the source {@code Publisher} in an unbounded manner (i.e., no
- * backpressure is applied to it).
- * - Scheduler:
- * - {@code subscribe} does not operate by default on a particular {@link Scheduler}.
- *
- *
- * @param onNext
- * the {@code Consumer} you have designed to accept emissions from the Publisher
- * @param onError
- * the {@code Consumer} you have designed to accept any error notification from the
- * Publisher
- * @param onComplete
- * the {@code Action} you have designed to accept a completion notification from the
- * Publisher
- * @param onSubscribe
- * the {@code Consumer} that receives the upstream's Subscription
- * @return a {@link Disposable} reference with which the caller can stop receiving items before
- * the Publisher has finished sending them
- * @throws NullPointerException
- * if {@code onNext} is null, or
- * if {@code onError} is null, or
- * if {@code onComplete} is null, or
- * if {@code onSubscribe} is null
- * @see ReactiveX operators documentation: Subscribe
- */
- @CheckReturnValue
- @NonNull
- @BackpressureSupport(BackpressureKind.SPECIAL)
- @SchedulerSupport(SchedulerSupport.NONE)
- public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError,
- Action onComplete, Consumer super Subscription> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
- ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
- LambdaSubscriber ls = new LambdaSubscriber(onNext, onError, onComplete, onSubscribe);
+ LambdaSubscriber ls = new LambdaSubscriber(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
subscribe(ls);
diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java
index 49017334c1..f5c3085259 100644
--- a/src/main/java/io/reactivex/Maybe.java
+++ b/src/main/java/io/reactivex/Maybe.java
@@ -2489,13 +2489,9 @@ public final Single count() {
}
/**
- * Returns a Maybe that emits the item emitted by the source Maybe or a specified default item
+ * Returns a Single that emits the item emitted by the source Maybe or a specified default item
* if the source Maybe is empty.
*
- * Note that the result Maybe is semantically equivalent to a {@code Single}, since it's guaranteed
- * to emit exactly one item or an error. See {@link #toSingle(Object)} for a method with equivalent
- * behavior which returns a {@code Single}.
- *
*
*
* - Scheduler:
@@ -2504,16 +2500,16 @@ public final Single count() {
*
* @param defaultItem
* the item to emit if the source Maybe emits no items
- * @return a Maybe that emits either the specified default item if the source Maybe emits no
- * items, or the items emitted by the source Maybe
+ * @return a Single that emits either the specified default item if the source Maybe emits no
+ * item, or the item emitted by the source Maybe
* @see ReactiveX operators documentation: DefaultIfEmpty
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
- public final Maybe defaultIfEmpty(T defaultItem) {
+ public final Single defaultIfEmpty(T defaultItem) {
ObjectHelper.requireNonNull(defaultItem, "defaultItem is null");
- return switchIfEmpty(just(defaultItem));
+ return RxJavaPlugins.onAssembly(new MaybeToSingle(this, defaultItem));
}
/**
@@ -3619,25 +3615,6 @@ public final Observable toObservable() {
return RxJavaPlugins.onAssembly(new MaybeToObservable(this));
}
- /**
- * Converts this Maybe into a Single instance composing disposal
- * through and turning an empty Maybe into a Single that emits the given
- * value through onSuccess.
- *
- * - Scheduler:
- * - {@code toSingle} does not operate by default on a particular {@link Scheduler}.
- *
- * @param defaultValue the default item to signal in Single if this Maybe is empty
- * @return the new Single instance
- */
- @CheckReturnValue
- @NonNull
- @SchedulerSupport(SchedulerSupport.NONE)
- public final Single toSingle(T defaultValue) {
- ObjectHelper.requireNonNull(defaultValue, "defaultValue is null");
- return RxJavaPlugins.onAssembly(new MaybeToSingle(this, defaultValue));
- }
-
/**
* Converts this Maybe into a Single instance composing disposal
* through and turning an empty Maybe into a signal of NoSuchElementException.
diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java
index 9a92fb7f65..fa24976de9 100644
--- a/src/main/java/io/reactivex/Observable.java
+++ b/src/main/java/io/reactivex/Observable.java
@@ -12106,7 +12106,7 @@ public final Observable startWithArray(T... items) {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
- return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
+ return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
/**
@@ -12131,7 +12131,7 @@ public final Disposable subscribe() {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer super T> onNext) {
- return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
+ return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
/**
@@ -12157,7 +12157,7 @@ public final Disposable subscribe(Consumer super T> onNext) {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError) {
- return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
+ return subscribe(onNext, onError, Functions.EMPTY_ACTION);
}
/**
@@ -12188,46 +12188,11 @@ public final Disposable subscribe(Consumer super T> onNext, Consumer super T
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError,
Action onComplete) {
- return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
- }
-
- /**
- * Subscribes to an ObservableSource and provides callbacks to handle the items it emits and any error or
- * completion notification it issues.
- *
- * - Scheduler:
- * - {@code subscribe} does not operate by default on a particular {@link Scheduler}.
- *
- *
- * @param onNext
- * the {@code Consumer} you have designed to accept emissions from the ObservableSource
- * @param onError
- * the {@code Consumer} you have designed to accept any error notification from the
- * ObservableSource
- * @param onComplete
- * the {@code Action} you have designed to accept a completion notification from the
- * ObservableSource
- * @param onSubscribe
- * the {@code Consumer} that receives the upstream's Disposable
- * @return a {@link Disposable} reference with which the caller can stop receiving items before
- * the ObservableSource has finished sending them
- * @throws NullPointerException
- * if {@code onNext} is null, or
- * if {@code onError} is null, or
- * if {@code onComplete} is null, or
- * if {@code onSubscribe} is null
- * @see ReactiveX operators documentation: Subscribe
- */
- @CheckReturnValue
- @SchedulerSupport(SchedulerSupport.NONE)
- public final Disposable subscribe(Consumer super T> onNext, Consumer super Throwable> onError,
- Action onComplete, Consumer super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
- ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
- LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe);
+ LambdaObserver ls = new LambdaObserver(onNext, onError, onComplete, Functions.emptyConsumer());
subscribe(ls);
diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java
index bc2227829f..9de155b256 100644
--- a/src/main/java/io/reactivex/Single.java
+++ b/src/main/java/io/reactivex/Single.java
@@ -3850,29 +3850,6 @@ public final R to(@NonNull SingleConverter converter) {
return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
}
- /**
- * Returns a {@link Completable} that discards result of the {@link Single}
- * and calls {@code onComplete} when this source {@link Single} calls
- * {@code onSuccess}. Error terminal event is propagated.
- *
- *
- *
- * - Scheduler:
- * - {@code toCompletable} does not operate by default on a particular {@link Scheduler}.
- *
- *
- * @return a {@link Completable} that calls {@code onComplete} on it's subscriber when the source {@link Single}
- * calls {@code onSuccess}.
- * @since 2.0
- * @deprecated see {@link #ignoreElement()} instead, will be removed in 3.0
- */
- @CheckReturnValue
- @SchedulerSupport(SchedulerSupport.NONE)
- @Deprecated
- public final Completable toCompletable() {
- return RxJavaPlugins.onAssembly(new CompletableFromSingle(this));
- }
-
/**
* Returns a {@link Completable} that ignores the success value of this {@link Single}
* and calls {@code onComplete} instead on the returned {@code Completable}.
diff --git a/src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java b/src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java
index 2b5f5603e0..a0dcb7bb9d 100644
--- a/src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java
+++ b/src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java
@@ -19,8 +19,6 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.util.*;
-import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
-
/**
* A combined Observer that awaits the success or error signal via a CountDownLatch.
* @param the value type
@@ -119,47 +117,6 @@ public T blockingGet(T defaultValue) {
return v != null ? v : defaultValue;
}
- /**
- * Block until the latch is counted down and return the error received or null if no
- * error happened.
- * @return the error received or null
- */
- public Throwable blockingGetError() {
- if (getCount() != 0) {
- try {
- BlockingHelper.verifyNonBlocking();
- await();
- } catch (InterruptedException ex) {
- dispose();
- return ex;
- }
- }
- return error;
- }
-
- /**
- * Block until the latch is counted down and return the error received or
- * when the wait is interrupted or times out, null otherwise.
- * @param timeout the timeout value
- * @param unit the time unit
- * @return the error received or null
- */
- public Throwable blockingGetError(long timeout, TimeUnit unit) {
- if (getCount() != 0) {
- try {
- BlockingHelper.verifyNonBlocking();
- if (!await(timeout, unit)) {
- dispose();
- throw ExceptionHelper.wrapOrThrow(new TimeoutException(timeoutMessage(timeout, unit)));
- }
- } catch (InterruptedException ex) {
- dispose();
- throw ExceptionHelper.wrapOrThrow(ex);
- }
- }
- return error;
- }
-
/**
* Block until the observer terminates and return true; return false if
* the wait times out.
diff --git a/src/test/java/io/reactivex/completable/CompletableTest.java b/src/test/java/io/reactivex/completable/CompletableTest.java
index ef76bf9812..0898c6a362 100644
--- a/src/test/java/io/reactivex/completable/CompletableTest.java
+++ b/src/test/java/io/reactivex/completable/CompletableTest.java
@@ -1959,32 +1959,6 @@ public void run() {
Assert.assertEquals(1, calls.get());
}
- @Test(timeout = 5000)
- public void getNormal() {
- Assert.assertNull(normal.completable.blockingGet());
- }
-
- @Test(timeout = 5000)
- public void getError() {
- Assert.assertTrue(error.completable.blockingGet() instanceof TestException);
- }
-
- @Test(timeout = 5000)
- public void getTimeout() {
- try {
- Completable.never().blockingGet(100, TimeUnit.MILLISECONDS);
- } catch (RuntimeException ex) {
- if (!(ex.getCause() instanceof TimeoutException)) {
- Assert.fail("Wrong exception cause: " + ex.getCause());
- }
- }
- }
-
- @Test(expected = NullPointerException.class)
- public void getNullUnit() {
- normal.completable.blockingGet(1, null);
- }
-
@Test(expected = NullPointerException.class)
public void liftNull() {
normal.completable.lift(null);
@@ -2762,13 +2736,6 @@ public void subscribe(CompletableObserver observer) {
Assert.assertTrue(name.get().startsWith("RxComputation"));
}
- @Test(timeout = 5000)
- public void timeoutEmitError() {
- Throwable e = Completable.never().timeout(100, TimeUnit.MILLISECONDS).blockingGet();
-
- Assert.assertTrue(e instanceof TimeoutException);
- }
-
@Test(timeout = 5000)
public void timeoutSwitchNormal() {
Completable c = Completable.never().timeout(100, TimeUnit.MILLISECONDS, normal.completable);
diff --git a/src/test/java/io/reactivex/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/flowable/FlowableNullTests.java
index 159086048e..0a2c8c7a95 100644
--- a/src/test/java/io/reactivex/flowable/FlowableNullTests.java
+++ b/src/test/java/io/reactivex/flowable/FlowableNullTests.java
@@ -2155,11 +2155,6 @@ public void subscribeOnCompleteNull() {
just1.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer(), null);
}
- @Test(expected = NullPointerException.class)
- public void subscribeOnSubscribeNull() {
- just1.subscribe(Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.EMPTY_ACTION, null);
- }
-
@Test(expected = NullPointerException.class)
public void subscribeNull() {
just1.subscribe((Subscriber)null);
diff --git a/src/test/java/io/reactivex/internal/observers/BlockingMultiObserverTest.java b/src/test/java/io/reactivex/internal/observers/BlockingMultiObserverTest.java
index 793253504b..2e486fe67a 100644
--- a/src/test/java/io/reactivex/internal/observers/BlockingMultiObserverTest.java
+++ b/src/test/java/io/reactivex/internal/observers/BlockingMultiObserverTest.java
@@ -13,16 +13,13 @@
package io.reactivex.internal.observers;
-import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
import static org.junit.Assert.*;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.junit.Test;
import io.reactivex.disposables.*;
-import io.reactivex.exceptions.TestException;
import io.reactivex.schedulers.Schedulers;
public class BlockingMultiObserverTest {
@@ -79,72 +76,4 @@ public void blockingGetDefaultInterrupt() {
Thread.interrupted();
}
}
-
- @Test
- public void blockingGetErrorInterrupt() {
- final BlockingMultiObserver bmo = new BlockingMultiObserver();
-
- Thread.currentThread().interrupt();
- try {
- assertTrue(bmo.blockingGetError() instanceof InterruptedException);
- } finally {
- Thread.interrupted();
- }
- }
-
- @Test
- public void blockingGetErrorTimeoutInterrupt() {
- final BlockingMultiObserver bmo = new BlockingMultiObserver();
-
- Thread.currentThread().interrupt();
- try {
- bmo.blockingGetError(1, TimeUnit.MINUTES);
- fail("Should have thrown");
- } catch (RuntimeException ex) {
- assertTrue(ex.getCause() instanceof InterruptedException);
- } finally {
- Thread.interrupted();
- }
- }
-
- @Test
- public void blockingGetErrorDelayed() {
- final BlockingMultiObserver bmo = new BlockingMultiObserver();
-
- Schedulers.single().scheduleDirect(new Runnable() {
- @Override
- public void run() {
- bmo.onError(new TestException());
- }
- }, 100, TimeUnit.MILLISECONDS);
-
- assertTrue(bmo.blockingGetError() instanceof TestException);
- }
-
- @Test
- public void blockingGetErrorTimeoutDelayed() {
- final BlockingMultiObserver bmo = new BlockingMultiObserver();
-
- Schedulers.single().scheduleDirect(new Runnable() {
- @Override
- public void run() {
- bmo.onError(new TestException());
- }
- }, 100, TimeUnit.MILLISECONDS);
-
- assertTrue(bmo.blockingGetError(1, TimeUnit.MINUTES) instanceof TestException);
- }
-
- @Test
- public void blockingGetErrorTimedOut() {
- final BlockingMultiObserver bmo = new BlockingMultiObserver();
-
- try {
- assertNull(bmo.blockingGetError(1, TimeUnit.NANOSECONDS));
- fail("Should have thrown");
- } catch (RuntimeException expected) {
- assertEquals(TimeoutException.class, expected.getCause().getClass());
- assertEquals(timeoutMessage(1, TimeUnit.NANOSECONDS), expected.getCause().getMessage());
- }
- }
}
diff --git a/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java b/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java
index 81221cfc21..26c6e30bea 100644
--- a/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java
+++ b/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java
@@ -334,7 +334,7 @@ public void onSubscribeThrowsCancelsUpstream() {
final List errors = new ArrayList();
- ps.subscribe(new Consumer() {
+ ps.subscribe(new LambdaObserver(new Consumer() {
@Override
public void accept(Integer v) throws Exception {
}
@@ -352,7 +352,7 @@ public void run() throws Exception {
public void accept(Disposable d) throws Exception {
throw new TestException();
}
- });
+ }));
assertFalse("Has observers?!", ps.hasObservers());
assertFalse("No errors?!", errors.isEmpty());
diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableAwaitTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableAwaitTest.java
index fb73306973..1a80afbebd 100644
--- a/src/test/java/io/reactivex/internal/operators/completable/CompletableAwaitTest.java
+++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableAwaitTest.java
@@ -19,8 +19,6 @@
import org.junit.Test;
-import io.reactivex.*;
-import io.reactivex.exceptions.TestException;
import io.reactivex.processors.PublishProcessor;
public class CompletableAwaitTest {
@@ -61,27 +59,4 @@ public void awaitTimeoutInterrupted() {
public void awaitTimeout() {
assertFalse(PublishProcessor.create().ignoreElements().blockingAwait(100, TimeUnit.MILLISECONDS));
}
-
- @Test
- public void blockingGet() {
- assertNull(Completable.complete().blockingGet());
- }
-
- @Test
- public void blockingGetTimeout() {
- assertNull(Completable.complete().blockingGet(1, TimeUnit.SECONDS));
- }
-
- @Test
- public void blockingGetError() {
- TestException ex = new TestException();
- assertSame(ex, Completable.error(ex).blockingGet());
- }
-
- @Test
- public void blockingGetErrorTimeout() {
- TestException ex = new TestException();
- assertSame(ex, Completable.error(ex).blockingGet(1, TimeUnit.SECONDS));
- }
-
}
diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElementsTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElementsTest.java
index 4d2915fac5..6272ab2757 100644
--- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElementsTest.java
+++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElementsTest.java
@@ -144,21 +144,21 @@ public void onNext(Integer t) {
assertEquals(0, count.get());
}
- @Test
+ @Test(timeout = 5000)
public void testWithEmpty() {
- assertNull(Flowable.empty().ignoreElements().blockingGet());
+ Flowable.empty().ignoreElements().blockingAwait();
}
- @Test
+ @Test(timeout = 5000)
public void testWithNonEmpty() {
- assertNull(Flowable.just(1, 2, 3).ignoreElements().blockingGet());
+ Flowable.just(1, 2, 3).ignoreElements().blockingAwait();
}
@Test
public void testUpstreamIsProcessedButIgnored() {
final int num = 10;
final AtomicInteger upstreamCount = new AtomicInteger();
- Object count = Flowable.range(1, num)
+ Flowable.range(1, num)
.doOnNext(new Consumer() {
@Override
public void accept(Integer t) {
@@ -166,9 +166,8 @@ public void accept(Integer t) {
}
})
.ignoreElements()
- .blockingGet();
+ .blockingAwait();
assertEquals(num, upstreamCount.get());
- assertNull(count);
}
@Test
diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableIgnoreElementsTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableIgnoreElementsTest.java
index 7e75e12a8d..b930349d18 100644
--- a/src/test/java/io/reactivex/internal/operators/observable/ObservableIgnoreElementsTest.java
+++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableIgnoreElementsTest.java
@@ -97,19 +97,19 @@ public void run() {
@Test
public void testWithEmpty() {
- assertNull(Observable.empty().ignoreElements().blockingGet());
+ Observable.empty().ignoreElements().blockingAwait();
}
@Test
public void testWithNonEmpty() {
- assertNull(Observable.just(1, 2, 3).ignoreElements().blockingGet());
+ Observable.just(1, 2, 3).ignoreElements().blockingAwait();
}
@Test
public void testUpstreamIsProcessedButIgnored() {
final int num = 10;
final AtomicInteger upstreamCount = new AtomicInteger();
- Object count = Observable.range(1, num)
+ Observable.range(1, num)
.doOnNext(new Consumer() {
@Override
public void accept(Integer t) {
@@ -117,9 +117,8 @@ public void accept(Integer t) {
}
})
.ignoreElements()
- .blockingGet();
+ .blockingAwait();
assertEquals(num, upstreamCount.get());
- assertNull(count);
}
@Test
diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleMiscTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleMiscTest.java
index 7b3a891680..290d581364 100644
--- a/src/test/java/io/reactivex/internal/operators/single/SingleMiscTest.java
+++ b/src/test/java/io/reactivex/internal/operators/single/SingleMiscTest.java
@@ -259,20 +259,6 @@ public void timeoutOther() throws Exception {
.assertResult(1);
}
- @Test
- @SuppressWarnings("deprecation")
- public void toCompletable() {
- Single.just(1)
- .toCompletable()
- .test()
- .assertResult();
-
- Single.error(new TestException())
- .toCompletable()
- .test()
- .assertFailure(TestException.class);
- }
-
@Test
public void ignoreElement() {
Single.just(1)
diff --git a/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java b/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java
index d61d1a4dfe..f8090a27a9 100644
--- a/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java
+++ b/src/test/java/io/reactivex/internal/subscribers/LambdaSubscriberTest.java
@@ -324,7 +324,7 @@ public void onSubscribeThrowsCancelsUpstream() {
final List errors = new ArrayList();
- pp.subscribe(new Consumer() {
+ pp.subscribe(new LambdaSubscriber(new Consumer() {
@Override
public void accept(Integer v) throws Exception {
}
@@ -342,7 +342,7 @@ public void run() throws Exception {
public void accept(Subscription s) throws Exception {
throw new TestException();
}
- });
+ }));
assertFalse("Has observers?!", pp.hasSubscribers());
assertFalse("No errors?!", errors.isEmpty());
diff --git a/src/test/java/io/reactivex/maybe/MaybeTest.java b/src/test/java/io/reactivex/maybe/MaybeTest.java
index fbeba59f91..be114d4495 100644
--- a/src/test/java/io/reactivex/maybe/MaybeTest.java
+++ b/src/test/java/io/reactivex/maybe/MaybeTest.java
@@ -2761,15 +2761,6 @@ public void blockingGet() {
}
}
- @Test
- public void toSingleDefault() {
- Maybe.just(1).toSingle(100)
- .test().assertResult(1);
-
- Maybe.empty().toSingle(100)
- .test().assertResult(100);
- }
-
@Test
public void flatMapContinuation() {
Maybe.just(1).flatMapCompletable(new Function() {
diff --git a/src/test/java/io/reactivex/observable/ObservableNullTests.java b/src/test/java/io/reactivex/observable/ObservableNullTests.java
index 4cd96d7932..71719bdbab 100644
--- a/src/test/java/io/reactivex/observable/ObservableNullTests.java
+++ b/src/test/java/io/reactivex/observable/ObservableNullTests.java
@@ -2202,17 +2202,6 @@ public void accept(Throwable e) { }
}, null);
}
- @Test(expected = NullPointerException.class)
- public void subscribeOnSubscribeNull() {
- just1.subscribe(new Consumer() {
- @Override
- public void accept(Integer e) { }
- }, new Consumer() {
- @Override
- public void accept(Throwable e) { }
- }, Functions.EMPTY_ACTION, null);
- }
-
@Test(expected = NullPointerException.class)
public void subscribeNull() {
just1.subscribe((Observer)null);
diff --git a/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java b/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java
index aebcf6c307..a9465d4916 100644
--- a/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java
+++ b/src/test/java/io/reactivex/schedulers/FailOnBlockingTest.java
@@ -581,7 +581,7 @@ public void failSingleCompletableBlockingGet() {
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
- Completable.complete().delay(10, TimeUnit.SECONDS).blockingGet();
+ Completable.complete().delay(10, TimeUnit.SECONDS).blockingAwait();
}
})
.test()
diff --git a/src/test/java/io/reactivex/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/validators/ParamValidationCheckerTest.java
index 3cc1605346..cb75c07366 100644
--- a/src/test/java/io/reactivex/validators/ParamValidationCheckerTest.java
+++ b/src/test/java/io/reactivex/validators/ParamValidationCheckerTest.java
@@ -272,9 +272,6 @@ public void checkParallelFlowable() {
addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE));
addOverride(new ParamOverride(Completable.class, 0, ParamMode.NON_NEGATIVE, "retry", Long.TYPE, Predicate.class));
- // negative time is considered as zero time
- addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "blockingGet", Long.TYPE, TimeUnit.class));
-
// negative time is considered as zero time
addOverride(new ParamOverride(Completable.class, 0, ParamMode.ANY, "blockingAwait", Long.TYPE, TimeUnit.class));