From c81f8be252946b46977c52f3c0c25ff72f6c57b7 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Tue, 4 Mar 2025 17:50:50 -0800 Subject: [PATCH] Avro: Support variant in Avro readers, writers. --- .../apache/iceberg/variants/Serialized.java | 2 +- .../java/org/apache/iceberg/avro/Avro.java | 1 + .../iceberg/avro/AvroWithPartnerVisitor.java | 31 ++++++++- .../apache/iceberg/avro/BaseWriteBuilder.java | 6 ++ .../iceberg/avro/GenericAvroReader.java | 6 ++ .../apache/iceberg/avro/InternalReader.java | 6 ++ .../org/apache/iceberg/avro/ValueReaders.java | 13 ++-- .../org/apache/iceberg/avro/ValueWriters.java | 46 +++++++++++++ .../iceberg/avro/VariantConversion.java | 68 +++++++++++++++++++ .../apache/iceberg/data/avro/DataWriter.java | 6 ++ .../iceberg/data/avro/PlannedDataReader.java | 6 ++ .../apache/iceberg/InternalTestHelpers.java | 8 +++ .../apache/iceberg/avro/AvroTestHelpers.java | 10 +++ .../apache/iceberg/avro/RandomAvroData.java | 6 ++ .../apache/iceberg/avro/TestGenericAvro.java | 5 ++ .../apache/iceberg/avro/TestInternalAvro.java | 5 ++ .../iceberg/data/avro/TestGenericData.java | 5 ++ 17 files changed, 224 insertions(+), 6 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/avro/VariantConversion.java diff --git a/api/src/main/java/org/apache/iceberg/variants/Serialized.java b/api/src/main/java/org/apache/iceberg/variants/Serialized.java index f66d0c898135..b5cb5e38816c 100644 --- a/api/src/main/java/org/apache/iceberg/variants/Serialized.java +++ b/api/src/main/java/org/apache/iceberg/variants/Serialized.java @@ -20,6 +20,6 @@ import java.nio.ByteBuffer; -interface Serialized { +public interface Serialized { ByteBuffer buffer(); } diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 2a3ea11590bb..6c7edc25b691 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -89,6 +89,7 @@ private enum Codec { 
LogicalTypes.register(VariantLogicalType.NAME, schema -> VariantLogicalType.get()); DEFAULT_MODEL.addLogicalTypeConversion(new Conversions.DecimalConversion()); DEFAULT_MODEL.addLogicalTypeConversion(new UUIDConversion()); + DEFAULT_MODEL.addLogicalTypeConversion(new VariantConversion()); } public static WriteBuilder write(OutputFile file) { diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java index 692c1ead3fbf..83ddc9be5e29 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerVisitor.java @@ -89,6 +89,10 @@ public R map(P partner, Schema map, R valueResult) { return null; } + public R variant(P partner, R metadataResult, R valueResult) { + throw new UnsupportedOperationException("Visitor does not support variant"); + } + public R primitive(P partner, Schema primitive) { return null; } @@ -100,7 +104,11 @@ public static R visit( PartnerAccessors

accessors) { switch (schema.getType()) { case RECORD: - return visitRecord(partner, schema, visitor, accessors); + if (schema.getLogicalType() instanceof VariantLogicalType) { + return visitVariant(partner, schema, visitor, accessors); + } else { + return visitRecord(partner, schema, visitor, accessors); + } case UNION: return visitUnion(partner, schema, visitor, accessors); @@ -123,6 +131,27 @@ public static R visit( } } + private static R visitVariant( + P partner, + Schema variant, + AvroWithPartnerVisitor visitor, + PartnerAccessors

accessors) { + // check to make sure this hasn't been visited before + String recordName = variant.getFullName(); + Preconditions.checkState( + !visitor.recordLevels.contains(recordName), + "Cannot process recursive Avro record %s", + recordName); + visitor.recordLevels.push(recordName); + + R metadataResult = visit(null, variant.getField("metadata").schema(), visitor, accessors); + R valueResult = visit(null, variant.getField("value").schema(), visitor, accessors); + + visitor.recordLevels.pop(); + + return visitor.variant(partner, metadataResult, valueResult); + } + private static R visitRecord( P partnerStruct, Schema record, diff --git a/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java index 3955bd83a97a..5fd75acbf13f 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java +++ b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java @@ -64,6 +64,12 @@ public ValueWriter map(Schema map, ValueWriter valueWriter) { return ValueWriters.map(ValueWriters.strings(), valueWriter); } + @Override + public ValueWriter variant( + Schema variant, ValueWriter metadataResult, ValueWriter valueResult) { + return ValueWriters.variants(metadataResult, valueResult); + } + @Override public ValueWriter primitive(Schema primitive) { LogicalType logicalType = primitive.getLogicalType(); diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java index 0ed342b458d5..8fd39ee4b780 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java @@ -163,6 +163,12 @@ public ValueReader map(Type partner, Schema map, ValueReader valueReader) return ValueReaders.map(ValueReaders.strings(), valueReader); } + @Override + public ValueReader variant( + Type partner, ValueReader metadataReader, ValueReader 
valueReader) { + return ValueReaders.variants(metadataReader, valueReader); + } + @Override public ValueReader primitive(Type partner, Schema primitive) { LogicalType logicalType = primitive.getLogicalType(); diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalReader.java b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java index 18ec485f083d..09a7a0951ca7 100644 --- a/core/src/main/java/org/apache/iceberg/avro/InternalReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java @@ -160,6 +160,12 @@ public ValueReader map(Pair partner, Schema map, ValueReader variant( + Pair partner, ValueReader metadataReader, ValueReader valueReader) { + return ValueReaders.variants(metadataReader, valueReader); + } + @Override public ValueReader primitive(Pair partner, Schema primitive) { LogicalType logicalType = primitive.getLogicalType(); diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index 94b30d87f680..8b3fbe25b0d9 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -144,9 +144,11 @@ public static ValueReader decimalBytesReader(Schema schema) { } } + @SuppressWarnings("unchecked") public static ValueReader variants( - ValueReader metadataReader, ValueReader valueReader) { - return new VariantReader(metadataReader, valueReader); + ValueReader metadataReader, ValueReader valueReader) { + return new VariantReader( + (ValueReader) metadataReader, (ValueReader) valueReader); } public static ValueReader union(List> readers) { @@ -673,8 +675,11 @@ public VariantReader( @Override public Variant read(Decoder decoder, Object reuse) throws IOException { - VariantMetadata metadata = VariantMetadata.from(metadataReader.read(decoder, null)); - VariantValue value = VariantValue.from(metadata, metadataReader.read(decoder, null)); + VariantMetadata metadata = + 
VariantMetadata.from(metadataReader.read(decoder, null).order(ByteOrder.LITTLE_ENDIAN)); + VariantValue value = + VariantValue.from( + metadata, valueReader.read(decoder, null).order(ByteOrder.LITTLE_ENDIAN)); return Variant.of(metadata, value); } diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java index afc0a37a2838..323f2f86bba3 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java @@ -37,6 +37,10 @@ import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.DecimalUtil; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.Serialized; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantValue; public class ValueWriters { private ValueWriters() {} @@ -109,6 +113,13 @@ public static ValueWriter decimal(int precision, int scale) { return new DecimalWriter(precision, scale); } + @SuppressWarnings("unchecked") + public static ValueWriter variants( + ValueWriter metadataWriter, ValueWriter valueWriter) { + return new VariantWriter( + (ValueWriter) metadataWriter, (ValueWriter) valueWriter); + } + public static ValueWriter option(int nullIndex, ValueWriter writer) { return new OptionWriter<>(nullIndex, writer); } @@ -373,6 +384,41 @@ public void write(BigDecimal decimal, Encoder encoder) throws IOException { } } + private static class VariantWriter implements ValueWriter { + private final ValueWriter metadataWriter; + private final ValueWriter valueWriter; + + private VariantWriter( + ValueWriter metadataWriter, ValueWriter valueWriter) { + this.metadataWriter = metadataWriter; + this.valueWriter = valueWriter; + } + + @Override + public void write(Variant variant, Encoder encoder) throws IOException { + VariantMetadata metadata = variant.metadata(); + if (metadata 
instanceof Serialized) { + metadataWriter.write(((Serialized) metadata).buffer(), encoder); + } else { + // TODO: reuse buffers using buffer size code from Parquet + ByteBuffer metadataBuffer = + ByteBuffer.allocate(metadata.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + variant.metadata().writeTo(metadataBuffer, 0); + metadataWriter.write(metadataBuffer, encoder); + } + + VariantValue value = variant.value(); + if (value instanceof Serialized) { + valueWriter.write(((Serialized) value).buffer(), encoder); + } else { + ByteBuffer valueBuffer = + ByteBuffer.allocate(variant.value().sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + variant.value().writeTo(valueBuffer, 0); + valueWriter.write(valueBuffer, encoder); + } + } + } + private static class OptionWriter implements ValueWriter { private final int nullIndex; private final int valueIndex; diff --git a/core/src/main/java/org/apache/iceberg/avro/VariantConversion.java b/core/src/main/java/org/apache/iceberg/avro/VariantConversion.java new file mode 100644 index 000000000000..230bf01bb966 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/VariantConversion.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.avro; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.avro.Conversion; +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantValue; + +public class VariantConversion extends Conversion { + @Override + public Class getConvertedType() { + return Variant.class; + } + + @Override + public String getLogicalTypeName() { + return VariantLogicalType.NAME; + } + + @Override + public Variant fromRecord(IndexedRecord record, Schema schema, LogicalType type) { + int metadataPos = schema.getField("metadata").pos(); + int valuePos = schema.getField("value").pos(); + VariantMetadata metadata = VariantMetadata.from(((ByteBuffer) record.get(metadataPos)).order(ByteOrder.LITTLE_ENDIAN)); + VariantValue value = VariantValue.from(metadata, ((ByteBuffer) record.get(valuePos)).order(ByteOrder.LITTLE_ENDIAN)); + return Variant.of(metadata, value); + } + + @Override + public IndexedRecord toRecord(Variant variant, Schema schema, LogicalType type) { + int metadataPos = schema.getField("metadata").pos(); + int valuePos = schema.getField("value").pos(); + GenericRecord record = new GenericData.Record(schema); + ByteBuffer metadataBuffer = + ByteBuffer.allocate(variant.metadata().sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + variant.metadata().writeTo(metadataBuffer, 0); + record.put(metadataPos, metadataBuffer); + ByteBuffer valueBuffer = + ByteBuffer.allocate(variant.value().sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + variant.value().writeTo(valueBuffer, 0); + record.put(valuePos, valueBuffer); + return record; + } +} diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java index 46ac3e07ea15..19ff9f42759a 
100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java @@ -101,6 +101,12 @@ public ValueWriter map(Schema map, ValueWriter valueWriter) { return ValueWriters.map(ValueWriters.strings(), valueWriter); } + @Override + public ValueWriter variant( + Schema variant, ValueWriter metadataResult, ValueWriter valueResult) { + return ValueWriters.variants(metadataResult, valueResult); + } + @Override public ValueWriter primitive(Schema primitive) { LogicalType logicalType = primitive.getLogicalType(); diff --git a/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java index 3e401023464b..7eeac5818759 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java @@ -125,6 +125,12 @@ public ValueReader map(Type ignored, Schema map, ValueReader valueReader) return ValueReaders.map(ValueReaders.strings(), valueReader); } + @Override + public ValueReader variant( + Type partner, ValueReader metadataReader, ValueReader valueReader) { + return ValueReaders.variants(metadataReader, valueReader); + } + @Override public ValueReader primitive(Type partner, Schema primitive) { LogicalType logicalType = primitive.getLogicalType(); diff --git a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java index 781051f11d7b..f83827349c4b 100644 --- a/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/InternalTestHelpers.java @@ -93,6 +93,14 @@ private static void assertEquals(Type type, Object expected, Object actual) { case DECIMAL: assertThat(actual).as("Primitive value should be equal to expected").isEqualTo(expected); break; + case VARIANT: + assertThat(expected).as("Expected should be a 
Variant").isInstanceOf(Variant.class); + assertThat(actual).as("Actual should be a Variant").isInstanceOf(Variant.class); + Variant expectedVariant = (Variant) expected; + Variant actualVariant = (Variant) actual; + VariantTestUtil.assertEqual(expectedVariant.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expectedVariant.value(), actualVariant.value()); + break; case STRUCT: assertThat(expected).as("Expected should be a StructLike").isInstanceOf(StructLike.class); assertThat(actual).as("Actual should be a StructLike").isInstanceOf(StructLike.class); diff --git a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java index d4c848c9e600..56efc4cd3e72 100644 --- a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java +++ b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java @@ -29,6 +29,8 @@ import org.apache.avro.generic.GenericData.Record; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantTestUtil; class AvroTestHelpers { @@ -135,6 +137,14 @@ private static void assertEquals(Type type, Object expected, Object actual) { case DECIMAL: assertThat(actual).as("Primitive value should be equal to expected").isEqualTo(expected); break; + case VARIANT: + assertThat(expected).as("Expected should be a Variant").isInstanceOf(Variant.class); + assertThat(actual).as("Actual should be a Variant").isInstanceOf(Variant.class); + Variant expectedVariant = (Variant) expected; + Variant actualVariant = (Variant) actual; + VariantTestUtil.assertEqual(expectedVariant.metadata(), actualVariant.metadata()); + VariantTestUtil.assertEqual(expectedVariant.value(), actualVariant.value()); + break; case STRUCT: assertThat(expected).as("Expected should be a Record").isInstanceOf(Record.class); assertThat(actual).as("Actual should be a 
Record").isInstanceOf(Record.class); diff --git a/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java b/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java index ebb9d93a5342..b3b9ad858b66 100644 --- a/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java +++ b/core/src/test/java/org/apache/iceberg/avro/RandomAvroData.java @@ -27,6 +27,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.util.Utf8; +import org.apache.iceberg.RandomVariants; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; @@ -93,6 +94,11 @@ public Object map(Types.MapType map, Supplier keyResult, Supplier expected = RandomAvroData.generate(schema, 100, 0L); diff --git a/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java b/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java index f418e69e0580..b7799ee88c19 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestInternalAvro.java @@ -47,6 +47,11 @@ protected boolean supportsTimestampNanos() { return true; } + @Override + protected boolean supportsVariant() { + return true; + } + @Override protected void writeAndValidate(Schema schema) throws IOException { List expected = RandomInternalData.generate(schema, 100, 42L); diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java index 19c8822e81be..3900cd466d25 100644 --- a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java @@ -98,4 +98,9 @@ protected boolean supportsUnknown() { protected boolean supportsTimestampNanos() { return true; } + + @Override + protected boolean supportsVariant() { + return true; + } }