Core: Use InternalData with avro and common DataIterable for readers. #12476

Status: Open · wants to merge 1 commit into base: main
9 changes: 3 additions & 6 deletions core/src/main/java/org/apache/iceberg/AllManifestsTable.java

@@ -23,7 +23,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
-import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.BoundReference;
@@ -192,13 +191,11 @@ public List<DeleteFile> deletes() {
   @Override
   public CloseableIterable<StructLike> rows() {
     try (CloseableIterable<ManifestFile> manifests =
-        Avro.read(io.newInputFile(manifestListLocation))
-            .rename("manifest_file", GenericManifestFile.class.getName())
-            .rename("partitions", GenericPartitionFieldSummary.class.getName())
-            .rename("r508", GenericPartitionFieldSummary.class.getName())
+        InternalData.read(FileFormat.AVRO, io.newInputFile(manifestListLocation))
+            .setRootType(GenericManifestFile.class)
+            .setCustomType(508, GenericPartitionFieldSummary.class)
             .project(ManifestFile.schema())
             .classLoader(GenericManifestFile.class.getClassLoader())
-            .reuseContainers(false)
             .build()) {

       CloseableIterable<StructLike> rowIterable =

Contributor, on the removed `.reuseContainers(false)`: This is just using the default, right?

Note: `r508` is the Avro-generated record name for the struct at field ID 508 (the partition field summary element), which the old reader had to rename by string; the new builder addresses it directly with `setCustomType(508, ...)`.
22 changes: 22 additions & 0 deletions core/src/main/java/org/apache/iceberg/InternalData.java

@@ -34,6 +34,23 @@
 import org.slf4j.LoggerFactory;

 public class InternalData {
+  /**
+   * An iterable returned by readers that also exposes access to the file metadata.
+   *
+   * @param <D> internal data model for records
+   */
+  public interface DataIterable<D> extends CloseableIterable<D> {
+
+    /**
+     * Returns the key/value metadata of the file being read.
+     *
+     * @return metadata
+     */
+    default Map<String, String> metadata() {
+      throw new UnsupportedOperationException("File metadata not supported");
+    }
+  }
+
   private InternalData() {}

   private static final Logger LOG = LoggerFactory.getLogger(InternalData.class);
@@ -163,6 +180,11 @@ public interface ReadBuilder {
     /** Set a custom class for in-memory objects at the given field ID. */
     ReadBuilder setCustomType(int fieldId, Class<? extends StructLike> structClass);

+    /** Set the classloader used for custom types. */
+    default ReadBuilder classLoader(ClassLoader classLoader) {
+      throw new UnsupportedOperationException("Classloader not supported");
+    }
+
     /** Build the configured reader. */
     <D> CloseableIterable<D> build();
   }

Contributor, on the default `classLoader` method: I don't think that this is needed. I originally added it in the InternalData commit, but because we are passing the classes themselves (rather than loading them dynamically by name), they are already loaded.
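For context, a caller that wants both the records and the file's key/value metadata could use the new interface roughly as follows. This is a sketch, not code from this PR; the `readWithMetadata` method, its arguments, and the downcast site are illustrative only:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.InternalData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;

class DataIterableExample {
  // Reads records with the given projection and logs the file's key/value
  // metadata when the reader exposes it through DataIterable. build() still
  // returns CloseableIterable, so metadata access requires a downcast.
  static void readWithMetadata(InputFile inputFile, Schema projection) {
    try (CloseableIterable<StructLike> records =
        InternalData.read(FileFormat.AVRO, inputFile).project(projection).build()) {

      if (records instanceof InternalData.DataIterable) {
        Map<String, String> metadata =
            ((InternalData.DataIterable<StructLike>) records).metadata();
        System.out.println("file metadata: " + metadata);
      }

      for (StructLike record : records) {
        // process each record here
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```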
9 changes: 3 additions & 6 deletions core/src/main/java/org/apache/iceberg/ManifestLists.java

@@ -20,7 +20,6 @@

 import java.io.IOException;
 import java.util.List;
-import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
@@ -33,13 +32,11 @@ private ManifestLists() {}

   static List<ManifestFile> read(InputFile manifestList) {
     try (CloseableIterable<ManifestFile> files =
-        Avro.read(manifestList)
-            .rename("manifest_file", GenericManifestFile.class.getName())
-            .rename("partitions", GenericPartitionFieldSummary.class.getName())
-            .rename("r508", GenericPartitionFieldSummary.class.getName())
+        InternalData.read(FileFormat.AVRO, manifestList)
+            .setRootType(GenericManifestFile.class)
+            .setCustomType(508, GenericPartitionFieldSummary.class)
             .classLoader(GenericManifestFile.class.getClassLoader())
             .project(ManifestFile.schema())
-            .reuseContainers(false)
             .build()) {

       return Lists.newLinkedList(files);
14 changes: 9 additions & 5 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java

@@ -25,8 +25,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.avro.AvroIterable;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
@@ -133,12 +131,18 @@ private <T extends ContentFile<T>> PartitionSpec readPartitionSpec(InputFile inp
   private static <T extends ContentFile<T>> Map<String, String> readMetadata(InputFile inputFile) {
     Map<String, String> metadata;
     try {
-      try (AvroIterable<ManifestEntry<T>> headerReader =
-          Avro.read(inputFile)
+      try (CloseableIterable<ManifestEntry<T>> headerReader =
+          InternalData.read(FileFormat.AVRO, inputFile)
               .project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
               .classLoader(GenericManifestEntry.class.getClassLoader())
               .build()) {
-        metadata = headerReader.getMetadata();
+
+        if (headerReader instanceof InternalData.DataIterable) {
+          metadata = ((InternalData.DataIterable<?>) headerReader).metadata();
+        } else {
+          throw new RuntimeException(
+              "Reader does not support metadata reading: " + headerReader.getClass().getName());
+        }
       }
     } catch (IOException e) {
       throw new RuntimeIOException(e);

Contributor (author), on the `instanceof` check: This is a little bit of a workaround, since we can't switch fully over to DataIterable due to binary incompatibility, but this is about the only place other than tests where it's used.

Contributor: I'm not sure that this is needed, and I'd prefer not to add the interface if possible. We should not really be using the file metadata to recover things like the partition spec. I think that this is only used by older code paths that we didn't migrate to pass the ID-to-spec map through. At this point, I think we should go see where those methods are used and try to remove them, instead of adding this.
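The binary incompatibility mentioned above is why `build()` still declares `CloseableIterable` as its return type: narrowing it to `DataIterable` would change the method signature that already-compiled callers link against. A minimal sketch of the resulting opt-in pattern as a standalone helper (the `MetadataAccess` class is hypothetical, not part of this PR):

```java
import java.util.Map;
import org.apache.iceberg.InternalData;
import org.apache.iceberg.io.CloseableIterable;

class MetadataAccess {
  // Hypothetical helper showing the pattern used in readMetadata(): build()
  // keeps returning CloseableIterable for binary compatibility, so file
  // metadata is only available to callers that check for the new interface.
  static Map<String, String> fileMetadata(CloseableIterable<?> reader) {
    if (reader instanceof InternalData.DataIterable) {
      return ((InternalData.DataIterable<?>) reader).metadata();
    }

    throw new UnsupportedOperationException(
        "Reader does not expose file metadata: " + reader.getClass().getName());
  }
}
```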
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/avro/Avro.java

@@ -737,6 +737,7 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) {
     return this;
   }

+  @Override
   public ReadBuilder classLoader(ClassLoader classLoader) {
     this.loader = classLoader;
     return this;
9 changes: 7 additions & 2 deletions core/src/main/java/org/apache/iceberg/avro/AvroIterable.java

@@ -26,15 +26,15 @@
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.FileReader;
 import org.apache.avro.io.DatumReader;
+import org.apache.iceberg.InternalData;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.CloseableGroup;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Suppliers;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;

-public class AvroIterable<D> extends CloseableGroup implements CloseableIterable<D> {
+public class AvroIterable<D> extends CloseableGroup implements InternalData.DataIterable<D> {
   private final InputFile file;
   private final DatumReader<D> reader;
   private final Long start;
@@ -61,6 +61,11 @@ private DataFileReader<D> initMetadata(DataFileReader<D> metadataReader) {
     return metadataReader;
   }

+  @Override
+  public Map<String, String> metadata() {
+    return getMetadata();
+  }
+
   public Map<String, String> getMetadata() {
     if (metadata == null) {
       try (DataFileReader<D> reader = newFileReader()) {
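`getMetadata()` lazily pulls the header map from Avro's `DataFileReader`. For reference, a standalone sketch of that underlying Avro API, assuming a local file and the generic data model (not code from this PR):

```java
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

class AvroHeaderMetadata {
  // Reads the key/value metadata stored in an Avro file header, the same
  // map that AvroIterable.metadata() exposes.
  static Map<String, String> avroMetadata(File file) throws IOException {
    Map<String, String> metadata = new HashMap<>();
    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(file, new GenericDatumReader<>())) {
      for (String key : reader.getMetaKeys()) {
        metadata.put(key, reader.getMetaString(key));
      }
    }
    return metadata;
  }
}
```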
ParquetIterable.java

@@ -20,13 +20,14 @@

 import java.io.IOException;
 import java.util.NoSuchElementException;
+import org.apache.iceberg.InternalData;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.parquet.hadoop.ParquetReader;

-public class ParquetIterable<T> extends CloseableGroup implements CloseableIterable<T> {
+public class ParquetIterable<T> extends CloseableGroup implements InternalData.DataIterable<T> {

   private final ParquetReader.Builder<T> builder;

   ParquetIterable(ParquetReader.Builder<T> builder) {
ParquetReader.java

@@ -19,13 +19,14 @@
 package org.apache.iceberg.parquet;

 import java.io.IOException;
+import java.util.Map;
 import java.util.function.Function;
+import org.apache.iceberg.InternalData;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableGroup;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMapping;
@@ -34,7 +35,7 @@
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.schema.MessageType;

-public class ParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+public class ParquetReader<T> extends CloseableGroup implements InternalData.DataIterable<T> {
   private final InputFile input;
   private final Schema expectedSchema;
   private final ParquetReadOptions options;
@@ -93,6 +94,11 @@ public CloseableIterator<T> iterator() {
     return iter;
   }

+  @Override
+  public Map<String, String> metadata() {
+    return conf.metadata().getKeyValueMetaData();
+  }
+
   private static class FileIterator<T> implements CloseableIterator<T> {
     private final ParquetFileReader reader;
     private final boolean[] shouldSkip;
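`conf.metadata()` returns the footer `FileMetaData` that `ReadConf` now captures (see the `ReadConf` diff below). For reference, a standalone sketch of the same lookup through the underlying parquet-mr API, assuming a Hadoop path (not code from this PR):

```java
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

class ParquetFooterMetadata {
  // Opens a Parquet file and returns the key/value metadata from its footer,
  // the same map that ParquetReader.metadata() now exposes.
  static Map<String, String> keyValueMetadata(String location) throws IOException {
    HadoopInputFile file = HadoopInputFile.fromPath(new Path(location), new Configuration());
    try (ParquetFileReader reader = ParquetFileReader.open(file)) {
      return reader.getFileMetaData().getKeyValueMetaData();
    }
  }
}
```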
ReadConf.java

@@ -36,6 +36,7 @@
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.schema.MessageType;

 /**
@@ -55,6 +56,7 @@ class ReadConf<T> {
   private final long totalValues;
   private final boolean reuseContainers;
   private final Integer batchSize;
+  private final FileMetaData metadata;

   // List of column chunk metadata for each row group
   private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups;
@@ -74,6 +76,7 @@ class ReadConf<T> {
     this.file = file;
     this.options = options;
     this.reader = newReader(file, options);
+    this.metadata = reader.getFileMetaData();
     MessageType fileSchema = reader.getFileMetaData().getSchema();

     MessageType typeWithIds;
@@ -144,6 +147,7 @@ private ReadConf(ReadConf<T> toCopy) {
     this.batchSize = toCopy.batchSize;
     this.vectorizedModel = toCopy.vectorizedModel;
     this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups;
+    this.metadata = toCopy.metadata;
   }

   ParquetFileReader reader() {
@@ -181,6 +185,10 @@ Integer batchSize() {
     return batchSize;
   }

+  FileMetaData metadata() {
+    return metadata;
+  }
+
   List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadataForRowGroups() {
     return columnChunkMetaDataForRowGroups;
   }
VectorizedParquetReader.java

@@ -23,12 +23,12 @@
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.function.Function;
+import org.apache.iceberg.InternalData;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableGroup;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMapping;
@@ -39,7 +39,8 @@
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.schema.MessageType;

-public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {
+public class VectorizedParquetReader<T> extends CloseableGroup
+    implements InternalData.DataIterable<T> {
   private final InputFile input;
   private final Schema expectedSchema;
   private final ParquetReadOptions options;
@@ -101,6 +102,11 @@ public CloseableIterator<T> iterator() {
     return iter;
   }

+  @Override
+  public Map<String, String> metadata() {
+    return conf.metadata().getKeyValueMetaData();
+  }
+
   private static class FileIterator<T> implements CloseableIterator<T> {
     private final ParquetFileReader reader;
     private final boolean[] shouldSkip;