From f41fb2ee336f37ecf0c0990595fa40e0ef834d54 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 25 Jan 2020 14:13:11 +0100 Subject: [PATCH] 3.x: Add Maybe.dematerialize --- .../java/io/reactivex/rxjava3/core/Maybe.java | 41 ++++++ .../io/reactivex/rxjava3/core/Single.java | 2 +- .../operators/maybe/MaybeDematerialize.java | 108 ++++++++++++++++ .../maybe/MaybeDematerializeTest.java | 122 ++++++++++++++++++ 4 files changed, 272 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDematerialize.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDematerializeTest.java diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index 8699d60e16..a9435534f6 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -2893,6 +2893,47 @@ public final Single defaultIfEmpty(@NonNull T defaultItem) { return RxJavaPlugins.onAssembly(new MaybeToSingle<>(this, defaultItem)); } + /** + * Maps the {@link Notification} success value of the current {@code Maybe} back into normal + * {@code onSuccess}, {@code onError} or {@code onComplete} signals. + *

+ * + *

+ * The intended use of the {@code selector} function is to perform a + * type-safe identity mapping (see example) on a source that is already of type + * {@code Notification}. The Java language doesn't allow + * limiting instance methods to a certain generic argument shape, therefore, + * a function is used to ensure the conversion remains type safe. + *

+ * Regular {@code onError} or {@code onComplete} signals from the current {@code Maybe} are passed along to the downstream. + *

+ *
Scheduler:
+ *
{@code dematerialize} does not operate by default on a particular {@link Scheduler}.
+ *
+ *

+ * Example: + *


+     * Maybe.just(Notification.createOnNext(1))
+     * .dematerialize(notification -> notification)
+     * .test()
+     * .assertResult(1);
+     * 
+ * @param the result type + * @param selector the function called with the success item and should + * return a {@code Notification} instance. + * @return the new {@code Maybe} instance + * @throws NullPointerException if {@code selector} is {@code null} + * @since 3.0.0 + * @see #materialize() + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final <@NonNull R> Maybe dematerialize(@NonNull Function> selector) { + Objects.requireNonNull(selector, "selector is null"); + return RxJavaPlugins.onAssembly(new MaybeDematerialize<>(this, selector)); + } + /** * Returns a {@code Maybe} that signals the events emitted by the current {@code Maybe} shifted forward in time by a * specified delay. diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index 719719520f..3bfcd78a9c 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -2511,7 +2511,7 @@ public final Single delaySubscription(long time, @NonNull TimeUnit unit, @Non } /** - * Maps the {@link Notification} success value of this {@code Single} back into normal + * Maps the {@link Notification} success value of the current {@code Single} back into normal * {@code onSuccess}, {@code onError} or {@code onComplete} signals as a * {@link Maybe} source. *

diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDematerialize.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDematerialize.java new file mode 100644 index 0000000000..d5d3ed25cd --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDematerialize.java @@ -0,0 +1,108 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.maybe; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; + +import java.util.Objects; + +/** + * Maps the success value of the source to a Notification, then + * maps it back to the corresponding signal type. + *

History: 2.2.4 - experimental + * @param the element type of the source + * @param the element type of the Notification and result + * @since 3.0.0 + */ +public final class MaybeDematerialize extends AbstractMaybeWithUpstream { + + final Function> selector; + + public MaybeDematerialize(Maybe source, Function> selector) { + super(source); + this.selector = selector; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + source.subscribe(new DematerializeObserver<>(observer, selector)); + } + + static final class DematerializeObserver implements MaybeObserver, Disposable { + + final MaybeObserver downstream; + + final Function> selector; + + Disposable upstream; + + DematerializeObserver(MaybeObserver downstream, + Function> selector) { + this.downstream = downstream; + this.selector = selector; + } + + @Override + public void dispose() { + upstream.dispose(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(upstream, d)) { + upstream = d; + downstream.onSubscribe(this); + } + } + + @Override + public void onSuccess(T t) { + Notification notification; + + try { + notification = Objects.requireNonNull(selector.apply(t), "The selector returned a null Notification"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + return; + } + if (notification.isOnNext()) { + downstream.onSuccess(notification.getValue()); + } else if (notification.isOnComplete()) { + downstream.onComplete(); + } else { + downstream.onError(notification.getError()); + } + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDematerializeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDematerializeTest.java new file mode 100644 index 0000000000..5c75a92a10 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDematerializeTest.java @@ -0,0 +1,122 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.maybe; + +import static org.mockito.Mockito.*; +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.functions.Functions; +import io.reactivex.rxjava3.subjects.MaybeSubject; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class MaybeDematerializeTest extends RxJavaTest { + + @Test + public void success() { + Maybe.just(Notification.createOnNext(1)) + .dematerialize(Functions.>identity()) + .test() + .assertResult(1); + } + + @Test + public void empty() { + Maybe.just(Notification.createOnComplete()) + .dematerialize(Functions.>identity()) + .test() + .assertResult(); + } + + @Test + public void emptySource() throws Throwable { + @SuppressWarnings("unchecked") + Function, Notification> function = mock(Function.class); + + Maybe.>empty() + .dematerialize(function) + .test() + .assertResult(); + + verify(function, never()).apply(any()); + } + + @Test + public void error() { + Maybe.>error(new TestException()) + .dematerialize(Functions.>identity()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void errorNotification() { + Maybe.just(Notification.createOnError(new TestException())) + .dematerialize(Functions.>identity()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(new Function, MaybeSource>() { + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public MaybeSource apply(Maybe v) throws Exception { + return v.dematerialize((Function)Functions.identity()); + } + }); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(MaybeSubject.>create().dematerialize(Functions.>identity())); + } + + @Test + public void selectorCrash() { + Maybe.just(Notification.createOnNext(1)) + .dematerialize(new Function, Notification>() { + @Override + public Notification apply(Notification v) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void selectorNull() { + Maybe.just(Notification.createOnNext(1)) + .dematerialize(Functions.justFunction((Notification)null)) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void selectorDifferentType() { + Maybe.just(Notification.createOnNext(1)) + .dematerialize(new Function, Notification>() { + @Override + public Notification apply(Notification v) throws Exception { + return Notification.createOnNext("Value-" + 1); + } + }) + .test() + .assertResult("Value-1"); + } +}