From 91f4dd4d3f4be42e9724e6d09fa0d9146819a3c8 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 3 Dec 2019 16:02:44 +0100 Subject: [PATCH] 3.x: Add fair mode overload to Schedulers.from(Executor) --- .../schedulers/ExecutorScheduler.java | 42 +- .../rxjava3/schedulers/Schedulers.java | 81 ++- .../schedulers/ExecutorSchedulerFairTest.java | 515 ++++++++++++++++++ .../ExecutorSchedulerInterruptibleTest.java | 88 +-- .../schedulers/ExecutorSchedulerTest.java | 34 +- 5 files changed, 655 insertions(+), 105 deletions(-) create mode 100644 src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerFairTest.java diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java index 16fdf98032..8fd7b33720 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java @@ -33,20 +33,23 @@ public final class ExecutorScheduler extends Scheduler { final boolean interruptibleWorker; + final boolean fair; + @NonNull final Executor executor; static final Scheduler HELPER = Schedulers.single(); - public ExecutorScheduler(@NonNull Executor executor, boolean interruptibleWorker) { + public ExecutorScheduler(@NonNull Executor executor, boolean interruptibleWorker, boolean fair) { this.executor = executor; this.interruptibleWorker = interruptibleWorker; + this.fair = fair; } @NonNull @Override public Worker createWorker() { - return new ExecutorWorker(executor, interruptibleWorker); + return new ExecutorWorker(executor, interruptibleWorker, fair); } @NonNull @@ -123,6 +126,8 @@ public static final class ExecutorWorker extends Scheduler.Worker implements Run final boolean interruptibleWorker; + final boolean fair; + final Executor executor; final MpscLinkedQueue queue; @@ -133,10 +138,11 @@ public static final class ExecutorWorker extends Scheduler.Worker implements Run final CompositeDisposable tasks = new CompositeDisposable(); - public ExecutorWorker(Executor executor, boolean interruptibleWorker) { + public ExecutorWorker(Executor executor, boolean interruptibleWorker, boolean fair) { this.executor = executor; this.queue = new MpscLinkedQueue(); this.interruptibleWorker = interruptibleWorker; + this.fair = fair; } @NonNull @@ -236,6 +242,36 @@ public boolean isDisposed() { @Override public void run() { + if (fair) { + runFair(); + } else { + runEager(); + } + } + + void runFair() { + final MpscLinkedQueue q = queue; + if (disposed) { + q.clear(); + return; + } + + Runnable run = q.poll(); + if (run != null) { + run.run(); + } + + if (disposed) { + q.clear(); + return; + } + + if (wip.decrementAndGet() != 0) { + executor.execute(this); + } + } + + void runEager() { int missed = 1; final MpscLinkedQueue q = queue; for (;;) { diff --git a/src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java b/src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java index 2a0d396043..254511f5e0 100644 --- a/src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java @@ -321,6 +321,8 @@ public static Scheduler single() { * non-delayed tasks as it can, which may result in a longer than expected occupation of a * thread of the given backing Executor. In other terms, it does not allow per-Runnable fairness * in case the worker runs on a shared underlying thread of the Executor. + * See {@link #from(Executor, boolean, boolean)} to create a wrapper that uses the underlying Executor + * more fairly. *

* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided * executor's lifecycle must be managed externally: @@ -346,10 +348,11 @@ public static Scheduler single() { * @param executor * the executor to wrap * @return the new Scheduler wrapping the Executor + * @see #from(Executor, boolean, boolean) */ @NonNull public static Scheduler from(@NonNull Executor executor) { - return new ExecutorScheduler(executor, false); + return new ExecutorScheduler(executor, false, false); } /** @@ -382,6 +385,8 @@ public static Scheduler from(@NonNull Executor executor) { * non-delayed tasks as it can, which may result in a longer than expected occupation of a * thread of the given backing Executor. In other terms, it does not allow per-Runnable fairness * in case the worker runs on a shared underlying thread of the Executor. + * See {@link #from(Executor, boolean, boolean)} to create a wrapper that uses the underlying Executor + * more fairly. *

* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided * executor's lifecycle must be managed externally: @@ -411,10 +416,82 @@ public static Scheduler from(@NonNull Executor executor) { * be interrupted when the task is disposed. * @return the new Scheduler wrapping the Executor * @since 3.0.0 + * @see #from(Executor, boolean, boolean) */ @NonNull public static Scheduler from(@NonNull Executor executor, boolean interruptibleWorker) { - return new ExecutorScheduler(executor, interruptibleWorker); + return new ExecutorScheduler(executor, interruptibleWorker, false); + } + + /** + * Wraps an {@link Executor} into a new Scheduler instance and delegates {@code schedule()} + * calls to it. + *

+ * The tasks scheduled by the returned {@link Scheduler} and its {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} + * can be optionally interrupted. + *

+ * If the provided executor doesn't support any of the more specific standard Java executor + * APIs, tasks scheduled with a time delay or periodically will use the + * {@link #single()} scheduler for the timed waiting + * before posting the actual task to the given executor. + *

+ * If the provided executor supports the standard Java {@link ExecutorService} API, + * canceling tasks scheduled by this scheduler can be cancelled/interrupted by calling + * {@link io.reactivex.rxjava3.disposables.Disposable#dispose()}. In addition, tasks scheduled with + * a time delay or periodically will use the {@link #single()} scheduler for the timed waiting + * before posting the actual task to the given executor. + *

+ * If the provided executor supports the standard Java {@link ScheduledExecutorService} API, + * canceling tasks scheduled by this scheduler can be cancelled/interrupted by calling + * {@link io.reactivex.rxjava3.disposables.Disposable#dispose()}. In addition, tasks scheduled with + * a time delay or periodically will use the provided executor. Note, however, if the provided + * {@code ScheduledExecutorService} instance is not single threaded, tasks scheduled + * with a time delay close to each other may end up executing in different order than + * the original schedule() call was issued. This limitation may be lifted in a future patch. + *

+ * The implementation of the Worker of this wrapper Scheduler can operate in both eager (non-fair) and + * fair modes depending on the specified parameter. In eager mode, it will execute as many + * non-delayed tasks as it can, which may result in a longer than expected occupation of a + * thread of the given backing Executor. In other terms, it does not allow per-Runnable fairness + * in case the worker runs on a shared underlying thread of the Executor. In fair mode, + * non-delayed tasks will still be executed in a FIFO and non-overlapping manner, but after each task, + * the execution for the next task is rescheduled with the same underlying Executor, allowing interleaving + * from both the same Scheduler or other external usages of the underlying Executor. + *

+ * Starting, stopping and restarting this scheduler is not supported (no-op) and the provided + * executor's lifecycle must be managed externally: + *


+     * ExecutorService exec = Executors.newSingleThreadedExecutor();
+     * try {
+     *     Scheduler scheduler = Schedulers.from(exec, true, true);
+     *     Flowable.just(1)
+     *        .subscribeOn(scheduler)
+     *        .map(v -> v + 1)
+     *        .observeOn(scheduler)
+     *        .blockingSubscribe(System.out::println);
+     * } finally {
+     *     exec.shutdown();
+     * }
+     * 
+ *

+ * This type of scheduler is less sensitive to leaking {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} instances, although + * not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or + * execute those tasks "unexpectedly". + *

+ * Note that this method returns a new {@link Scheduler} instance, even for the same {@link Executor} instance. + * @param executor + * the executor to wrap + * @param interruptibleWorker if {@code true} the tasks submitted to the {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} will + * be interrupted when the task is disposed. + * @param fair if {@code true} tasks submitted to the will be executed by the underlying {@link Executor} one after the other, still + * in a FIFO and non-overlapping manner, but allows interleaving with other tasks submitted to the underlying {@code Executor}. + * If {@code false}, the underlying FIFO scheme will execute as many tasks as it can before giving up the underlying {@code Executor} thread. + * @return the new Scheduler wrapping the Executor + * @since 3.0.0 + */ + @NonNull + public static Scheduler from(@NonNull Executor executor, boolean interruptibleWorker, boolean fair) { + return new ExecutorScheduler(executor, interruptibleWorker, fair); } /** diff --git a/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerFairTest.java b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerFairTest.java new file mode 100644 index 0000000000..97fd1d3269 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerFairTest.java @@ -0,0 +1,515 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.schedulers; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.core.Scheduler.Worker; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.disposables.EmptyDisposable; +import io.reactivex.rxjava3.internal.functions.Functions; +import io.reactivex.rxjava3.internal.schedulers.RxThreadFactory; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.subscribers.TestSubscriber; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class ExecutorSchedulerFairTest extends AbstractSchedulerConcurrencyTests { + + static final Executor executor = Executors.newFixedThreadPool(2, new RxThreadFactory("TestCustomPool")); + + @Override + protected Scheduler getScheduler() { + return Schedulers.from(executor, false, true); + } + + @Test + public final void handledErrorIsNotDeliveredToThreadHandler() throws InterruptedException { + SchedulerTestHelper.handledErrorIsNotDeliveredToThreadHandler(getScheduler()); + } + + @Test + public void cancelledTaskRetention() throws InterruptedException { + ExecutorService exec = Executors.newSingleThreadExecutor(); + Scheduler s = Schedulers.from(exec, false, true); + try { + Scheduler.Worker w = s.createWorker(); + try { + ExecutorSchedulerTest.cancelledRetention(w, false); + } finally { + w.dispose(); + } + + w = s.createWorker(); + try { + ExecutorSchedulerTest.cancelledRetention(w, true); + } finally { + w.dispose(); + } + } finally { + exec.shutdownNow(); + } + } + + /** A simple executor which queues tasks and executes them one-by-one if executeOne() is called. */ + static final class TestExecutor implements Executor { + final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); + @Override + public void execute(Runnable command) { + queue.offer(command); + } + public void executeOne() { + Runnable r = queue.poll(); + if (r != null) { + r.run(); + } + } + public void executeAll() { + Runnable r; + while ((r = queue.poll()) != null) { + r.run(); + } + } + } + + @Test + public void cancelledTasksDontRun() { + final AtomicInteger calls = new AtomicInteger(); + Runnable task = new Runnable() { + @Override + public void run() { + calls.getAndIncrement(); + } + }; + TestExecutor exec = new TestExecutor(); + Scheduler custom = Schedulers.from(exec, false, true); + Worker w = custom.createWorker(); + try { + Disposable d1 = w.schedule(task); + Disposable d2 = w.schedule(task); + Disposable d3 = w.schedule(task); + + d1.dispose(); + d2.dispose(); + d3.dispose(); + + exec.executeAll(); + + assertEquals(0, calls.get()); + } finally { + w.dispose(); + } + } + + @Test + public void cancelledWorkerDoesntRunTasks() { + final AtomicInteger calls = new AtomicInteger(); + Runnable task = new Runnable() { + @Override + public void run() { + calls.getAndIncrement(); + } + }; + TestExecutor exec = new TestExecutor(); + Scheduler custom = Schedulers.from(exec, false, true); + Worker w = custom.createWorker(); + try { + w.schedule(task); + w.schedule(task); + w.schedule(task); + } finally { + w.dispose(); + } + exec.executeAll(); + assertEquals(0, calls.get()); + } + + @Test + public void plainExecutor() throws Exception { + Scheduler s = Schedulers.from(new Executor() { + @Override + public void execute(Runnable r) { + r.run(); + } + }, false, true); + + final CountDownLatch cdl = new CountDownLatch(5); + + Runnable r = new Runnable() { + @Override + public void run() { + cdl.countDown(); + } + }; + + s.scheduleDirect(r); + + s.scheduleDirect(r, 50, TimeUnit.MILLISECONDS); + + Disposable d = s.schedulePeriodicallyDirect(r, 10, 10, TimeUnit.MILLISECONDS); + + try { + assertTrue(cdl.await(5, TimeUnit.SECONDS)); + } finally { + d.dispose(); + } + + assertTrue(d.isDisposed()); + } + + @Test + public void rejectingExecutor() { + ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + exec.shutdown(); + + Scheduler s = Schedulers.from(exec, false, true); + + List errors = TestHelper.trackPluginErrors(); + + try { + assertSame(EmptyDisposable.INSTANCE, s.scheduleDirect(Functions.EMPTY_RUNNABLE)); + + assertSame(EmptyDisposable.INSTANCE, s.scheduleDirect(Functions.EMPTY_RUNNABLE, 10, TimeUnit.MILLISECONDS)); + + assertSame(EmptyDisposable.INSTANCE, s.schedulePeriodicallyDirect(Functions.EMPTY_RUNNABLE, 10, 10, TimeUnit.MILLISECONDS)); + + TestHelper.assertUndeliverable(errors, 0, RejectedExecutionException.class); + TestHelper.assertUndeliverable(errors, 1, RejectedExecutionException.class); + TestHelper.assertUndeliverable(errors, 2, RejectedExecutionException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void rejectingExecutorWorker() { + ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + exec.shutdown(); + + List errors = TestHelper.trackPluginErrors(); + + try { + Worker s = Schedulers.from(exec, false, true).createWorker(); + assertSame(EmptyDisposable.INSTANCE, s.schedule(Functions.EMPTY_RUNNABLE)); + + s = Schedulers.from(exec, false, true).createWorker(); + assertSame(EmptyDisposable.INSTANCE, s.schedule(Functions.EMPTY_RUNNABLE, 10, TimeUnit.MILLISECONDS)); + + s = Schedulers.from(exec, false, true).createWorker(); + assertSame(EmptyDisposable.INSTANCE, s.schedulePeriodically(Functions.EMPTY_RUNNABLE, 10, 10, TimeUnit.MILLISECONDS)); + + TestHelper.assertUndeliverable(errors, 0, RejectedExecutionException.class); + TestHelper.assertUndeliverable(errors, 1, RejectedExecutionException.class); + TestHelper.assertUndeliverable(errors, 2, RejectedExecutionException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void reuseScheduledExecutor() throws Exception { + ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + + try { + Scheduler s = Schedulers.from(exec, false, true); + + final CountDownLatch cdl = new CountDownLatch(8); + + Runnable r = new Runnable() { + @Override + public void run() { + cdl.countDown(); + } + }; + + s.scheduleDirect(r); + + s.scheduleDirect(r, 10, TimeUnit.MILLISECONDS); + + Disposable d = s.schedulePeriodicallyDirect(r, 10, 10, TimeUnit.MILLISECONDS); + + try { + assertTrue(cdl.await(5, TimeUnit.SECONDS)); + } finally { + d.dispose(); + } + } finally { + exec.shutdown(); + } + } + + @Test + public void reuseScheduledExecutorAsWorker() throws Exception { + ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + + Worker s = Schedulers.from(exec, false, true).createWorker(); + + assertFalse(s.isDisposed()); + try { + + final CountDownLatch cdl = new CountDownLatch(8); + + Runnable r = new Runnable() { + @Override + public void run() { + cdl.countDown(); + } + }; + + s.schedule(r); + + s.schedule(r, 10, TimeUnit.MILLISECONDS); + + Disposable d = s.schedulePeriodically(r, 10, 10, TimeUnit.MILLISECONDS); + + try { + assertTrue(cdl.await(5, TimeUnit.SECONDS)); + } finally { + d.dispose(); + } + } finally { + s.dispose(); + exec.shutdown(); + } + + assertTrue(s.isDisposed()); + } + + @Test + public void disposeRace() { + ExecutorService exec = Executors.newSingleThreadExecutor(); + final Scheduler s = Schedulers.from(exec, false, true); + try { + for (int i = 0; i < 500; i++) { + final Worker w = s.createWorker(); + + final AtomicInteger c = new AtomicInteger(2); + + w.schedule(new Runnable() { + @Override + public void run() { + c.decrementAndGet(); + while (c.get() != 0) { } + } + }); + + c.decrementAndGet(); + while (c.get() != 0) { } + w.dispose(); + } + } finally { + exec.shutdownNow(); + } + } + + @Test + public void runnableDisposed() { + final Scheduler s = Schedulers.from(new Executor() { + @Override + public void execute(Runnable r) { + r.run(); + } + }, false, true); + Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE); + + assertTrue(d.isDisposed()); + } + + @Test + public void runnableDisposedAsync() throws Exception { + final Scheduler s = Schedulers.from(new Executor() { + @Override + public void execute(Runnable r) { + new Thread(r).start(); + } + }, false, true); + Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE); + + while (!d.isDisposed()) { + Thread.sleep(1); + } + } + + @Test + public void runnableDisposedAsync2() throws Exception { + final Scheduler s = Schedulers.from(executor, false, true); + Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE); + + while (!d.isDisposed()) { + Thread.sleep(1); + } + } + + @Test + public void runnableDisposedAsyncCrash() throws Exception { + final Scheduler s = Schedulers.from(new Executor() { + @Override + public void execute(Runnable r) { + new Thread(r).start(); + } + }, false, true); + Disposable d = s.scheduleDirect(new Runnable() { + @Override + public void run() { + throw new IllegalStateException(); + } + }); + + while (!d.isDisposed()) { + Thread.sleep(1); + } + } + + @Test + public void runnableDisposedAsyncTimed() throws Exception { + final Scheduler s = Schedulers.from(new Executor() { + @Override + public void execute(Runnable r) { + new Thread(r).start(); + } + }, false, true); + Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS); + + while (!d.isDisposed()) { + Thread.sleep(1); + } + } + + @Test + public void runnableDisposedAsyncTimed2() throws Exception { + ExecutorService executorScheduler = Executors.newScheduledThreadPool(1, new RxThreadFactory("TestCustomPoolTimed")); + try { + final Scheduler s = Schedulers.from(executorScheduler, false, true); + Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE, 1, TimeUnit.MILLISECONDS); + + while (!d.isDisposed()) { + Thread.sleep(1); + } + } finally { + executorScheduler.shutdownNow(); + } + } + + @Test + public void unwrapScheduleDirectTaskAfterDispose() { + Scheduler scheduler = getScheduler(); + final CountDownLatch cdl = new CountDownLatch(1); + Runnable countDownRunnable = new Runnable() { + @Override + public void run() { + cdl.countDown(); + } + }; + Disposable disposable = scheduler.scheduleDirect(countDownRunnable, 100, TimeUnit.MILLISECONDS); + SchedulerRunnableIntrospection wrapper = (SchedulerRunnableIntrospection) disposable; + assertSame(countDownRunnable, wrapper.getWrappedRunnable()); + disposable.dispose(); + + assertSame(Functions.EMPTY_RUNNABLE, wrapper.getWrappedRunnable()); + } + + @Test + public void fairInterleaving() { + ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + try { + final Scheduler sch = Schedulers.from(exec, false, true); + + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp + .publish(new Function, Flowable>() { + @Override + public Flowable apply(Flowable v) throws Throwable { + return Flowable.merge( + v.filter(new Predicate() { + @Override + public boolean test(Integer w) throws Throwable { + return w % 2 == 0; + } + }).observeOn(sch, false, 1).hide(), + v.filter(new Predicate() { + @Override + public boolean test(Integer w) throws Throwable { + return w % 2 != 0; + } + }).observeOn(sch, false, 1).hide() + ); + } + }) + .test(); + + for (int i = 1; i < 11; i++) { + pp.onNext(i); + } + pp.onComplete(); + + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } finally { + exec.shutdown(); + } + } + + @Test + public void fairInterleavingWithDelay() { + ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(); + try { + final Scheduler sch = Schedulers.from(exec, false, true); + + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp + .publish(new Function, Flowable>() { + @Override + public Flowable apply(Flowable v) throws Throwable { + return Flowable.merge( + v.filter(new Predicate() { + @Override + public boolean test(Integer w) throws Throwable { + return w % 2 == 0; + } + }).delay(0, TimeUnit.SECONDS, sch).hide(), + v.filter(new Predicate() { + @Override + public boolean test(Integer w) throws Throwable { + return w % 2 != 0; + } + }).delay(0, TimeUnit.SECONDS, sch).hide() + ); + } + }) + .test(); + + for (int i = 1; i < 11; i++) { + pp.onNext(i); + } + pp.onComplete(); + + ts + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } finally { + exec.shutdown(); + } + } +} diff --git a/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java index 369907bb3d..ecf50c3e99 100644 --- a/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java +++ b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java @@ -15,7 +15,6 @@ import static org.junit.Assert.*; -import java.lang.management.*; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -27,7 +26,7 @@ import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.internal.disposables.EmptyDisposable; import io.reactivex.rxjava3.internal.functions.Functions; -import io.reactivex.rxjava3.internal.schedulers.*; +import io.reactivex.rxjava3.internal.schedulers.RxThreadFactory; import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.testsupport.TestHelper; @@ -45,87 +44,6 @@ public final void handledErrorIsNotDeliveredToThreadHandler() throws Interrupted SchedulerTestHelper.handledErrorIsNotDeliveredToThreadHandler(getScheduler()); } - public static void cancelledRetention(Scheduler.Worker w, boolean periodic) throws InterruptedException { - System.out.println("Wait before GC"); - Thread.sleep(1000); - - System.out.println("GC"); - System.gc(); - - Thread.sleep(1000); - - MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); - MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); - long initial = memHeap.getUsed(); - - System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0); - - int n = 100 * 1000; - if (periodic) { - final CountDownLatch cdl = new CountDownLatch(n); - final Runnable action = new Runnable() { - @Override - public void run() { - cdl.countDown(); - } - }; - for (int i = 0; i < n; i++) { - if (i % 50000 == 0) { - System.out.println(" -> still scheduling: " + i); - } - w.schedulePeriodically(action, 0, 1, TimeUnit.DAYS); - } - - System.out.println("Waiting for the first round to finish..."); - cdl.await(); - } else { - for (int i = 0; i < n; i++) { - if (i % 50000 == 0) { - System.out.println(" -> still scheduling: " + i); - } - w.schedule(Functions.EMPTY_RUNNABLE, 1, TimeUnit.DAYS); - } - } - - memHeap = memoryMXBean.getHeapMemoryUsage(); - long after = memHeap.getUsed(); - System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0); - - w.dispose(); - - System.out.println("Wait before second GC"); - System.out.println("JDK 6 purge is N log N because it removes and shifts one by one"); - int t = (int)(n * Math.log(n) / 100) + SchedulerPoolFactory.PURGE_PERIOD_SECONDS * 1000; - while (t > 0) { - System.out.printf(" >> Waiting for purge: %.2f s remaining%n", t / 1000d); - - System.gc(); - - Thread.sleep(1000); - - t -= 1000; - memHeap = memoryMXBean.getHeapMemoryUsage(); - long finish = memHeap.getUsed(); - System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); - if (finish <= initial * 5) { - break; - } - } - - System.out.println("Second GC"); - System.gc(); - - Thread.sleep(1000); - - memHeap = memoryMXBean.getHeapMemoryUsage(); - long finish = memHeap.getUsed(); - System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); - - if (finish > initial * 5) { - fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d)); - } - } - @Test public void cancelledTaskRetention() throws InterruptedException { ExecutorService exec = Executors.newSingleThreadExecutor(); @@ -133,14 +51,14 @@ public void cancelledTaskRetention() throws InterruptedException { try { Scheduler.Worker w = s.createWorker(); try { - cancelledRetention(w, false); + ExecutorSchedulerTest.cancelledRetention(w, false); } finally { w.dispose(); } w = s.createWorker(); try { - cancelledRetention(w, true); + ExecutorSchedulerTest.cancelledRetention(w, true); } finally { w.dispose(); } diff --git a/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerTest.java b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerTest.java index 7baebce0d5..b24fa65f90 100644 --- a/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerTest.java @@ -55,8 +55,7 @@ public static void cancelledRetention(Scheduler.Worker w, boolean periodic) thro Thread.sleep(1000); MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); - MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); - long initial = memHeap.getUsed(); + long initial = memoryMXBean.getHeapMemoryUsage().getUsed(); System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0); @@ -87,8 +86,7 @@ public void run() { } } - memHeap = memoryMXBean.getHeapMemoryUsage(); - long after = memHeap.getUsed(); + long after = memoryMXBean.getHeapMemoryUsage().getUsed(); System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0); w.dispose(); @@ -96,34 +94,40 @@ public void run() { System.out.println("Wait before second GC"); System.out.println("JDK 6 purge is N log N because it removes and shifts one by one"); int t = (int)(n * Math.log(n) / 100) + SchedulerPoolFactory.PURGE_PERIOD_SECONDS * 1000; + int sleepStep = 100; while (t > 0) { System.out.printf(" >> Waiting for purge: %.2f s remaining%n", t / 1000d); System.gc(); - Thread.sleep(1000); - - t -= 1000; - memHeap = memoryMXBean.getHeapMemoryUsage(); - long finish = memHeap.getUsed(); + long finish = memoryMXBean.getHeapMemoryUsage().getUsed(); System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); if (finish <= initial * 5) { break; } + + Thread.sleep(sleepStep); + t -= sleepStep; } System.out.println("Second GC"); System.gc(); - Thread.sleep(1000); + t = 2000; + long finish = memoryMXBean.getHeapMemoryUsage().getUsed(); - memHeap = memoryMXBean.getHeapMemoryUsage(); - long finish = memHeap.getUsed(); - System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); + while (t > 0) { + System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); - if (finish > initial * 5) { - fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d)); + if (finish <= initial * 5) { + return; + } + Thread.sleep(sleepStep); + t -= sleepStep; + finish = memoryMXBean.getHeapMemoryUsage().getUsed(); } + + fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d)); } @Test