diff --git a/src/main/java/io/reactivex/rxjava3/core/Scheduler.java b/src/main/java/io/reactivex/rxjava3/core/Scheduler.java index a300cb02ce..80eda093c4 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Scheduler.java +++ b/src/main/java/io/reactivex/rxjava3/core/Scheduler.java @@ -81,7 +81,7 @@ * underlying task-execution scheme supports stopping and restarting itself. *
* If the {@code Scheduler} is shut down or a {@code Worker} is disposed, the {@code schedule} methods
- * should return the {@link io.reactivex.rxjava3.disposables.Disposables#disposed()} singleton instance indicating the shut down/disposed
+ * should return the {@link Disposable#disposed()} singleton instance indicating the shut down/disposed
* state to the caller. Since the shutdown or dispose can happen from any thread, the {@code schedule} implementations
* should make best effort to cancel tasks immediately after those tasks have been submitted to the
* underlying task-execution scheme if the shutdown/dispose was detected after this submission.
@@ -349,7 +349,7 @@ public
+ * The {@code Future} is cancelled with {@code mayInterruptIfRunning == true}.
+ * @param future the Future to wrap
+ * @return the new Disposable instance
+ * @see #fromFuture(Future, boolean)
+ * @since 3.0.0
+ */
+ @NonNull
+ static Disposable fromFuture(@NonNull Future> future) {
+ Objects.requireNonNull(future, "future is null");
+ return fromFuture(future, true);
+ }
+
+ /**
+ * Construct a {@code Disposable} by wrapping a {@link Future} that is
+ * cancelled exactly once when the {@code Disposable} is disposed.
+ * @param future the Future to wrap
+ * @param allowInterrupt if true, the future cancel happens via {@code Future.cancel(true)}
+ * @return the new Disposable instance
+ * @since 3.0.0
+ */
+ @NonNull
+ static Disposable fromFuture(@NonNull Future> future, boolean allowInterrupt) {
+ Objects.requireNonNull(future, "future is null");
+ return new FutureDisposable(future, allowInterrupt);
+ }
+
+ /**
+ * Construct a {@code Disposable} by wrapping a {@link Subscription} that is
+ * cancelled exactly once when the {@code Disposable} is disposed.
+ * @param subscription the Runnable to wrap
+ * @return the new Disposable instance
+ * @since 3.0.0
+ */
+ @NonNull
+ static Disposable fromSubscription(@NonNull Subscription subscription) {
+ Objects.requireNonNull(subscription, "subscription is null");
+ return new SubscriptionDisposable(subscription);
+ }
+
+ /**
+ * Construct a {@code Disposable} by wrapping an {@link AutoCloseable} that is
+ * closed exactly once when the {@code Disposable} is disposed.
+ * @param autoCloseable the AutoCloseable to wrap
+ * @return the new Disposable instance
+ * @since 3.0.0
+ */
+ @NonNull
+ static Disposable fromAutoCloseable(@NonNull AutoCloseable autoCloseable) {
+ Objects.requireNonNull(autoCloseable, "autoCloseable is null");
+ return new AutoCloseableDisposable(autoCloseable);
+ }
+
+ /**
+ * Construct an {@link AutoCloseable} by wrapping a {@code Disposable} that is
+ * disposed when the returned {@code AutoCloseable} is closed.
+ * @param disposable the Disposable instance
+ * @return the new AutoCloseable instance
+ * @since 3.0.0
+ */
+ @NonNull
+ static AutoCloseable toAutoCloseable(@NonNull Disposable disposable) {
+ return disposable::dispose;
+ }
+
+ /**
+ * Returns a new, non-disposed {@code Disposable} instance.
+ * @return a new, non-disposed {@code Disposable} instance
+ * @since 3.0.0
+ */
+ @NonNull
+ static Disposable empty() {
+ return fromRunnable(Functions.EMPTY_RUNNABLE);
+ }
+
+ /**
+ * Returns a shared, disposed {@code Disposable} instance.
+ * @return a shared, disposed {@code Disposable} instance
+ * @since 3.0.0
+ */
+ @NonNull
+ static Disposable disposed() {
+ return EmptyDisposable.INSTANCE;
+ }
}
diff --git a/src/main/java/io/reactivex/rxjava3/disposables/Disposables.java b/src/main/java/io/reactivex/rxjava3/disposables/Disposables.java
deleted file mode 100644
index 39b1b76b25..0000000000
--- a/src/main/java/io/reactivex/rxjava3/disposables/Disposables.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Copyright (c) 2016-present, RxJava Contributors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is
- * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
- * the License for the specific language governing permissions and limitations under the License.
- */
-
-package io.reactivex.rxjava3.disposables;
-
-import java.util.Objects;
-import java.util.concurrent.Future;
-
-import org.reactivestreams.Subscription;
-
-import io.reactivex.rxjava3.annotations.NonNull;
-import io.reactivex.rxjava3.functions.Action;
-import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
-import io.reactivex.rxjava3.internal.functions.*;
-
-/**
- * Utility class to help create disposables by wrapping
- * other types.
- * @since 2.0
- */
-public final class Disposables {
- /** Utility class. */
- private Disposables() {
- throw new IllegalStateException("No instances!");
- }
-
- /**
- * Construct a {@code Disposable} by wrapping a {@link Runnable} that is
- * executed exactly once when the {@code Disposable} is disposed.
- * @param run the Runnable to wrap
- * @return the new Disposable instance
- */
- @NonNull
- public static Disposable fromRunnable(@NonNull Runnable run) {
- Objects.requireNonNull(run, "run is null");
- return new RunnableDisposable(run);
- }
-
- /**
- * Construct a {@code Disposable} by wrapping a {@link Action} that is
- * executed exactly once when the {@code Disposable} is disposed.
- * @param run the Action to wrap
- * @return the new Disposable instance
- */
- @NonNull
- public static Disposable fromAction(@NonNull Action run) {
- Objects.requireNonNull(run, "run is null");
- return new ActionDisposable(run);
- }
-
- /**
- * Construct a {@code Disposable} by wrapping a {@link Future} that is
- * cancelled exactly once when the {@code Disposable} is disposed.
- *
- * The {@code Future} is cancelled with {@code mayInterruptIfRunning == true}.
- * @param future the Future to wrap
- * @return the new Disposable instance
- * @see #fromFuture(Future, boolean)
- */
- @NonNull
- public static Disposable fromFuture(@NonNull Future> future) {
- Objects.requireNonNull(future, "future is null");
- return fromFuture(future, true);
- }
-
- /**
- * Construct a {@code Disposable} by wrapping a {@link Future} that is
- * cancelled exactly once when the {@code Disposable} is disposed.
- * @param future the Future to wrap
- * @param allowInterrupt if true, the future cancel happens via {@code Future.cancel(true)}
- * @return the new Disposable instance
- */
- @NonNull
- public static Disposable fromFuture(@NonNull Future> future, boolean allowInterrupt) {
- Objects.requireNonNull(future, "future is null");
- return new FutureDisposable(future, allowInterrupt);
- }
-
- /**
- * Construct a {@code Disposable} by wrapping a {@link Subscription} that is
- * cancelled exactly once when the {@code Disposable} is disposed.
- * @param subscription the Runnable to wrap
- * @return the new Disposable instance
- */
- @NonNull
- public static Disposable fromSubscription(@NonNull Subscription subscription) {
- Objects.requireNonNull(subscription, "subscription is null");
- return new SubscriptionDisposable(subscription);
- }
-
- /**
- * Construct a {@code Disposable} by wrapping an {@link AutoCloseable} that is
- * closed exactly once when the {@code Disposable} is disposed.
- * @param autoCloseable the AutoCloseable to wrap
- * @return the new Disposable instance
- * @since 3.0.0
- */
- @NonNull
- public static Disposable fromAutoCloseable(@NonNull AutoCloseable autoCloseable) {
- Objects.requireNonNull(autoCloseable, "autoCloseable is null");
- return new AutoCloseableDisposable(autoCloseable);
- }
-
- /**
- * Construct an {@link AutoCloseable} by wrapping a {@code Disposable} that is
- * disposed when the returned {@code AutoCloseable} is closed.
- * @param disposable the Disposable instance
- * @return the new AutoCloseable instance
- * @since 3.0.0
- */
- @NonNull
- public static AutoCloseable toAutoCloseable(@NonNull Disposable disposable) {
- return disposable::dispose;
- }
-
- /**
- * Returns a new, non-disposed {@code Disposable} instance.
- * @return a new, non-disposed {@code Disposable} instance
- */
- @NonNull
- public static Disposable empty() {
- return fromRunnable(Functions.EMPTY_RUNNABLE);
- }
-
- /**
- * Returns a shared, disposed {@code Disposable} instance.
- * @return a shared, disposed {@code Disposable} instance
- */
- @NonNull
- public static Disposable disposed() {
- return EmptyDisposable.INSTANCE;
- }
-}
diff --git a/src/main/java/io/reactivex/rxjava3/disposables/SerialDisposable.java b/src/main/java/io/reactivex/rxjava3/disposables/SerialDisposable.java
index 187a94b1e5..d2c7ddee4b 100644
--- a/src/main/java/io/reactivex/rxjava3/disposables/SerialDisposable.java
+++ b/src/main/java/io/reactivex/rxjava3/disposables/SerialDisposable.java
@@ -71,7 +71,7 @@ public boolean replace(@Nullable Disposable next) {
public Disposable get() {
Disposable d = resource.get();
if (d == DisposableHelper.DISPOSED) {
- return Disposables.disposed();
+ return Disposable.disposed();
}
return d;
}
diff --git a/src/main/java/io/reactivex/rxjava3/disposables/package-info.java b/src/main/java/io/reactivex/rxjava3/disposables/package-info.java
index 21b276e132..29812d0ab7 100644
--- a/src/main/java/io/reactivex/rxjava3/disposables/package-info.java
+++ b/src/main/java/io/reactivex/rxjava3/disposables/package-info.java
@@ -17,6 +17,6 @@
/**
* Default implementations for {@link io.reactivex.rxjava3.disposables.Disposable Disposable}-based resource management
* ({@code Disposable} container types) and utility classes to construct
- * {@link io.reactivex.rxjava3.disposables.Disposables Disposables} from callbacks and other types.
+ * {@link io.reactivex.rxjava3.disposables.Disposable Disposables} from callbacks and other types.
*/
package io.reactivex.rxjava3.disposables;
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromAction.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromAction.java
index 5ce4d43547..4cd7fb479c 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromAction.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromAction.java
@@ -29,7 +29,7 @@ public CompletableFromAction(Action run) {
@Override
protected void subscribeActual(CompletableObserver observer) {
- Disposable d = Disposables.empty();
+ Disposable d = Disposable.empty();
observer.onSubscribe(d);
try {
run.run();
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromCallable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromCallable.java
index ac2c94c921..57764950df 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromCallable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromCallable.java
@@ -30,7 +30,7 @@ public CompletableFromCallable(Callable> callable) {
@Override
protected void subscribeActual(CompletableObserver observer) {
- Disposable d = Disposables.empty();
+ Disposable d = Disposable.empty();
observer.onSubscribe(d);
try {
callable.call();
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnable.java
index 3dc275e0e1..e94d650310 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnable.java
@@ -28,7 +28,7 @@ public CompletableFromRunnable(Runnable runnable) {
@Override
protected void subscribeActual(CompletableObserver observer) {
- Disposable d = Disposables.empty();
+ Disposable d = Disposable.empty();
observer.onSubscribe(d);
try {
runnable.run();
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromSupplier.java b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromSupplier.java
index 5c6054306a..e3e29188a0 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromSupplier.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromSupplier.java
@@ -33,7 +33,7 @@ public CompletableFromSupplier(Supplier> supplier) {
@Override
protected void subscribeActual(CompletableObserver observer) {
- Disposable d = Disposables.empty();
+ Disposable d = Disposable.empty();
observer.onSubscribe(d);
try {
supplier.get();
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeError.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeError.java
index a850a27b6d..fc2f50066e 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeError.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeError.java
@@ -14,7 +14,7 @@
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
-import io.reactivex.rxjava3.disposables.Disposables;
+import io.reactivex.rxjava3.disposables.Disposable;
/**
* Signals a constant Throwable.
@@ -31,7 +31,7 @@ public MaybeError(Throwable error) {
@Override
protected void subscribeActual(MaybeObserver super T> observer) {
- observer.onSubscribe(Disposables.disposed());
+ observer.onSubscribe(Disposable.disposed());
observer.onError(error);
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeErrorCallable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeErrorCallable.java
index f3b631ba36..20bb1e03a8 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeErrorCallable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeErrorCallable.java
@@ -14,7 +14,7 @@
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
-import io.reactivex.rxjava3.disposables.Disposables;
+import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
@@ -34,7 +34,7 @@ public MaybeErrorCallable(Supplier extends Throwable> errorSupplier) {
@Override
protected void subscribeActual(MaybeObserver super T> observer) {
- observer.onSubscribe(Disposables.disposed());
+ observer.onSubscribe(Disposable.disposed());
Throwable ex;
try {
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromAction.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromAction.java
index 8c29290deb..5bdf86f904 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromAction.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromAction.java
@@ -34,7 +34,7 @@ public MaybeFromAction(Action action) {
@Override
protected void subscribeActual(MaybeObserver super T> observer) {
- Disposable d = Disposables.empty();
+ Disposable d = Disposable.empty();
observer.onSubscribe(d);
if (!d.isDisposed()) {
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromCallable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromCallable.java
index 4d218eb1ad..088f2e14f0 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromCallable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromCallable.java
@@ -36,7 +36,7 @@ public MaybeFromCallable(Callable extends T> callable) {
@Override
protected void subscribeActual(MaybeObserver super T> observer) {
- Disposable d = Disposables.empty();
+ Disposable d = Disposable.empty();
observer.onSubscribe(d);
if (!d.isDisposed()) {
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromFuture.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromFuture.java
index db6c87518e..5d4dbc0f0b 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromFuture.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromFuture.java
@@ -41,7 +41,7 @@ public MaybeFromFuture(Future extends T> future, long timeout, TimeUnit unit)
@Override
protected void subscribeActual(MaybeObserver super T> observer) {
- Disposable d = Disposables.empty();
+ Disposable d = Disposable.empty();
observer.onSubscribe(d);
if (!d.isDisposed()) {
T v;
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromRunnable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromRunnable.java
index 672bd4edf5..2b7b658390 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromRunnable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromRunnable.java
@@ -34,7 +34,7 @@ public MaybeFromRunnable(Runnable runnable) {
@Override
protected void subscribeActual(MaybeObserver super T> observer) {
- Disposable d = Disposables.empty();
+ Disposable d = Disposable.empty();
observer.onSubscribe(d);
if (!d.isDisposed()) {
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromSupplier.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromSupplier.java
index 52aba416d9..38f0afdb62 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromSupplier.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromSupplier.java
@@ -35,7 +35,7 @@ public MaybeFromSupplier(Supplier extends T> supplier) {
@Override
protected void subscribeActual(MaybeObserver super T> observer) {
- Disposable d = Disposables.empty();
+ Disposable d = Disposable.empty();
observer.onSubscribe(d);
if (!d.isDisposed()) {
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeJust.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeJust.java
index a2c70740a6..54dacb3c7d 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeJust.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeJust.java
@@ -14,7 +14,7 @@
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
-import io.reactivex.rxjava3.disposables.Disposables;
+import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier;
/**
@@ -32,7 +32,7 @@ public MaybeJust(T value) {
@Override
protected void subscribeActual(MaybeObserver super T> observer) {
- observer.onSubscribe(Disposables.disposed());
+ observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFromCallable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFromCallable.java
index 5300d968b0..a78dde3e36 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFromCallable.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFromCallable.java
@@ -31,7 +31,7 @@ public SingleFromCallable(Callable extends T> callable) {
@Override
protected void subscribeActual(SingleObserver super T> observer) {
- Disposable d = Disposables.empty();
+ Disposable d = Disposable.empty();
observer.onSubscribe(d);
if (d.isDisposed()) {
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFromSupplier.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFromSupplier.java
index 88dcd7dc94..e6fd32603f 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFromSupplier.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFromSupplier.java
@@ -36,7 +36,7 @@ public SingleFromSupplier(Supplier extends T> supplier) {
@Override
protected void subscribeActual(SingleObserver super T> observer) {
- Disposable d = Disposables.empty();
+ Disposable d = Disposable.empty();
observer.onSubscribe(d);
if (d.isDisposed()) {
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleJust.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleJust.java
index 8495f931db..4fd2387f96 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleJust.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleJust.java
@@ -14,7 +14,7 @@
package io.reactivex.rxjava3.internal.operators.single;
import io.reactivex.rxjava3.core.*;
-import io.reactivex.rxjava3.disposables.Disposables;
+import io.reactivex.rxjava3.disposables.Disposable;
public final class SingleJust
* Limit the amount concurrency two at a time without creating a new fix size
* thread pool:
- *
+ *
* S when(@NonNull Function
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
@@ -69,7 +69,7 @@
* {@link Flowable#zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.rxjava3.functions.BiFunction)} where
* subscribing to the first {@link Observable} could deadlock the subscription
* to the second.
- *
+ *
*
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
@@ -77,12 +77,12 @@
* return Completable.merge(Observable.merge(workers, 2));
* });
*
- *
+ *
* Slowing down the rate to no more than than 1 a second. This suffers from the
* same problem as the one above I could find an {@link Observable} operator
* that limits the rate without dropping the values (aka leaky bucket
* algorithm).
- *
+ *
*
* Scheduler slowScheduler = Schedulers.computation().when(workers -> {
* // use concatenate to make each worker happen one at a time.
@@ -145,7 +145,7 @@ public Worker createWorker() {
static final Disposable SUBSCRIBED = new SubscribedDisposable();
- static final Disposable DISPOSED = Disposables.disposed();
+ static final Disposable DISPOSED = Disposable.disposed();
@SuppressWarnings("serial")
abstract static class ScheduledAction extends AtomicReference