From fa608705e1f7c1119eea6e0a60233a25820429ac Mon Sep 17 00:00:00 2001 From: Daniel Weeks Date: Fri, 7 Mar 2025 09:51:14 -0800 Subject: [PATCH] Core: Use InternalData with Avro and common DataIterable for readers. --- .../org/apache/iceberg/AllManifestsTable.java | 9 +++----- .../java/org/apache/iceberg/InternalData.java | 22 +++++++++++++++++++ .../org/apache/iceberg/ManifestLists.java | 9 +++----- .../org/apache/iceberg/ManifestReader.java | 14 +++++++----- .../java/org/apache/iceberg/avro/Avro.java | 1 + .../org/apache/iceberg/avro/AvroIterable.java | 9 ++++++-- .../iceberg/parquet/ParquetIterable.java | 5 +++-- .../apache/iceberg/parquet/ParquetReader.java | 10 +++++++-- .../org/apache/iceberg/parquet/ReadConf.java | 8 +++++++ .../parquet/VectorizedParquetReader.java | 10 +++++++-- 10 files changed, 72 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java index 641a7a3c9aec..64ae15b36646 100644 --- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Binder; import org.apache.iceberg.expressions.BoundReference; @@ -192,13 +191,11 @@ public List deletes() { @Override public CloseableIterable rows() { try (CloseableIterable manifests = - Avro.read(io.newInputFile(manifestListLocation)) - .rename("manifest_file", GenericManifestFile.class.getName()) - .rename("partitions", GenericPartitionFieldSummary.class.getName()) - .rename("r508", GenericPartitionFieldSummary.class.getName()) + InternalData.read(FileFormat.AVRO, io.newInputFile(manifestListLocation)) + .setRootType(GenericManifestFile.class) + .setCustomType(508, GenericPartitionFieldSummary.class) .project(ManifestFile.schema()) .classLoader(GenericManifestFile.class.getClassLoader()) - .reuseContainers(false) .build()) { CloseableIterable rowIterable = diff --git a/core/src/main/java/org/apache/iceberg/InternalData.java b/core/src/main/java/org/apache/iceberg/InternalData.java index fa39d23e43fe..9fe74ec16024 100644 --- a/core/src/main/java/org/apache/iceberg/InternalData.java +++ b/core/src/main/java/org/apache/iceberg/InternalData.java @@ -34,6 +34,23 @@ import org.slf4j.LoggerFactory; public class InternalData { + /** + * An iterable returned by readers that also exposes access to the file metadata. + * + * @param internal data model for records + */ + public interface DataIterable extends CloseableIterable { + + /** + * Returns key/value metadata of file being read + * + * @return metadata + */ + default Map metadata() { + throw new UnsupportedOperationException("File metadata not supported"); + } + } + private InternalData() {} private static final Logger LOG = LoggerFactory.getLogger(InternalData.class); @@ -163,6 +180,11 @@ public interface ReadBuilder { /** Set a custom class for in-memory objects at the given field ID. */ ReadBuilder setCustomType(int fieldId, Class structClass); + /** Set the classloader used for custom types. */ + default ReadBuilder classLoader(ClassLoader classLoader) { + throw new UnsupportedOperationException("Classloader not supported"); + } + /** Build the configured reader. */ CloseableIterable build(); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestLists.java b/core/src/main/java/org/apache/iceberg/ManifestLists.java index f20a481cf25a..1886b9f7b42e 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestLists.java +++ b/core/src/main/java/org/apache/iceberg/ManifestLists.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.List; -import org.apache.iceberg.avro.Avro; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; @@ -33,13 +32,11 @@ private ManifestLists() {} static List read(InputFile manifestList) { try (CloseableIterable files = - Avro.read(manifestList) - .rename("manifest_file", GenericManifestFile.class.getName()) - .rename("partitions", GenericPartitionFieldSummary.class.getName()) - .rename("r508", GenericPartitionFieldSummary.class.getName()) + InternalData.read(FileFormat.AVRO, manifestList) + .setRootType(GenericManifestFile.class) + .setCustomType(508, GenericPartitionFieldSummary.class) .classLoader(GenericManifestFile.class.getClassLoader()) .project(ManifestFile.schema()) - .reuseContainers(false) .build()) { return Lists.newLinkedList(files); diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 22a93a85ca6b..70f57e6b32a1 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -25,8 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.avro.AvroIterable; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; @@ -133,12 +131,18 @@ private > PartitionSpec readPartitionSpec(InputFile inp private static > Map readMetadata(InputFile inputFile) { Map metadata; try { - try (AvroIterable> headerReader = - Avro.read(inputFile) + try (CloseableIterable> headerReader = + InternalData.read(FileFormat.AVRO, inputFile) .project(ManifestEntry.getSchema(Types.StructType.of()).select("status")) .classLoader(GenericManifestEntry.class.getClassLoader()) .build()) { - metadata = headerReader.getMetadata(); + + if (headerReader instanceof InternalData.DataIterable) { + metadata = ((InternalData.DataIterable) headerReader).metadata(); + } else { + throw new RuntimeException( + "Reader does not support metadata reading: " + headerReader.getClass().getName()); + } } } catch (IOException e) { throw new RuntimeIOException(e); diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 2a3ea11590bb..b06155f4ea47 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -737,6 +737,7 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) { return this; } + @Override public ReadBuilder classLoader(ClassLoader classLoader) { this.loader = classLoader; return this; diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java b/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java index 3bcc6a4799d2..30b25bfb25d5 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroIterable.java @@ -26,15 +26,15 @@ import org.apache.avro.file.DataFileReader; import org.apache.avro.file.FileReader; import org.apache.avro.io.DatumReader; +import org.apache.iceberg.InternalData; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableGroup; -import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Suppliers; import org.apache.iceberg.relocated.com.google.common.collect.Maps; -public class AvroIterable extends CloseableGroup implements CloseableIterable { +public class AvroIterable extends CloseableGroup implements InternalData.DataIterable { private final InputFile file; private final DatumReader reader; private final Long start; @@ -61,6 +61,11 @@ private DataFileReader initMetadata(DataFileReader metadataReader) { return metadataReader; } + @Override + public Map metadata() { + return getMetadata(); + } + public Map getMetadata() { if (metadata == null) { try (DataFileReader reader = newFileReader()) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java index ac4e5c1f97ed..d8c2fb8f1a9c 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java @@ -20,13 +20,14 @@ import java.io.IOException; import java.util.NoSuchElementException; +import org.apache.iceberg.InternalData; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableGroup; -import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.parquet.hadoop.ParquetReader; -public class ParquetIterable extends CloseableGroup implements CloseableIterable { +public class ParquetIterable extends CloseableGroup implements InternalData.DataIterable { + private final ParquetReader.Builder builder; ParquetIterable(ParquetReader.Builder builder) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java index e8ee90fdebb7..6ec84bd21a4b 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java @@ -19,13 +19,14 @@ package org.apache.iceberg.parquet; import java.io.IOException; +import java.util.Map; import java.util.function.Function; +import org.apache.iceberg.InternalData; import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableGroup; -import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMapping; @@ -34,7 +35,7 @@ import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.schema.MessageType; -public class ParquetReader extends CloseableGroup implements CloseableIterable { +public class ParquetReader extends CloseableGroup implements InternalData.DataIterable { private final InputFile input; private final Schema expectedSchema; private final ParquetReadOptions options; @@ -93,6 +94,11 @@ public CloseableIterator iterator() { return iter; } + @Override + public Map metadata() { + return conf.metadata().getKeyValueMetaData(); + } + private static class FileIterator implements CloseableIterator { private final ParquetFileReader reader; private final boolean[] shouldSkip; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java index 1fb2372ba568..b9b48049d1e0 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java @@ -36,6 +36,7 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.schema.MessageType; /** @@ -55,6 +56,7 @@ class ReadConf { private final long totalValues; private final boolean reuseContainers; private final Integer batchSize; + private final FileMetaData metadata; // List of column chunk metadata for each row group private final List> columnChunkMetaDataForRowGroups; @@ -74,6 +76,7 @@ class ReadConf { this.file = file; this.options = options; this.reader = newReader(file, options); + this.metadata = reader.getFileMetaData(); MessageType fileSchema = reader.getFileMetaData().getSchema(); MessageType typeWithIds; @@ -144,6 +147,7 @@ private ReadConf(ReadConf toCopy) { this.batchSize = toCopy.batchSize; this.vectorizedModel = toCopy.vectorizedModel; this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups; + this.metadata = toCopy.metadata; } ParquetFileReader reader() { @@ -181,6 +185,10 @@ Integer batchSize() { return batchSize; } + FileMetaData metadata() { + return metadata; + } + List> columnChunkMetadataForRowGroups() { return columnChunkMetaDataForRowGroups; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java index fc10a57ec0e0..b4147b290ed6 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java @@ -23,12 +23,12 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.function.Function; +import org.apache.iceberg.InternalData; import org.apache.iceberg.Schema; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableGroup; -import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMapping; @@ -39,7 +39,8 @@ import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.schema.MessageType; -public class VectorizedParquetReader extends CloseableGroup implements CloseableIterable { +public class VectorizedParquetReader extends CloseableGroup + implements InternalData.DataIterable { private final InputFile input; private final Schema expectedSchema; private final ParquetReadOptions options; @@ -101,6 +102,11 @@ public CloseableIterator iterator() { return iter; } + @Override + public Map metadata() { + return conf.metadata().getKeyValueMetaData(); + } + private static class FileIterator implements CloseableIterator { private final ParquetFileReader reader; private final boolean[] shouldSkip;