Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Various small API changes and removals #6517

Merged
merged 1 commit into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/jmh/java/io/reactivex/BlockingGetPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public Object maybe() {
}

@Benchmark
public Object completable() {
return completable.blockingGet();
public void completable() {
completable.blockingAwait();
}
}
46 changes: 0 additions & 46 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1232,52 +1232,6 @@ public final boolean blockingAwait(long timeout, TimeUnit unit) {
return observer.blockingAwait(timeout, unit);
}

/**
* Subscribes to this Completable instance and blocks until it terminates, then returns null or
* the emitted exception if any.
* <p>
* <img width="640" height="435" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingGet.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the throwable if this terminated with an error, null otherwise
* @throws RuntimeException that wraps an InterruptedException if the wait is interrupted
*/
@Nullable
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Throwable blockingGet() {
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<Void>();
subscribe(observer);
return observer.blockingGetError();
}

/**
* Subscribes to this Completable instance and blocks until it terminates or the specified timeout
* elapses, then returns null for normal termination or the emitted exception if any.
* <p>
* <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingGet.t.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param timeout the timeout value
* @param unit the time unit
* @return the throwable if this terminated with an error, null otherwise
* @throws RuntimeException that wraps an InterruptedException if the wait is interrupted or
* TimeoutException if the specified timeout elapsed before it
*/
@Nullable
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Throwable blockingGet(long timeout, TimeUnit unit) {
ObjectHelper.requireNonNull(unit, "unit is null");
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<Void>();
subscribe(observer);
return observer.blockingGetError(timeout, unit);
}

/**
* Subscribes to this Completable only once, when the first CompletableObserver
* subscribes to the result Completable, caches its terminal event
Expand Down
50 changes: 4 additions & 46 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -14686,8 +14686,7 @@ public final Flowable<T> startWithArray(T... items) {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand Down Expand Up @@ -14716,8 +14715,7 @@ public final Disposable subscribe() {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand Down Expand Up @@ -14747,7 +14745,7 @@ public final Disposable subscribe(Consumer<? super T> onNext) {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
return subscribe(onNext, onError, Functions.EMPTY_ACTION);
}

/**
Expand Down Expand Up @@ -14782,51 +14780,11 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete) {
return subscribe(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
}

/**
* Subscribes to a Publisher and provides callbacks to handle the items it emits and any error or
* completion notification it issues.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner (i.e., no
* backpressure is applied to it).</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onNext
* the {@code Consumer<T>} you have designed to accept emissions from the Publisher
* @param onError
* the {@code Consumer<Throwable>} you have designed to accept any error notification from the
* Publisher
* @param onComplete
* the {@code Action} you have designed to accept a completion notification from the
* Publisher
* @param onSubscribe
* the {@code Consumer} that receives the upstream's Subscription
* @return a {@link Disposable} reference with which the caller can stop receiving items before
* the Publisher has finished sending them
* @throws NullPointerException
* if {@code onNext} is null, or
* if {@code onError} is null, or
* if {@code onComplete} is null, or
* if {@code onSubscribe} is null
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Subscription> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);

subscribe(ls);

Expand Down
33 changes: 5 additions & 28 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -2489,13 +2489,9 @@ public final Single<Long> count() {
}

/**
* Returns a Maybe that emits the item emitted by the source Maybe or a specified default item
* Returns a Single that emits the item emitted by the source Maybe or a specified default item
* if the source Maybe is empty.
* <p>
* Note that the result Maybe is semantically equivalent to a {@code Single}, since it's guaranteed
* to emit exactly one item or an error. See {@link #toSingle(Object)} for a method with equivalent
* behavior which returns a {@code Single}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/defaultIfEmpty.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
Expand All @@ -2504,16 +2500,16 @@ public final Single<Long> count() {
*
* @param defaultItem
* the item to emit if the source Maybe emits no items
* @return a Maybe that emits either the specified default item if the source Maybe emits no
* items, or the items emitted by the source Maybe
* @return a Single that emits either the specified default item if the source Maybe emits no
* item, or the item emitted by the source Maybe
* @see <a href="http://reactivex.io/documentation/operators/defaultifempty.html">ReactiveX operators documentation: DefaultIfEmpty</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Maybe<T> defaultIfEmpty(T defaultItem) {
public final Single<T> defaultIfEmpty(T defaultItem) {
ObjectHelper.requireNonNull(defaultItem, "defaultItem is null");
return switchIfEmpty(just(defaultItem));
return RxJavaPlugins.onAssembly(new MaybeToSingle<T>(this, defaultItem));
}

/**
Expand Down Expand Up @@ -3619,25 +3615,6 @@ public final Observable<T> toObservable() {
return RxJavaPlugins.onAssembly(new MaybeToObservable<T>(this));
}

/**
* Converts this Maybe into a Single instance composing disposal
* through and turning an empty Maybe into a Single that emits the given
* value through onSuccess.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param defaultValue the default item to signal in Single if this Maybe is empty
* @return the new Single instance
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> toSingle(T defaultValue) {
ObjectHelper.requireNonNull(defaultValue, "defaultValue is null");
return RxJavaPlugins.onAssembly(new MaybeToSingle<T>(this, defaultValue));
}

/**
* Converts this Maybe into a Single instance composing disposal
* through and turning an empty Maybe into a signal of NoSuchElementException.
Expand Down
43 changes: 4 additions & 39 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12106,7 +12106,7 @@ public final Observable<T> startWithArray(T... items) {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand All @@ -12131,7 +12131,7 @@ public final Disposable subscribe() {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}

/**
Expand All @@ -12157,7 +12157,7 @@ public final Disposable subscribe(Consumer<? super T> onNext) {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
return subscribe(onNext, onError, Functions.EMPTY_ACTION);
}

/**
Expand Down Expand Up @@ -12188,46 +12188,11 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete) {
return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
}

/**
* Subscribes to an ObservableSource and provides callbacks to handle the items it emits and any error or
* completion notification it issues.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onNext
* the {@code Consumer<T>} you have designed to accept emissions from the ObservableSource
* @param onError
* the {@code Consumer<Throwable>} you have designed to accept any error notification from the
* ObservableSource
* @param onComplete
* the {@code Action} you have designed to accept a completion notification from the
* ObservableSource
* @param onSubscribe
* the {@code Consumer} that receives the upstream's Disposable
* @return a {@link Disposable} reference with which the caller can stop receiving items before
* the ObservableSource has finished sending them
* @throws NullPointerException
* if {@code onNext} is null, or
* if {@code onError} is null, or
* if {@code onComplete} is null, or
* if {@code onSubscribe} is null
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, Functions.emptyConsumer());

subscribe(ls);

Expand Down
23 changes: 0 additions & 23 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -3850,29 +3850,6 @@ public final <R> R to(@NonNull SingleConverter<T, ? extends R> converter) {
return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
}

/**
* Returns a {@link Completable} that discards result of the {@link Single}
* and calls {@code onComplete} when this source {@link Single} calls
* {@code onSuccess}. Error terminal event is propagated.
* <p>
* <img width="640" height="436" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.toCompletable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return a {@link Completable} that calls {@code onComplete} on it's subscriber when the source {@link Single}
* calls {@code onSuccess}.
* @since 2.0
* @deprecated see {@link #ignoreElement()} instead, will be removed in 3.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@Deprecated
public final Completable toCompletable() {
return RxJavaPlugins.onAssembly(new CompletableFromSingle<T>(this));
}

/**
* Returns a {@link Completable} that ignores the success value of this {@link Single}
* and calls {@code onComplete} instead on the returned {@code Completable}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.util.*;

import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;

/**
* A combined Observer that awaits the success or error signal via a CountDownLatch.
* @param <T> the value type
Expand Down Expand Up @@ -119,47 +117,6 @@ public T blockingGet(T defaultValue) {
return v != null ? v : defaultValue;
}

/**
* Block until the latch is counted down and return the error received or null if no
* error happened.
* @return the error received or null
*/
public Throwable blockingGetError() {
if (getCount() != 0) {
try {
BlockingHelper.verifyNonBlocking();
await();
} catch (InterruptedException ex) {
dispose();
return ex;
}
}
return error;
}

/**
* Block until the latch is counted down and return the error received or
* when the wait is interrupted or times out, null otherwise.
* @param timeout the timeout value
* @param unit the time unit
* @return the error received or null
*/
public Throwable blockingGetError(long timeout, TimeUnit unit) {
if (getCount() != 0) {
try {
BlockingHelper.verifyNonBlocking();
if (!await(timeout, unit)) {
dispose();
throw ExceptionHelper.wrapOrThrow(new TimeoutException(timeoutMessage(timeout, unit)));
}
} catch (InterruptedException ex) {
dispose();
throw ExceptionHelper.wrapOrThrow(ex);
}
}
return error;
}

/**
* Block until the observer terminates and return true; return false if
* the wait times out.
Expand Down
Loading