Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add fair mode overload to Schedulers.from(Executor) #6744

Merged
merged 1 commit into from
Dec 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,23 @@ public final class ExecutorScheduler extends Scheduler {

final boolean interruptibleWorker;

final boolean fair;

@NonNull
final Executor executor;

static final Scheduler HELPER = Schedulers.single();

public ExecutorScheduler(@NonNull Executor executor, boolean interruptibleWorker) {
public ExecutorScheduler(@NonNull Executor executor, boolean interruptibleWorker, boolean fair) {
this.executor = executor;
this.interruptibleWorker = interruptibleWorker;
this.fair = fair;
}

@NonNull
@Override
public Worker createWorker() {
return new ExecutorWorker(executor, interruptibleWorker);
return new ExecutorWorker(executor, interruptibleWorker, fair);
}

@NonNull
Expand Down Expand Up @@ -123,6 +126,8 @@ public static final class ExecutorWorker extends Scheduler.Worker implements Run

final boolean interruptibleWorker;

final boolean fair;

final Executor executor;

final MpscLinkedQueue<Runnable> queue;
Expand All @@ -133,10 +138,11 @@ public static final class ExecutorWorker extends Scheduler.Worker implements Run

final CompositeDisposable tasks = new CompositeDisposable();

public ExecutorWorker(Executor executor, boolean interruptibleWorker) {
public ExecutorWorker(Executor executor, boolean interruptibleWorker, boolean fair) {
this.executor = executor;
this.queue = new MpscLinkedQueue<Runnable>();
this.interruptibleWorker = interruptibleWorker;
this.fair = fair;
}

@NonNull
Expand Down Expand Up @@ -236,6 +242,36 @@ public boolean isDisposed() {

@Override
public void run() {
if (fair) {
runFair();
} else {
runEager();
}
}

void runFair() {
final MpscLinkedQueue<Runnable> q = queue;
if (disposed) {
q.clear();
return;
}

Runnable run = q.poll();
if (run != null) {
run.run();
}

if (disposed) {
q.clear();
return;
}

if (wip.decrementAndGet() != 0) {
executor.execute(this);
}
}

void runEager() {
int missed = 1;
final MpscLinkedQueue<Runnable> q = queue;
for (;;) {
Expand Down
81 changes: 79 additions & 2 deletions src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ public static Scheduler single() {
* non-delayed tasks as it can, which may result in a longer than expected occupation of a
* thread of the given backing Executor. In other terms, it does not allow per-Runnable fairness
* in case the worker runs on a shared underlying thread of the Executor.
* See {@link #from(Executor, boolean, boolean)} to create a wrapper that uses the underlying Executor
* more fairly.
* <p>
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
* executor's lifecycle must be managed externally:
Expand All @@ -346,10 +348,11 @@ public static Scheduler single() {
* @param executor
* the executor to wrap
* @return the new Scheduler wrapping the Executor
* @see #from(Executor, boolean, boolean)
*/
@NonNull
public static Scheduler from(@NonNull Executor executor) {
return new ExecutorScheduler(executor, false);
return new ExecutorScheduler(executor, false, false);
}

/**
Expand Down Expand Up @@ -382,6 +385,8 @@ public static Scheduler from(@NonNull Executor executor) {
* non-delayed tasks as it can, which may result in a longer than expected occupation of a
* thread of the given backing Executor. In other terms, it does not allow per-Runnable fairness
* in case the worker runs on a shared underlying thread of the Executor.
* See {@link #from(Executor, boolean, boolean)} to create a wrapper that uses the underlying Executor
* more fairly.
* <p>
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
* executor's lifecycle must be managed externally:
Expand Down Expand Up @@ -411,10 +416,82 @@ public static Scheduler from(@NonNull Executor executor) {
* be interrupted when the task is disposed.
* @return the new Scheduler wrapping the Executor
* @since 3.0.0
* @see #from(Executor, boolean, boolean)
*/
@NonNull
public static Scheduler from(@NonNull Executor executor, boolean interruptibleWorker) {
return new ExecutorScheduler(executor, interruptibleWorker);
return new ExecutorScheduler(executor, interruptibleWorker, false);
}

/**
* Wraps an {@link Executor} into a new Scheduler instance and delegates {@code schedule()}
* calls to it.
* <p>
* The tasks scheduled by the returned {@link Scheduler} and its {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker}
* can be optionally interrupted.
* <p>
* If the provided executor doesn't support any of the more specific standard Java executor
* APIs, tasks scheduled with a time delay or periodically will use the
* {@link #single()} scheduler for the timed waiting
* before posting the actual task to the given executor.
* <p>
* If the provided executor supports the standard Java {@link ExecutorService} API,
* canceling tasks scheduled by this scheduler can be cancelled/interrupted by calling
* {@link io.reactivex.rxjava3.disposables.Disposable#dispose()}. In addition, tasks scheduled with
* a time delay or periodically will use the {@link #single()} scheduler for the timed waiting
* before posting the actual task to the given executor.
* <p>
* If the provided executor supports the standard Java {@link ScheduledExecutorService} API,
* canceling tasks scheduled by this scheduler can be cancelled/interrupted by calling
* {@link io.reactivex.rxjava3.disposables.Disposable#dispose()}. In addition, tasks scheduled with
* a time delay or periodically will use the provided executor. Note, however, if the provided
* {@code ScheduledExecutorService} instance is not single threaded, tasks scheduled
* with a time delay close to each other may end up executing in different order than
* the original schedule() call was issued. This limitation may be lifted in a future patch.
* <p>
* The implementation of the Worker of this wrapper Scheduler can operate in both eager (non-fair) and
* fair modes depending on the specified parameter. In <em>eager</em> mode, it will execute as many
* non-delayed tasks as it can, which may result in a longer than expected occupation of a
* thread of the given backing Executor. In other terms, it does not allow per-Runnable fairness
* in case the worker runs on a shared underlying thread of the Executor. In <em>fair</em> mode,
* non-delayed tasks will still be executed in a FIFO and non-overlapping manner, but after each task,
* the execution for the next task is rescheduled with the same underlying Executor, allowing interleaving
* from both the same Scheduler or other external usages of the underlying Executor.
* <p>
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
* executor's lifecycle must be managed externally:
* <pre><code>
* ExecutorService exec = Executors.newSingleThreadedExecutor();
* try {
* Scheduler scheduler = Schedulers.from(exec, true, true);
* Flowable.just(1)
* .subscribeOn(scheduler)
* .map(v -&gt; v + 1)
* .observeOn(scheduler)
* .blockingSubscribe(System.out::println);
* } finally {
* exec.shutdown();
* }
* </code></pre>
* <p>
* This type of scheduler is less sensitive to leaking {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} instances, although
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
* execute those tasks "unexpectedly".
* <p>
* Note that this method returns a new {@link Scheduler} instance, even for the same {@link Executor} instance.
* @param executor
* the executor to wrap
* @param interruptibleWorker if {@code true} the tasks submitted to the {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} will
* be interrupted when the task is disposed.
* @param fair if {@code true} tasks submitted to the will be executed by the underlying {@link Executor} one after the other, still
* in a FIFO and non-overlapping manner, but allows interleaving with other tasks submitted to the underlying {@code Executor}.
* If {@code false}, the underlying FIFO scheme will execute as many tasks as it can before giving up the underlying {@code Executor} thread.
* @return the new Scheduler wrapping the Executor
* @since 3.0.0
*/
@NonNull
public static Scheduler from(@NonNull Executor executor, boolean interruptibleWorker, boolean fair) {
return new ExecutorScheduler(executor, interruptibleWorker, fair);
}

/**
Expand Down
Loading