Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port Symbol table aggregation improvements from DHE #3955

Merged
merged 8 commits into from
Jun 21, 2023
Merged
Original file line number Diff line number Diff line change
@@ -57,15 +57,15 @@ class BucketingContext implements SafeCloseable {
originalLeftSources = Arrays.copyOf(leftSources, leftSources.length);

keyColumnCount = leftSources.length;
useLeftGrouping = control.useGrouping(leftTable, leftSources);
useLeftGrouping = JoinControl.useGrouping(leftTable, leftSources);
// note that the naturalJoin operation ignores this field, because there is never any point to reading or
// processing grouping information when we have a single row on the right side. Cross join just doesn't support
// grouping at all (yuck).
useRightGrouping = control.useGrouping(rightTable, rightSources);
useRightGrouping = JoinControl.useGrouping(rightTable, rightSources);

for (int ii = 0; ii < keyColumnCount; ++ii) {
final Class leftType = TypeUtils.getUnboxedTypeIfBoxed(leftSources[ii].getType());
final Class rightType = TypeUtils.getUnboxedTypeIfBoxed(rightSources[ii].getType());
final Class<?> leftType = TypeUtils.getUnboxedTypeIfBoxed(leftSources[ii].getType());
final Class<?> rightType = TypeUtils.getUnboxedTypeIfBoxed(rightSources[ii].getType());
if (leftType != rightType) {
throw new IllegalArgumentException(
"Mismatched join types, " + columnsToMatch[ii] + ": " + leftType + " != " + rightType);
@@ -91,8 +91,8 @@ class BucketingContext implements SafeCloseable {
} else if (leftType == String.class) {
if (control.considerSymbolTables(leftTable, rightTable, useLeftGrouping, useRightGrouping,
leftSources[ii], rightSources[ii])) {
final SymbolTableSource leftSymbolTableSource = (SymbolTableSource) leftSources[ii];
final SymbolTableSource rightSymbolTableSource = (SymbolTableSource) rightSources[ii];
final SymbolTableSource<?> leftSymbolTableSource = (SymbolTableSource<?>) leftSources[ii];
final SymbolTableSource<?> rightSymbolTableSource = (SymbolTableSource<?>) rightSources[ii];

final Table leftSymbolTable = leftSymbolTableSource.getStaticSymbolTable(leftTable.getRowSet(),
control.useSymbolTableLookupCaching());
@@ -120,9 +120,9 @@ class BucketingContext implements SafeCloseable {
final ColumnSource<Long> rightSourceAsLong = rightSources[ii].reinterpret(long.class);

leftSources[ii] =
new NaturalJoinHelper.SymbolTableToUniqueIdSource(leftSourceAsLong, leftSymbolMapper);
new SymbolTableToUniqueIdSource(leftSourceAsLong, leftSymbolMapper);
rightSources[ii] =
new NaturalJoinHelper.SymbolTableToUniqueIdSource(rightSourceAsLong, rightSymbolMapper);
new SymbolTableToUniqueIdSource(rightSourceAsLong, rightSymbolMapper);

if (leftSources.length == 1) {
uniqueValues = true;
Original file line number Diff line number Diff line change
@@ -6,22 +6,22 @@
import io.deephaven.base.Pair;
import io.deephaven.base.verify.Assert;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.*;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.by.typed.TypedHasherFactory;
import io.deephaven.engine.table.impl.join.JoinListenerRecorder;
import io.deephaven.engine.table.impl.naturaljoin.*;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.*;
import io.deephaven.engine.table.impl.util.*;
import io.deephaven.util.annotations.VisibleForTesting;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableInt;
import org.jetbrains.annotations.NotNull;

import java.util.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class NaturalJoinHelper {

@@ -468,70 +468,6 @@ private static QueryTable makeResult(@NotNull final QueryTable leftTable,
return new QueryTable(leftTable.getRowSet(), columnSourceMap);
}

/**
 * This column source is used as a wrapper for the original table's symbol sources.
 * <p>
 * The symbol sources are reinterpreted to longs, and then the SymbolCombiner produces an IntegerSparseArraySource
 * for each side. To convert from the symbol table value, we simply look it up in the symbolLookup source and use
 * that as our chunked result.
 * <p>
 * Both the single-row accessor ({@link #getInt(long)}) and the chunked accessor perform the same two-step
 * translation: row key to long symbol id via {@code symbolSource}, then symbol id to int via {@code symbolLookup}.
 */
static class SymbolTableToUniqueIdSource extends AbstractColumnSource<Integer>
implements ImmutableColumnSourceGetDefaults.ForInt {
// Supplies the long symbol id for a given row key.
private final ColumnSource<Long> symbolSource;
// Indexed by symbol id; holds the int value this source reports for that symbol.
private final IntegerSparseArraySource symbolLookup;

/**
 * @param symbolSource source of long symbol ids, read by row key
 * @param symbolLookup lookup from symbol id to the int value exposed by this source
 */
SymbolTableToUniqueIdSource(ColumnSource<Long> symbolSource, IntegerSparseArraySource symbolLookup) {
super(int.class);
this.symbolSource = symbolSource;
this.symbolLookup = symbolLookup;
}

/**
 * Reads the symbol id stored at {@code rowKey} and returns the int that {@code symbolLookup} holds for it.
 */
@Override
public int getInt(long rowKey) {
final long symbolId = symbolSource.getLong(rowKey);
return symbolLookup.getInt(symbolId);
}

/**
 * Fill context that bundles a scratch long chunk (to receive symbol ids) with the fill context obtained from
 * {@code symbolSource}. Non-static because it reads the enclosing instance's {@code symbolSource}.
 */
private class LongToIntFillContext implements ColumnSource.FillContext {
// Scratch buffer that fillChunk populates with symbol ids before translating them.
final WritableLongChunk<Values> longChunk;
// Context used when asking symbolSource to fill longChunk.
final FillContext innerFillContext;

LongToIntFillContext(final int chunkCapacity, final SharedContext sharedState) {
longChunk = WritableLongChunk.makeWritableChunk(chunkCapacity);
innerFillContext = symbolSource.makeFillContext(chunkCapacity, sharedState);
}

// Closes both resources created in the constructor, scratch chunk first.
@Override
public void close() {
longChunk.close();
innerFillContext.close();
}
}

@Override
public FillContext makeFillContext(final int chunkCapacity, final SharedContext sharedContext) {
return new LongToIntFillContext(chunkCapacity, sharedContext);
}

/**
 * Fills {@code destination} with one int per row of {@code rowSequence}.
 * <p>
 * The context must be one created by {@link #makeFillContext}; it is cast to {@code LongToIntFillContext} without
 * a type check.
 */
@Override
public void fillChunk(@NotNull final FillContext context,
@NotNull final WritableChunk<? super Values> destination, @NotNull final RowSequence rowSequence) {
final WritableIntChunk<? super Values> destAsInt = destination.asWritableIntChunk();
final LongToIntFillContext longToIntContext = (LongToIntFillContext) context;
final WritableLongChunk<Values> longChunk = longToIntContext.longChunk;
// Step 1: pull the symbol ids for the requested rows into the scratch chunk.
symbolSource.fillChunk(longToIntContext.innerFillContext, longChunk, rowSequence);
// Step 2: translate each symbol id through the lookup, writing at the same position in the destination.
for (int ii = 0; ii < longChunk.size(); ++ii) {
destAsInt.set(ii, symbolLookup.getInt(longChunk.get(ii)));
}
// The destination holds exactly as many values as symbol ids were read.
destination.setSize(longChunk.size());
}

// Delegates to the wrapped symbol source.
@Override
public boolean isStateless() {
return symbolSource.isStateless();
}
}

private static class LeftTickingListener extends BaseTable.ListenerImpl {
final LongArraySource newLeftRedirections;
private final QueryTable result;
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.deephaven.engine.table.impl;

import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableIntChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.SharedContext;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.sources.IntegerSparseArraySource;
import org.jetbrains.annotations.NotNull;

/**
* This column source is used as a wrapper for the original table's symbol sources.
* <p>
* The symbol sources are reinterpreted to longs, and then the SymbolCombiner produces an IntegerSparseArraySource for
* each side. To convert from the symbol table value, we simply look it up in the symbolLookup source and use that as
* our chunked result.
*/
public class SymbolTableToUniqueIdSource extends AbstractColumnSource<Integer>
implements ImmutableColumnSourceGetDefaults.ForInt {
private final ColumnSource<Long> symbolSource;
private final IntegerSparseArraySource symbolLookup;

SymbolTableToUniqueIdSource(ColumnSource<Long> symbolSource, IntegerSparseArraySource symbolLookup) {
super(int.class);
this.symbolSource = symbolSource;
this.symbolLookup = symbolLookup;
}

@Override
public int getInt(long rowKey) {
final long symbolId = symbolSource.getLong(rowKey);
return symbolLookup.getInt(symbolId);
}

private class LongToIntFillContext implements FillContext {
final WritableLongChunk<Values> longChunk;
final FillContext innerFillContext;

LongToIntFillContext(final int chunkCapacity, final SharedContext sharedState) {
longChunk = WritableLongChunk.makeWritableChunk(chunkCapacity);
innerFillContext = symbolSource.makeFillContext(chunkCapacity, sharedState);
}

@Override
public void close() {
longChunk.close();
innerFillContext.close();
}
}

@Override
public FillContext makeFillContext(final int chunkCapacity, final SharedContext sharedContext) {
return new LongToIntFillContext(chunkCapacity, sharedContext);
}

@Override
public void fillChunk(@NotNull final FillContext context, @NotNull final WritableChunk<? super Values> destination,
@NotNull final RowSequence orderedKeys) {
fillChunkWithSymbolSource(context, destination, orderedKeys);
}

public WritableLongChunk<Values> fillChunkWithSymbolSource(@NotNull final FillContext context,
@NotNull final WritableChunk<? super Values> destination, @NotNull final RowSequence orderedKeys) {
final WritableIntChunk<? super Values> destAsInt = destination.asWritableIntChunk();
final LongToIntFillContext longToIntContext = (LongToIntFillContext) context;
final WritableLongChunk<Values> longChunk = longToIntContext.longChunk;
symbolSource.fillChunk(longToIntContext.innerFillContext, longChunk, orderedKeys);
for (int ii = 0; ii < longChunk.size(); ++ii) {
destAsInt.set(ii, symbolLookup.getInt(longChunk.get(ii)));
}
destination.setSize(longChunk.size());

return longChunk;
}

public static SymbolTableToUniqueIdSource getUniqueIdSource(final Table symbolTable,
final ColumnSource<?> keySource) {
final IntegerSparseArraySource symbolMapper = new IntegerSparseArraySource();
if (symbolTable.size() > 0) {
final SymbolTableCombiner stc = new SymbolTableCombiner(new ColumnSource<?>[] {keySource},
Integer.highestOneBit(symbolTable.intSize()) << 1);
stc.addSymbols(symbolTable, symbolMapper);
}

return new SymbolTableToUniqueIdSource(keySource.reinterpret(long.class), symbolMapper);
}

@Override
public boolean isStateless() {
return symbolSource.isStateless();
}
}
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@

import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.sources.regioned.SymbolTableSource;
import io.deephaven.util.annotations.VisibleForTesting;
import org.jetbrains.annotations.NotNull;

@@ -49,27 +50,19 @@ public boolean shouldProbeShift(final long shiftSize, final int numStates) {
return shiftSize <= numStates * 2;
}

// boolean considerSymbolTables(@NotNull final Table inputTable, final boolean useGrouping, @NotNull final
// ColumnSource<?>[] sources) {
// return !inputTable.refreshing()
// && !useGrouping
// && sources.length == 1
// && sources[0] instanceof SymbolTableSource
// && ((SymbolTableSource) sources[0]).hasSymbolTable(inputTable.getRowSet());
// }
//
// boolean useSymbolTableLookupCaching() {
// return false;
// }
//
// boolean useSymbolTables(final long inputTableSize, final long symbolTableSize) {
// return symbolTableSize <= inputTableSize / 2;
// }
//
// boolean useUniqueTable(final boolean uniqueValues, final long maximumUniqueValue, final long minimumUniqueValue)
// {
// // We want to have one left over value for "no good" (Integer.MAX_VALUE), and then we need another value to
// // represent that (max - min + 1) is the number of slots required.
// return uniqueValues && (maximumUniqueValue - minimumUniqueValue) < (Integer.MAX_VALUE - 2);
// }
/**
 * Decides whether a symbol table should be considered for the given input.
 *
 * @param inputTable the table whose refreshing state and row set are examined
 * @param useGrouping whether grouping is in use for this input; when true, symbol tables are not considered
 * @param sources the key column sources; symbol tables are only considered for exactly one source
 * @return true only if the table is not refreshing, grouping is not in use, there is a single source, and
 *         {@code SymbolTableSource.hasSymbolTable} reports true for that source over the table's row set
 */
boolean considerSymbolTables(@NotNull final Table inputTable, final boolean useGrouping,
        @NotNull final ColumnSource<?>[] sources) {
    // Rule out the cheap disqualifiers first, in the same order the checks were originally chained.
    if (inputTable.isRefreshing() || useGrouping || sources.length != 1) {
        return false;
    }
    return SymbolTableSource.hasSymbolTable(sources[0], inputTable.getRowSet());
}

/**
 * Control flag for symbol table lookup caching.
 *
 * @return always {@code false} in this implementation
 */
boolean useSymbolTableLookupCaching() {
return false;
}

/**
 * Decides whether the symbol table is small enough, relative to the input table, to be worth using.
 *
 * @param inputTableSize the size of the input table
 * @param symbolTableSize the size of the symbol table
 * @return true if the symbol table size is strictly less than half the input table size (integer division)
 */
boolean useSymbolTables(final long inputTableSize, final long symbolTableSize) {
    final long halfInputTableSize = inputTableSize / 2;
    // The comparison must stay strict: with "<=" an empty input table (0 <= 0) would select the symbol table
    // path, which fails later on in the SymbolTableCombiner.
    return symbolTableSize < halfInputTableSize;
}
}
Loading