From aea20c70a6e22fbac7bd9c8297080ffa9ba9e389 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sun, 26 Jan 2020 11:12:01 +0100 Subject: [PATCH 1/2] 3.x: Add Maybe/Completable toFuture --- .../reactivex/rxjava3/core/Completable.java | 22 +++++ .../io/reactivex/rxjava3/core/Flowable.java | 2 +- .../java/io/reactivex/rxjava3/core/Maybe.java | 23 +++++ .../io/reactivex/rxjava3/core/Observable.java | 2 +- .../io/reactivex/rxjava3/core/Single.java | 4 +- ...Observer.java => FutureMultiObserver.java} | 18 +++- .../completable/CompletableToFutureTest.java | 76 +++++++++++++++++ .../operators/maybe/MaybeToFutureTest.java | 84 +++++++++++++++++++ 8 files changed, 224 insertions(+), 7 deletions(-) rename src/main/java/io/reactivex/rxjava3/internal/observers/{FutureSingleObserver.java => FutureMultiObserver.java} (89%) create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToFutureTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeToFutureTest.java diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java index e965855e00..abae9a7351 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Completable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java @@ -2892,6 +2892,28 @@ public final Flowable toFlowable() { } return RxJavaPlugins.onAssembly(new CompletableToFlowable<>(this)); } + /** + * Returns a {@link Future} representing the termination of the current {@code Completable} + * via a {@code null} value. + *

+ * + *

+ * Cancelling the {@code Future} will cancel the subscription to the current {@code Maybe}. + *

+ *
Scheduler:
+ *
{@code toFuture} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return the new {@code Future} instance + * @see ReactiveX documentation: To + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Future toFuture() { + return subscribeWith(new FutureMultiObserver<>()); + } /** * Converts this {@code Completable} into a {@link Maybe}. diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 3d83d477c5..a994e526c5 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -6249,7 +6249,7 @@ public final T blockingSingle(@NonNull T defaultItem) { /** * Returns a {@link Future} representing the only value emitted by this {@code Flowable}. *

- * + * *

* If the {@code Flowable} emits more than one item, {@link java.util.concurrent.Future} will receive an * {@link java.lang.IndexOutOfBoundsException}. If the {@code Flowable} is empty, {@link java.util.concurrent.Future} diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index 03e73abe65..8b1b569aeb 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -4166,6 +4166,29 @@ public final Flowable toFlowable() { return RxJavaPlugins.onAssembly(new MaybeToFlowable<>(this)); } + /** + * Returns a {@link Future} representing the single value emitted by the current {@code Maybe} + * or {@code null} if the current {@code Maybe} is empty. + *

+ * + *

+ * Cancelling the {@code Future} will cancel the subscription to the current {@code Maybe}. + *

+ *
Scheduler:
+ *
{@code toFuture} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return the new {@code Future} instance + * @see ReactiveX documentation: To + * @since 3.0.0 + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final Future toFuture() { + return subscribeWith(new FutureMultiObserver<>()); + } + /** * Converts this {@code Maybe} into an {@link Observable} instance composing disposal * through. diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index 89de9dd63e..3ab0be1ab9 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -5654,7 +5654,7 @@ public final T blockingSingle(@NonNull T defaultItem) { /** * Returns a {@link Future} representing the only value emitted by the current {@code Observable}. *

- * + * *

* If the {@code Observable} emits more than one item, {@code Future} will receive an * {@link IndexOutOfBoundsException}. If the {@code Observable} is empty, {@code Future} diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index 697c1355b5..55894e534c 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -4625,6 +4625,8 @@ public final Flowable toFlowable() { * Returns a {@link Future} representing the single value emitted by this {@code Single}. *

* + *

+ * Cancelling the {@code Future} will cancel the subscription to the current {@code Maybe}. *

*
Scheduler:
*
{@code toFuture} does not operate by default on a particular {@link Scheduler}.
@@ -4637,7 +4639,7 @@ public final Flowable toFlowable() { @SchedulerSupport(SchedulerSupport.NONE) @NonNull public final Future toFuture() { - return subscribeWith(new FutureSingleObserver<>()); + return subscribeWith(new FutureMultiObserver<>()); } /** diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/FutureSingleObserver.java b/src/main/java/io/reactivex/rxjava3/internal/observers/FutureMultiObserver.java similarity index 89% rename from src/main/java/io/reactivex/rxjava3/internal/observers/FutureSingleObserver.java rename to src/main/java/io/reactivex/rxjava3/internal/observers/FutureMultiObserver.java index 3e55963bea..e2f2aeea5a 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/observers/FutureSingleObserver.java +++ b/src/main/java/io/reactivex/rxjava3/internal/observers/FutureMultiObserver.java @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.AtomicReference; import io.reactivex.rxjava3.annotations.NonNull; -import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; import io.reactivex.rxjava3.internal.util.BlockingHelper; @@ -31,15 +31,15 @@ * * @param the value type */ -public final class FutureSingleObserver extends CountDownLatch -implements SingleObserver, Future, Disposable { +public final class FutureMultiObserver extends CountDownLatch +implements MaybeObserver, SingleObserver, CompletableObserver, Future, Disposable { T value; Throwable error; final AtomicReference upstream; - public FutureSingleObserver() { + public FutureMultiObserver() { super(1); this.upstream = new AtomicReference<>(); } @@ -141,6 +141,16 @@ public void onError(Throwable t) { } } + @Override + public void onComplete() { + Disposable a = upstream.get(); + if (a == DisposableHelper.DISPOSED) { + return; + } + upstream.compareAndSet(a, this); + countDown(); + } + @Override public void dispose() { // ignoring as `this` means a finished Disposable only diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToFutureTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToFutureTest.java new file mode 100644 index 0000000000..7501a2fe85 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToFutureTest.java @@ -0,0 +1,76 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.completable; + +import static org.junit.Assert.*; + +import java.util.concurrent.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.subjects.CompletableSubject; + +public class CompletableToFutureTest extends RxJavaTest { + + @Test + public void empty() throws Exception { + assertNull(Completable.complete() + .subscribeOn(Schedulers.computation()) + .toFuture() + .get()); + } + + @Test + public void error() throws InterruptedException { + try { + Completable.error(new TestException()) + .subscribeOn(Schedulers.computation()) + .toFuture() + .get(); + + fail("Should have thrown!"); + } catch (ExecutionException ex) { + assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException); + } + } + + @Test + public void cancel() { + CompletableSubject cs = CompletableSubject.create(); + + Future f = cs.toFuture(); + + assertTrue(cs.hasObservers()); + + f.cancel(true); + + assertFalse(cs.hasObservers()); + } + + @Test + public void cancel2() { + CompletableSubject cs = CompletableSubject.create(); + + Future f = cs.toFuture(); + + assertTrue(cs.hasObservers()); + + f.cancel(false); + + assertFalse(cs.hasObservers()); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeToFutureTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeToFutureTest.java new file mode 100644 index 0000000000..37723dc656 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeToFutureTest.java @@ -0,0 +1,84 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.maybe; + +import static org.junit.Assert.*; + +import java.util.concurrent.*; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.subjects.MaybeSubject; + +public class MaybeToFutureTest extends RxJavaTest { + + @Test + public void success() throws Exception { + assertEquals((Integer)1, Maybe.just(1) + .subscribeOn(Schedulers.computation()) + .toFuture() + .get()); + } + + @Test + public void empty() throws Exception { + assertNull(Maybe.empty() + .subscribeOn(Schedulers.computation()) + .toFuture() + .get()); + } + + @Test + public void error() throws InterruptedException { + try { + Maybe.error(new TestException()) + .subscribeOn(Schedulers.computation()) + .toFuture() + .get(); + + fail("Should have thrown!"); + } catch (ExecutionException ex) { + assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException); + } + } + + @Test + public void cancel() { + MaybeSubject ms = MaybeSubject.create(); + + Future f = ms.toFuture(); + + assertTrue(ms.hasObservers()); + + f.cancel(true); + + assertFalse(ms.hasObservers()); + } + + @Test + public void cancel2() { + MaybeSubject ms = MaybeSubject.create(); + + Future f = ms.toFuture(); + + assertTrue(ms.hasObservers()); + + f.cancel(false); + + assertFalse(ms.hasObservers()); + } +} From 39849320349bb34f9c97836176949eaa287f400d Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sun, 26 Jan 2020 11:16:10 +0100 Subject: [PATCH 2/2] Fix copy-paste mistakes --- src/main/java/io/reactivex/rxjava3/core/Completable.java | 2 +- src/main/java/io/reactivex/rxjava3/core/Single.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/core/Completable.java b/src/main/java/io/reactivex/rxjava3/core/Completable.java index abae9a7351..1c6214e1da 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Completable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Completable.java @@ -2898,7 +2898,7 @@ public final Flowable toFlowable() { *

* *

- * Cancelling the {@code Future} will cancel the subscription to the current {@code Maybe}. + * Cancelling the {@code Future} will cancel the subscription to the current {@code Completable}. *

*
Scheduler:
*
{@code toFuture} does not operate by default on a particular {@link Scheduler}.
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index 55894e534c..a283d9eef0 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -4626,7 +4626,7 @@ public final Flowable toFlowable() { *

* *

- * Cancelling the {@code Future} will cancel the subscription to the current {@code Maybe}. + * Cancelling the {@code Future} will cancel the subscription to the current {@code Single}. *

*
Scheduler:
*
{@code toFuture} does not operate by default on a particular {@link Scheduler}.