Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port DH-11185: Parallel Where #3679

Merged
merged 2 commits into from
Apr 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package io.deephaven.engine.table.impl;

import io.deephaven.base.verify.Assert;
import io.deephaven.engine.exceptions.CancellationException;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ModifiedColumnSet;
import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
import io.deephaven.engine.table.impl.select.WhereFilter;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedNode;
import io.deephaven.util.datastructures.linked.IntrusiveDoublyLinkedQueue;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A FilterExecution that is used for initial filters. When we split off sub filters as child jobs, they are enqueued in
* the {@link OperationInitializationThreadPool}.
*/
class InitialFilterExecution extends AbstractFilterExecution {
private final QueryTable sourceTable;
private final boolean permitParallelization;
private final int segmentCount;
private final WhereFilter[] filters;

/**
* The pendingSatisfaction list is global to the root node of this InitialExecutionFilter. The outstanding children
* allows us to count how many jobs exist. If we have no outstanding jobs, but unsatisfied Notifications then an
* error has occurred.
*/
private final IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> pendingSatisfaction;
private final Map<Thread, Thread> runningChildren;
private final AtomicBoolean cancelled;

/**
* The SubEntry lets us track query performance for the split jobs.
*/
private BasePerformanceEntry basePerformanceEntry;

/**
* The InitialFilterExecution that represents all the work we are doing for this table.
*/
private final InitialFilterExecution root;

InitialFilterExecution(
final QueryTable sourceTable,
final WhereFilter[] filters,
final RowSet addedInput,
final long addStart,
final long addEnd,
final InitialFilterExecution parent,
final int filterIndex,
final boolean usePrev) {
super(sourceTable, filters, addedInput, addStart, addEnd, null, 0, 0, parent, usePrev, false,
ModifiedColumnSet.ALL, filterIndex);
this.sourceTable = sourceTable;
permitParallelization = permitParallelization(filters);
this.filters = filters;
if (parent == null) {
pendingSatisfaction = new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<NotificationQueue.Notification>getInstance());
segmentCount = QueryTable.PARALLEL_WHERE_SEGMENTS <= 0 ? OperationInitializationThreadPool.NUM_THREADS
: QueryTable.PARALLEL_WHERE_SEGMENTS;
runningChildren = Collections.synchronizedMap(new IdentityHashMap<>());
cancelled = new AtomicBoolean(false);
this.root = this;
} else {
pendingSatisfaction = parent.pendingSatisfaction;
segmentCount = parent.segmentCount;
this.root = parent.root;
runningChildren = null;
cancelled = null;
}
}

@Override
void enqueueSubFilters(
List<AbstractFilterExecution> subFilters,
AbstractFilterExecution.CombinationNotification combinationNotification) {
synchronized (pendingSatisfaction) {
enqueueJobs(subFilters);
pendingSatisfaction.offer(combinationNotification);
}
}

private void enqueueJobs(Iterable<? extends NotificationQueue.Notification> subFilters) {
for (NotificationQueue.Notification notification : subFilters) {
OperationInitializationThreadPool.executorService.submit(() -> {
root.runningChildren.put(Thread.currentThread(), Thread.currentThread());
try {
if (!root.cancelled.get()) {
notification.run();
}
if (Thread.interrupted()) {
// we would like to throw a query cancellation exception
exceptionResult = new CancellationException("thread interrupted");
}
} finally {
root.runningChildren.remove(Thread.currentThread());
}
});
}
}

@Override
int getTargetSegments() {
return segmentCount;
}

@Override
boolean doParallelization(long numberOfRows) {
return permitParallelization && doParallelizationBase(numberOfRows);
}

@Override
void handleUncaughtException(Exception throwable) {
throw new UnsupportedOperationException(throwable);
}

@Override
void accumulatePerformanceEntry(BasePerformanceEntry entry) {
synchronized (root) {
if (root.basePerformanceEntry != null) {
root.basePerformanceEntry.accumulate(entry);
} else {
root.basePerformanceEntry = entry;
}
}
}

/**
* Run any satisfied jobs in the pendingSatisfaction list.
*/
@Override
void onNoChildren() {
final IntrusiveDoublyLinkedQueue<NotificationQueue.Notification> satisfied = new IntrusiveDoublyLinkedQueue<>(
IntrusiveDoublyLinkedNode.Adapter.<NotificationQueue.Notification>getInstance());
synchronized (pendingSatisfaction) {
for (final Iterator<NotificationQueue.Notification> it = pendingSatisfaction.iterator(); it.hasNext();) {
final NotificationQueue.Notification notification = it.next();
if (notification.canExecute(0)) {
satisfied.offer(notification);
it.remove();
}
}
}
if (satisfied.isEmpty()) {
return;
}
satisfied.forEach(NotificationQueue.Notification::run);
}

@Override
InitialFilterExecution makeChild(
final RowSet addedInput,
final long addStart,
final long addEnd,
final RowSet modifyInput,
final long modifyStart,
final long modifyEnd,
final int filterIndex) {
Assert.eqNull(modifyInput, "modifyInput");
return new InitialFilterExecution(sourceTable, filters, addedInput, addStart, addEnd, this, filterIndex,
usePrev);
}

BasePerformanceEntry getBasePerformanceEntry() {
return basePerformanceEntry;
}

void setCancelled() {
cancelled.set(true);
runningChildren.forEach((thread, ignored) -> thread.interrupt());
}
}
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.base.verify.Assert;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.exceptions.UncheckedTableException;
import io.deephaven.engine.table.TableListener;
import io.deephaven.engine.table.TableUpdate;
@@ -54,8 +53,6 @@ public abstract class InstrumentedTableListenerBase extends LivenessArtifact
private volatile long lastCompletedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;
private volatile long lastEnqueuedStep = NotificationStepReceiver.NULL_NOTIFICATION_STEP;

protected final ExecutionContext executionContext = ExecutionContext.getContextToRecord();

InstrumentedTableListenerBase(@Nullable String description, boolean terminalListener) {
this.entry = UpdatePerformanceTracker.getInstance().getEntry(description);
this.terminalListener = terminalListener;
@@ -214,11 +211,6 @@ public LogOutput append(LogOutput output) {
return output.append("ErrorNotification{").append("originalException=")
.append(originalException.getMessage()).append(", sourceEntry=").append(sourceEntry).append("}");
}

@Override
public ExecutionContext getExecutionContext() {
return executionContext;
}
}

protected abstract class NotificationBase extends AbstractNotification implements LogOutputAppendable {
@@ -264,11 +256,6 @@ public final boolean canExecute(final long step) {
return InstrumentedTableListenerBase.this.canExecute(step);
}

@Override
public ExecutionContext getExecutionContext() {
return executionContext;
}

void doRun(final Runnable invokeOnUpdate) {
try {
doRunInternal(invokeOnUpdate);
Original file line number Diff line number Diff line change
@@ -8,9 +8,9 @@
import io.deephaven.engine.exceptions.UncheckedTableException;
import io.deephaven.engine.liveness.LivenessArtifact;
import io.deephaven.engine.table.TableListener;
import io.deephaven.engine.table.impl.perf.BasePerformanceEntry;
import io.deephaven.engine.table.impl.perf.PerformanceEntry;
import io.deephaven.engine.updategraph.UpdateGraphProcessor;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.engine.updategraph.NotificationQueue;
@@ -58,8 +58,6 @@ public abstract class MergedListener extends LivenessArtifact implements Notific
@ReferentialIntegrity
private Runnable delayedErrorReference;

private final ExecutionContext executionContext = ExecutionContext.getContextToRecord();

protected MergedListener(
Iterable<? extends ListenerRecorder> recorders,
Iterable<NotificationQueue.Dependency> dependencies,
@@ -256,6 +254,16 @@ public boolean satisfied(final long step) {
return true;
}

protected void handleUncaughtException(Exception updateException) {
log.error().append(logPrefix).append("Uncaught exception for entry= ").append(entry)
.append(": ").append(updateException).endl();
propagateError(true, updateException, entry);
}

protected void accumulatePeformanceEntry(BasePerformanceEntry subEntry) {
entry.accumulate(subEntry);
}

private class MergedNotification extends AbstractNotification {

public MergedNotification() {
@@ -310,9 +318,7 @@ public void run() {
entry.onUpdateEnd();
}
} catch (Exception updateException) {
log.error().append(logPrefix).append("Uncaught exception for entry= ").append(entry)
.append(": ").append(updateException).endl();
propagateError(true, updateException, entry);
handleUncaughtException(updateException);
} finally {
lastCompletedStep = currentStep;
releaseFromRecorders();
@@ -329,10 +335,5 @@ public LogOutput append(LogOutput logOutput) {
public boolean canExecute(final long step) {
return MergedListener.this.canExecute(step);
}

@Override
public ExecutionContext getExecutionContext() {
return executionContext;
}
}
}
Original file line number Diff line number Diff line change
@@ -32,6 +32,10 @@ public static boolean isInitializationThread() {
return isInitializationThread.get();
}

public static boolean canParallelize() {
return NUM_THREADS > 1 && !isInitializationThread();
}

public final static ExecutorService executorService;
static {
final ThreadGroup threadGroup = new ThreadGroup("OperationInitializationThreadPool");
Loading