Skip to content

Commit

Permalink
Improve Iceberg table properties building
Browse files Browse the repository at this point in the history
When building Iceberg table properties for a table with many files under one partition,
process each distinct partition only once instead of once per data file
  • Loading branch information
jinyangli34 committed Mar 3, 2025
1 parent efc081e commit b6f21f6
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toMap;
import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
import static org.apache.iceberg.MetadataTableType.ENTRIES;
import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations;
Expand Down Expand Up @@ -800,40 +801,44 @@ public ConnectorTableProperties getTableProperties(ConnectorSession session, Con
Map<Integer, IcebergColumnHandle> columns = getProjectedColumns(icebergTable.schema(), typeManager, partitionSourceIds).stream()
.collect(toImmutableMap(IcebergColumnHandle::getId, identity()));

Supplier<List<FileScanTask>> lazyFiles = Suppliers.memoize(() -> {
Supplier<Map<StructLikeWrapperWithFieldIdToIndex, PartitionSpec>> uniquePartitions = Suppliers.memoize(() -> {
TableScan tableScan = icebergTable.newScan()
.useSnapshot(table.getSnapshotId().get())
.filter(toIcebergExpression(enforcedPredicate));

try (CloseableIterable<FileScanTask> iterator = tableScan.planFiles()) {
return ImmutableList.copyOf(iterator);
return Streams.stream(iterator)
.collect(toMap(
StructLikeWrapperWithFieldIdToIndex::createStructLikeWrapper,
FileScanTask::spec,
(p, _) -> p));
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
});

Iterable<FileScanTask> files = () -> lazyFiles.get().iterator();

Iterable<TupleDomain<ColumnHandle>> discreteTupleDomain = Iterables.transform(files, fileScan -> {
// Extract partition values in the data file
Map<Integer, Optional<String>> partitionColumnValueStrings = getPartitionKeys(fileScan);
Map<ColumnHandle, NullableValue> partitionValues = partitionSourceIds.stream()
.filter(partitionColumnValueStrings::containsKey)
.collect(toImmutableMap(
columns::get,
columnId -> {
IcebergColumnHandle column = columns.get(columnId);
Object prestoValue = deserializePartitionValue(
column.getType(),
partitionColumnValueStrings.get(columnId).orElse(null),
column.getName());

return new NullableValue(column.getType(), prestoValue);
}));

return TupleDomain.fromFixedValues(partitionValues);
});
Iterable<TupleDomain<ColumnHandle>> discreteTupleDomain = Iterables.transform(
() -> uniquePartitions.get().entrySet().iterator(),
entry -> {
// Extract partition values in the data file
Map<Integer, Optional<String>> partitionColumnValueStrings = getPartitionKeys(entry.getKey().getStructLikeWrapper().get(), entry.getValue());
Map<ColumnHandle, NullableValue> partitionValues = partitionSourceIds.stream()
.filter(partitionColumnValueStrings::containsKey)
.collect(toImmutableMap(
columns::get,
columnId -> {
IcebergColumnHandle column = columns.get(columnId);
Object prestoValue = deserializePartitionValue(
column.getType(),
partitionColumnValueStrings.get(columnId).orElse(null),
column.getName());

return new NullableValue(column.getType(), prestoValue);
}));

return TupleDomain.fromFixedValues(partitionValues);
});

discretePredicates = new DiscretePredicates(
columns.values().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,33 @@ public void testOptimization()
values(ImmutableList.of("b", "c"), ImmutableList.of())));
}

/**
 * Checks the planned output of {@code SELECT DISTINCT b, c} with
 * {@code optimize_metadata_queries} enabled when the table's single
 * partition {@code (b = 8, c = 9)} has been written to twice.
 * <p>
 * The expected plan is a {@code values} node carrying exactly one row,
 * {@code (8, 9)}, i.e. the partition must not be repeated for each write.
 */
@Test
public void testOptimizationOnPartitionWithMultipleFiles()
{
    String tableName = "test_metadata_optimization_on_partition_with_multiple_files";

    // Initial CTAS: both rows share the partition values b = 8, c = 9.
    String createTableSql = format(
            "CREATE TABLE %s (a, b, c) WITH (PARTITIONING = ARRAY['b', 'c']) AS VALUES (1, 8, 9), (2, 8, 9)",
            tableName);
    getPlanTester().executeStatement(createTableSql);

    Session metadataOptimizedSession = Session.builder(getPlanTester().getDefaultSession())
            .setSystemProperty("optimize_metadata_queries", "true")
            .build();

    // A second, separate write with the same partition values, so that the
    // partition ends up with another file in it.
    String insertSql = format(
            "INSERT INTO %s VALUES (3, 8, 9)",
            tableName);
    getPlanTester().executeStatement(insertSql);

    String distinctPartitionsSql = format("SELECT DISTINCT b, c FROM %s ORDER BY b", tableName);

    // Exactly one (8, 9) row is expected despite the two writes above.
    assertPlan(
            distinctPartitionsSql,
            metadataOptimizedSession,
            anyTree(values(
                    ImmutableList.of("b", "c"),
                    ImmutableList.of(
                            ImmutableList.of(new Constant(INTEGER, 8L), new Constant(INTEGER, 9L))))));
}

@Test
public void testOptimizationWithNullPartitions()
{
Expand Down

0 comments on commit b6f21f6

Please sign in to comment.