diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index 8b1b569aeb..b2a6465eae 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -3472,7 +3472,7 @@ public final Maybe doOnSuccess(@NonNull Consumer onSuccess) { * Filters the success item of the {@code Maybe} via a predicate function and emitting it if the predicate * returns {@code true}, completing otherwise. *

- * + * *

*
Scheduler:
*
{@code filter} does not operate by default on a particular {@link Scheduler}.
@@ -4098,10 +4098,10 @@ public final Maybe observeOn(@NonNull Scheduler scheduler) { } /** - * Filters the items emitted by a {@code Maybe}, only emitting its success value if that + * Filters the items emitted by the current {@code Maybe}, only emitting its success value if that * is an instance of the supplied {@link Class}. *

- * + * *

*
Scheduler:
*
{@code ofType} does not operate by default on a particular {@link Scheduler}.
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index a283d9eef0..12400b1fd0 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -3439,6 +3439,30 @@ public final Single contains(@NonNull Object item, @NonNull BiPredicate public final Flowable mergeWith(@NonNull SingleSource other) { return merge(this, other); } + /** + * Filters the items emitted by the current {@code Single}, only emitting its success value if that + * is an instance of the supplied {@link Class}. + *

+ * + *

+ *
Scheduler:
+ *
{@code ofType} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the output type + * @param clazz + * the class type to filter the items emitted by the current {@code Single} + * @return the new {@link Maybe} instance + * @throws NullPointerException if {@code clazz} is {@code null} + * @see ReactiveX operators documentation: Filter + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe ofType(@NonNull Class clazz) { + Objects.requireNonNull(clazz, "clazz is null"); + return filter(Functions.isInstanceOf(clazz)).cast(clazz); + } /** * Signals the success item or the terminal signals of the current {@code Single} on the specified {@link Scheduler}, diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleOfTypeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleOfTypeTest.java new file mode 100644 index 0000000000..3ce3a0d15e --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleOfTypeTest.java @@ -0,0 +1,96 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class SingleOfTypeTest extends RxJavaTest { + + @Test + public void normal() { + Single.just(1).ofType(Integer.class) + .test() + .assertResult(1); + } + + @Test + public void normalDowncast() { + TestObserver to = Single.just(1) + .ofType(Number.class) + .test(); + // don't make this fluent, target type required! + to.assertResult((Number)1); + } + + @Test + public void notInstance() { + TestObserver to = Single.just(1) + .ofType(String.class) + .test(); + // don't make this fluent, target type required! + to.assertResult(); + } + + @Test + public void error() { + TestObserver to = Single.error(new TestException()) + .ofType(Number.class) + .test(); + // don't make this fluent, target type required! + to.assertFailure(TestException.class); + } + + @Test + public void errorNotInstance() { + TestObserver to = Single.error(new TestException()) + .ofType(String.class) + .test(); + // don't make this fluent, target type required! + to.assertFailure(TestException.class); + } + + @Test + public void dispose() { + TestHelper.checkDisposedSingleToMaybe(new Function, Maybe>() { + @Override + public Maybe apply(Single m) throws Exception { + return m.ofType(Object.class); + } + }); + } + + @Test + public void isDisposed() { + PublishProcessor pp = PublishProcessor.create(); + + TestHelper.checkDisposed(pp.singleElement().ofType(Object.class)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingleToMaybe(new Function, Maybe>() { + @Override + public Maybe apply(Single f) throws Exception { + return f.ofType(Object.class); + } + }); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java index f50354b6a5..bb6608e96d 100644 --- a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java +++ b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java @@ -2248,6 +2248,30 @@ public static void checkDisposedMaybeToSingle(Function, ? extend assertFalse(pp.hasSubscribers()); } + /** + * Check if the operator applied to a Maybe source propagates dispose properly. + * @param the source value type + * @param the output value type + * @param composer the function to apply an operator to the provided Maybe source + */ + public static void checkDisposedSingleToMaybe(Function, ? extends MaybeSource> composer) { + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = new TestSubscriber<>(); + + try { + new MaybeToFlowable<>(composer.apply(pp.singleOrError())).subscribe(ts); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + + assertTrue(pp.hasSubscribers()); + + ts.cancel(); + + assertFalse(pp.hasSubscribers()); + } + /** * Check if the TestSubscriber has a CompositeException with the specified class * of Throwables in the given order.