Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add Maybe/Completable toFuture #6875

Merged
merged 2 commits into from
Jan 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2892,6 +2892,28 @@ public final <T> Flowable<T> toFlowable() {
}
return RxJavaPlugins.onAssembly(new CompletableToFlowable<>(this));
}
/**
* Returns a {@link Future} representing the termination of the current {@code Completable}
* via a {@code null} value.
* <p>
* <img width="640" height="433" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/Completable.toFuture.png" alt="">
* <p>
* Cancelling the {@code Future} will cancel the subscription to the current {@code Completable}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toFuture} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the new {@code Future} instance
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Future<Void> toFuture() {
return subscribeWith(new FutureMultiObserver<>());
}

/**
* Converts this {@code Completable} into a {@link Maybe}.
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6249,7 +6249,7 @@ public final T blockingSingle(@NonNull T defaultItem) {
/**
* Returns a {@link Future} representing the only value emitted by this {@code Flowable}.
* <p>
* <img width="640" height="324" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/Flowable.toFuture.png" alt="">
* <img width="640" height="311" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/Flowable.toFuture.png" alt="">
* <p>
* If the {@code Flowable} emits more than one item, {@link java.util.concurrent.Future} will receive an
* {@link java.lang.IndexOutOfBoundsException}. If the {@code Flowable} is empty, {@link java.util.concurrent.Future}
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -4166,6 +4166,29 @@ public final Flowable<T> toFlowable() {
return RxJavaPlugins.onAssembly(new MaybeToFlowable<>(this));
}

/**
* Returns a {@link Future} representing the single value emitted by the current {@code Maybe}
* or {@code null} if the current {@code Maybe} is empty.
* <p>
* <img width="640" height="292" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/Maybe.toFuture.png" alt="">
* <p>
* Cancelling the {@code Future} will cancel the subscription to the current {@code Maybe}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toFuture} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the new {@code Future} instance
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Future<T> toFuture() {
return subscribeWith(new FutureMultiObserver<>());
}

/**
* Converts this {@code Maybe} into an {@link Observable} instance composing disposal
* through.
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5654,7 +5654,7 @@ public final T blockingSingle(@NonNull T defaultItem) {
/**
* Returns a {@link Future} representing the only value emitted by the current {@code Observable}.
* <p>
* <img width="640" height="312" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/toFuture.o.png" alt="">
* <img width="640" height="299" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/toFuture.o.png" alt="">
* <p>
* If the {@code Observable} emits more than one item, {@code Future} will receive an
* {@link IndexOutOfBoundsException}. If the {@code Observable} is empty, {@code Future}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -4625,6 +4625,8 @@ public final Flowable<T> toFlowable() {
* Returns a {@link Future} representing the single value emitted by this {@code Single}.
* <p>
* <img width="640" height="467" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/Single.toFuture.png" alt="">
* <p>
* Cancelling the {@code Future} will cancel the subscription to the current {@code Single}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toFuture} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -4637,7 +4639,7 @@ public final Flowable<T> toFlowable() {
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Future<T> toFuture() {
return subscribeWith(new FutureSingleObserver<>());
return subscribeWith(new FutureMultiObserver<>());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.util.BlockingHelper;
Expand All @@ -31,15 +31,15 @@
*
* @param <T> the value type
*/
public final class FutureSingleObserver<T> extends CountDownLatch
implements SingleObserver<T>, Future<T>, Disposable {
public final class FutureMultiObserver<T> extends CountDownLatch
implements MaybeObserver<T>, SingleObserver<T>, CompletableObserver, Future<T>, Disposable {

T value;
Throwable error;

final AtomicReference<Disposable> upstream;

public FutureSingleObserver() {
public FutureMultiObserver() {
super(1);
this.upstream = new AtomicReference<>();
}
Expand Down Expand Up @@ -141,6 +141,16 @@ public void onError(Throwable t) {
}
}

@Override
public void onComplete() {
Disposable a = upstream.get();
if (a == DisposableHelper.DISPOSED) {
return;
}
upstream.compareAndSet(a, this);
countDown();
}

@Override
public void dispose() {
// ignoring as `this` means a finished Disposable only
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.operators.completable;

import static org.junit.Assert.*;

import java.util.concurrent.*;

import org.junit.Test;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.CompletableSubject;

public class CompletableToFutureTest extends RxJavaTest {

@Test
public void empty() throws Exception {
assertNull(Completable.complete()
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}

@Test
public void error() throws InterruptedException {
try {
Completable.error(new TestException())
.subscribeOn(Schedulers.computation())
.toFuture()
.get();

fail("Should have thrown!");
} catch (ExecutionException ex) {
assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException);
}
}

@Test
public void cancel() {
CompletableSubject cs = CompletableSubject.create();

Future<Void> f = cs.toFuture();

assertTrue(cs.hasObservers());

f.cancel(true);

assertFalse(cs.hasObservers());
}

@Test
public void cancel2() {
CompletableSubject cs = CompletableSubject.create();

Future<Void> f = cs.toFuture();

assertTrue(cs.hasObservers());

f.cancel(false);

assertFalse(cs.hasObservers());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.operators.maybe;

import static org.junit.Assert.*;

import java.util.concurrent.*;

import org.junit.Test;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.MaybeSubject;

public class MaybeToFutureTest extends RxJavaTest {

@Test
public void success() throws Exception {
assertEquals((Integer)1, Maybe.just(1)
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}

@Test
public void empty() throws Exception {
assertNull(Maybe.empty()
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}

@Test
public void error() throws InterruptedException {
try {
Maybe.error(new TestException())
.subscribeOn(Schedulers.computation())
.toFuture()
.get();

fail("Should have thrown!");
} catch (ExecutionException ex) {
assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException);
}
}

@Test
public void cancel() {
MaybeSubject<Integer> ms = MaybeSubject.create();

Future<Integer> f = ms.toFuture();

assertTrue(ms.hasObservers());

f.cancel(true);

assertFalse(ms.hasObservers());
}

@Test
public void cancel2() {
MaybeSubject<Integer> ms = MaybeSubject.create();

Future<Integer> f = ms.toFuture();

assertTrue(ms.hasObservers());

f.cancel(false);

assertFalse(ms.hasObservers());
}
}