Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: opensearch-project/OpenSearch
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: fa2192ec5f3598c065d1d54e5aabf21ff24a31d3
Choose a base ref
..
head repository: opensearch-project/OpenSearch
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: d25b19cf1243b64617fc1c3d0d82d318163c0516
Choose a head ref
Original file line number Diff line number Diff line change
@@ -44,26 +44,23 @@
public class MockSinglePrioritizingExecutor extends PrioritizedOpenSearchThreadPoolExecutor {

public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
super(name, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> new Thread() {
@Override
public void start() {
deterministicTaskQueue.scheduleNow(new Runnable() {
@Override
public void run() {
try {
r.run();
} catch (KillWorkerError kwe) {
// hacks everywhere
}
super(name, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> new Thread(() -> {
deterministicTaskQueue.scheduleNow(new Runnable() {
@Override
public void run() {
try {
r.run();
} catch (KillWorkerError kwe) {
// hacks everywhere
}
}

@Override
public String toString() {
return r.toString();
}
});
}
}, threadPool.getThreadContext(), threadPool.scheduler());
@Override
public String toString() {
return r.toString();
}
});
}), threadPool.getThreadContext(), threadPool.scheduler());
}

@Override
Original file line number Diff line number Diff line change
@@ -36,11 +36,15 @@
import org.opensearch.common.util.concurrent.PrioritizedRunnable;
import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

public class MockSinglePrioritizingExecutorTests extends OpenSearchTestCase {

public void testPrioritizedEsThreadPoolExecutor() {
public void testPrioritizedEsThreadPoolExecutor() throws InterruptedException, TimeoutException {
final DeterministicTaskQueue taskQueue = DeterministicTaskQueueTests.newTaskQueue();
final PrioritizedOpenSearchThreadPoolExecutor executor = new MockSinglePrioritizingExecutor(
"test",
@@ -64,13 +68,44 @@ public void run() {
});
assertFalse(called1.get());
assertFalse(called2.get());

// There should be 2 tasks scheduled in the executor's queue
await(() -> executor.getQueue().size() == 2, 500, TimeUnit.MILLISECONDS);
await(() -> taskQueue.hasRunnableTasks(), 500, TimeUnit.MILLISECONDS);

taskQueue.runRandomTask();
assertFalse(called1.get());
assertTrue(called2.get());

// There should be 1 task scheduled in the executor's queue now (one was already run)
await(() -> executor.getQueue().size() == 1, 500, TimeUnit.MILLISECONDS);
await(() -> taskQueue.hasRunnableTasks(), 500, TimeUnit.MILLISECONDS);

taskQueue.runRandomTask();
assertTrue(called1.get());
assertTrue(called2.get());

// There should be 0 task scheduled in the executor's queue now (both were already run)
await(() -> executor.getQueue().isEmpty(), 500, TimeUnit.MILLISECONDS);
await(() -> taskQueue.hasRunnableTasks(), 500, TimeUnit.MILLISECONDS);

taskQueue.runRandomTask();
assertFalse(taskQueue.hasRunnableTasks());
}

private void await(Supplier<Boolean> awaiter, int timeout, TimeUnit unit) throws TimeoutException {
final long slice = TimeUnit.NANOSECONDS.convert(timeout, unit) / 10L;
final long limit = unit.toNanos(timeout);

int attempt = 0;
while (!awaiter.get() && attempt < limit) {
final long started = System.nanoTime();
LockSupport.parkNanos(Math.min(slice, limit - attempt));
attempt += (System.nanoTime() - started);
}

if (!awaiter.get()) {
throw new TimeoutException("Timeout reached while awaiting for condition to be met");
}
}
}