diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index 4322afaa93..6260f3e206 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -2672,6 +2672,7 @@ public final Single defaultIfEmpty(@NonNull T defaultItem) { /** * Returns a {@code Maybe} that signals the events emitted by the current {@code Maybe} shifted forward in time by a * specified delay. + * An error signal will not be delayed. *

* *

@@ -2682,17 +2683,68 @@ public final Single defaultIfEmpty(@NonNull T defaultItem) { * @param time * the delay to shift the source by * @param unit - * the {@link TimeUnit} in which {@code period} is defined + * the {@link TimeUnit} in which {@code time} is defined * @return the new {@code Maybe} instance * @throws NullPointerException if {@code unit} is {@code null} * @see ReactiveX operators documentation: Delay - * @see #delay(long, TimeUnit, Scheduler) + * @see #delay(long, TimeUnit, Scheduler, boolean) */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.COMPUTATION) @NonNull public final Maybe delay(long time, @NonNull TimeUnit unit) { - return delay(time, unit, Schedulers.computation()); + return delay(time, unit, Schedulers.computation(), false); + } + + /** + * Returns a {@code Maybe} that signals the events emitted by the current {@code Maybe} shifted forward in time by a + * specified delay. + *

+ * + *

+ *
Scheduler:
+ *
This version of {@code delay} operates by default on the {@code computation} {@link Scheduler}.
+ *
+ * + * @param time the delay to shift the source by + * @param unit the {@link TimeUnit} in which {@code time} is defined + * @param delayError if {@code true}, both success and error signals are delayed. if {@code false}, only success signals are delayed. + * @return the new {@code Maybe} instance + * @throws NullPointerException if {@code unit} is {@code null} + * @see ReactiveX operators documentation: Delay + * @see #delay(long, TimeUnit, Scheduler, boolean) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.COMPUTATION) + @NonNull + public final Maybe delay(long time, @NonNull TimeUnit unit, boolean delayError) { + return delay(time, unit, Schedulers.computation(), delayError); + } + + /** + * Returns a {@code Maybe} that signals the events emitted by the current {@code Maybe} shifted forward in time by a + * specified delay. + * An error signal will not be delayed. + *

+ * + *

+ *
Scheduler:
+ *
you specify the {@link Scheduler} where the non-blocking wait and emission happens
+ *
+ * + * @param time the delay to shift the source by + * @param unit the {@link TimeUnit} in which {@code time} is defined + * @param scheduler the {@code Scheduler} to use for delaying + * @return the new {@code Maybe} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} + * @see ReactiveX operators documentation: Delay + * @see #delay(long, TimeUnit, Scheduler, boolean) + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.CUSTOM) + @NonNull + public final Maybe delay(long time, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) { + return delay(time, unit, scheduler, false); } /** @@ -2708,9 +2760,10 @@ public final Maybe delay(long time, @NonNull TimeUnit unit) { * @param time * the delay to shift the source by * @param unit - * the time unit of {@code delay} + * the {@link TimeUnit} in which {@code time} is defined * @param scheduler * the {@code Scheduler} to use for delaying + * @param delayError if {@code true}, both success and error signals are delayed. if {@code false}, only success signals are delayed. * @return the new {@code Maybe} instance * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} * @see ReactiveX operators documentation: Delay @@ -2718,10 +2771,10 @@ public final Maybe delay(long time, @NonNull TimeUnit unit) { @CheckReturnValue @NonNull @SchedulerSupport(SchedulerSupport.CUSTOM) - public final Maybe delay(long time, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) { + public final Maybe delay(long time, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean delayError) { Objects.requireNonNull(unit, "unit is null"); Objects.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new MaybeDelay<>(this, Math.max(0L, time), unit, scheduler)); + return RxJavaPlugins.onAssembly(new MaybeDelay<>(this, Math.max(0L, time), unit, scheduler, delayError)); } /** diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDelay.java index 6885763cbd..3e6922a339 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDelay.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDelay.java @@ -33,16 +33,19 @@ public final class MaybeDelay extends AbstractMaybeWithUpstream { final Scheduler scheduler; - public MaybeDelay(MaybeSource source, long delay, TimeUnit unit, Scheduler scheduler) { + final boolean delayError; + + public MaybeDelay(MaybeSource source, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) { super(source); this.delay = delay; this.unit = unit; this.scheduler = scheduler; + this.delayError = delayError; } @Override protected void subscribeActual(MaybeObserver observer) { - source.subscribe(new DelayMaybeObserver<>(observer, delay, unit, scheduler)); + source.subscribe(new DelayMaybeObserver<>(observer, delay, unit, scheduler, delayError)); } static final class DelayMaybeObserver @@ -59,15 +62,18 @@ static final class DelayMaybeObserver final Scheduler scheduler; + final boolean delayError; + T value; Throwable error; - DelayMaybeObserver(MaybeObserver actual, long delay, TimeUnit unit, Scheduler scheduler) { + DelayMaybeObserver(MaybeObserver actual, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) { this.downstream = actual; this.delay = delay; this.unit = unit; this.scheduler = scheduler; + this.delayError = delayError; } @Override @@ -105,21 +111,21 @@ public void onSubscribe(Disposable d) { @Override public void onSuccess(T value) { this.value = value; - schedule(); + schedule(delay); } @Override public void onError(Throwable e) { this.error = e; - schedule(); + schedule(delayError ? delay : 0); } @Override public void onComplete() { - schedule(); + schedule(delay); } - void schedule() { + void schedule(long delay) { DisposableHelper.replace(this, scheduler.scheduleDirect(this, delay, unit)); } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDelayTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDelayTest.java index 747b96faf1..14448c3f2c 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDelayTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDelayTest.java @@ -96,4 +96,32 @@ public Maybe apply(Maybe f) throws Exception { } }); } + + @Test + public void delayedErrorOnSuccess() { + final TestScheduler scheduler = new TestScheduler(); + final TestObserver observer = Maybe.just(1) + .delay(5, TimeUnit.SECONDS, scheduler, true) + .test(); + + scheduler.advanceTimeTo(2, TimeUnit.SECONDS); + observer.assertNoValues(); + + scheduler.advanceTimeTo(5, TimeUnit.SECONDS); + observer.assertValue(1); + } + + @Test + public void delayedErrorOnError() { + final TestScheduler scheduler = new TestScheduler(); + final TestObserver observer = Maybe.error(new TestException()) + .delay(5, TimeUnit.SECONDS, scheduler, true) + .test(); + + scheduler.advanceTimeTo(2, TimeUnit.SECONDS); + observer.assertNoErrors(); + + scheduler.advanceTimeTo(5, TimeUnit.SECONDS); + observer.assertError(TestException.class); + } } diff --git a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java index 403a83964a..7eff55d15e 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java @@ -289,6 +289,8 @@ public void checkParallelFlowable() { // negative time is considered as zero time addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class)); addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class)); + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Boolean.TYPE)); + addOverride(new ParamOverride(Maybe.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE)); // zero repeat is allowed addOverride(new ParamOverride(Maybe.class, 0, ParamMode.NON_NEGATIVE, "repeat", Long.TYPE));