Avro: Add variant readers and writers #12457

Open · wants to merge 7 commits into base: main
Changes from 5 commits
@@ -20,6 +20,6 @@

import java.nio.ByteBuffer;

-interface Serialized {
+public interface Serialized {
ByteBuffer buffer();
}
@@ -54,6 +54,10 @@ static VariantMetadata from(ByteBuffer buffer) {
return SerializedMetadata.from(buffer);
}

static VariantMetadata empty() {
return SerializedMetadata.EMPTY_V1_METADATA;
}

static String asString(VariantMetadata metadata) {
StringBuilder builder = new StringBuilder();

1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -89,6 +89,7 @@ private enum Codec {
LogicalTypes.register(VariantLogicalType.NAME, schema -> VariantLogicalType.get());
DEFAULT_MODEL.addLogicalTypeConversion(new Conversions.DecimalConversion());
DEFAULT_MODEL.addLogicalTypeConversion(new UUIDConversion());
DEFAULT_MODEL.addLogicalTypeConversion(new VariantConversion());
}

public static WriteBuilder write(OutputFile file) {
@@ -89,6 +89,10 @@ public R map(P partner, Schema map, R valueResult) {
return null;
}

public R variant(P partner, R metadataResult, R valueResult) {
throw new UnsupportedOperationException("Visitor does not support variant");
}

public R primitive(P partner, Schema primitive) {
return null;
}
@@ -100,7 +104,11 @@ public static <P, R> R visit(
PartnerAccessors<P> accessors) {
switch (schema.getType()) {
case RECORD:
-return visitRecord(partner, schema, visitor, accessors);
+if (schema.getLogicalType() instanceof VariantLogicalType) {
+return visitVariant(partner, schema, visitor, accessors);
+} else {
+return visitRecord(partner, schema, visitor, accessors);
+}

case UNION:
return visitUnion(partner, schema, visitor, accessors);
@@ -123,6 +131,27 @@ public static <P, R> R visit(
}
}

private static <P, R> R visitVariant(
P partner,
Schema variant,
AvroWithPartnerVisitor<P, R> visitor,
PartnerAccessors<P> accessors) {
// check to make sure this hasn't been visited before
String recordName = variant.getFullName();
Preconditions.checkState(
!visitor.recordLevels.contains(recordName),
"Cannot process recursive Avro record %s",
recordName);
visitor.recordLevels.push(recordName);

R metadataResult = visit(null, variant.getField("metadata").schema(), visitor, accessors);
R valueResult = visit(null, variant.getField("value").schema(), visitor, accessors);

visitor.recordLevels.pop();

return visitor.variant(partner, metadataResult, valueResult);
}

private static <P, R> R visitRecord(
P partnerStruct,
Schema record,
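For orientation, visitVariant assumes the variant is encoded as an Avro record with exactly the two fields it looks up, "metadata" and "value". A minimal sketch of that shape, assuming VariantLogicalType extends Avro's LogicalType so addToSchema can annotate the record (everything outside this diff is illustrative):

package org.apache.iceberg.avro; // assumed location so VariantLogicalType is visible

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

class VariantSchemaSketch {
  // The record shape visitVariant expects: the field names match the
  // getField("metadata") and getField("value") lookups above.
  static Schema variantSchema() {
    Schema record =
        SchemaBuilder.record("variant")
            .fields()
            .requiredBytes("metadata") // serialized metadata dictionary
            .requiredBytes("value") // serialized variant value
            .endRecord();
    // Annotating the record is what makes getLogicalType() return VariantLogicalType,
    // which is the check used to route RECORD schemas into visitVariant.
    return VariantLogicalType.get().addToSchema(record);
  }
}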
@@ -64,6 +64,12 @@ public ValueWriter<?> map(Schema map, ValueWriter<?> valueWriter) {
return ValueWriters.map(ValueWriters.strings(), valueWriter);
}

@Override
public ValueWriter<?> variant(
Schema variant, ValueWriter<?> metadataResult, ValueWriter<?> valueResult) {
return ValueWriters.variants();
}

@Override
public ValueWriter<?> primitive(Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();
@@ -163,6 +163,12 @@ public ValueReader<?> map(Type partner, Schema map, ValueReader<?> valueReader)
return ValueReaders.map(ValueReaders.strings(), valueReader);
}

@Override
public ValueReader<?> variant(
Type partner, ValueReader<?> metadataReader, ValueReader<?> valueReader) {
return ValueReaders.variants(metadataReader, valueReader);
}

@Override
public ValueReader<?> primitive(Type partner, Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();
@@ -160,6 +160,12 @@ public ValueReader<?> map(Pair<Integer, Type> partner, Schema map, ValueReader<?
return ValueReaders.map(ValueReaders.strings(), valueReader);
}

@Override
public ValueReader<?> variant(
Pair<Integer, Type> partner, ValueReader<?> metadataReader, ValueReader<?> valueReader) {
return ValueReaders.variants(metadataReader, valueReader);
}

@Override
public ValueReader<?> primitive(Pair<Integer, Type> partner, Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();
37 changes: 37 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -49,6 +49,9 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantValue;

public class ValueReaders {
private ValueReaders() {}
@@ -141,6 +144,13 @@ public static ValueReader<byte[]> decimalBytesReader(Schema schema) {
}
}

@SuppressWarnings("unchecked")
public static ValueReader<Variant> variants(
ValueReader<?> metadataReader, ValueReader<?> valueReader) {
return new VariantReader(
(ValueReader<ByteBuffer>) metadataReader, (ValueReader<ByteBuffer>) valueReader);
}

public static ValueReader<Object> union(List<ValueReader<?>> readers) {
return new UnionReader(readers);
}
@@ -653,6 +663,33 @@ public void skip(Decoder decoder) throws IOException {
}
}

private static class VariantReader implements ValueReader<Variant> {
private final ValueReader<ByteBuffer> metadataReader;
private final ValueReader<ByteBuffer> valueReader;

private VariantReader(
ValueReader<ByteBuffer> metadataReader, ValueReader<ByteBuffer> valueReader) {
this.metadataReader = metadataReader;
this.valueReader = valueReader;
}

@Override
public Variant read(Decoder decoder, Object reuse) throws IOException {
VariantMetadata metadata =
VariantMetadata.from(metadataReader.read(decoder, null).order(ByteOrder.LITTLE_ENDIAN));
VariantValue value =
VariantValue.from(
metadata, valueReader.read(decoder, null).order(ByteOrder.LITTLE_ENDIAN));
return Variant.of(metadata, value);
}

@Override
public void skip(Decoder decoder) throws IOException {
metadataReader.skip(decoder);
valueReader.skip(decoder);
}
}

private static class UnionReader implements ValueReader<Object> {
private final ValueReader[] readers;

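A hedged usage sketch of the new reader: ValueReaders.byteBuffers() is assumed to be the underlying reader produced for the metadata and value fields, and the byte array is assumed to hold one variant encoded as two consecutive Avro bytes values (metadata first, then value):

import java.io.IOException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.variants.Variant;

public class VariantReadSketch {
  // Decodes one variant from raw Avro-encoded bytes (illustrative input).
  static Variant readVariant(byte[] avroEncoded) throws IOException {
    ValueReader<Variant> reader =
        ValueReaders.variants(ValueReaders.byteBuffers(), ValueReaders.byteBuffers());
    Decoder decoder = DecoderFactory.get().binaryDecoder(avroEncoded, null);
    return reader.read(decoder, null);
  }
}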
85 changes: 85 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
@@ -33,10 +33,15 @@
import org.apache.avro.io.Encoder;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.DecimalUtil;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.iceberg.variants.Serialized;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantValue;

public class ValueWriters {
private ValueWriters() {}
@@ -109,6 +114,10 @@ public static ValueWriter<BigDecimal> decimal(int precision, int scale) {
return new DecimalWriter(precision, scale);
}

public static ValueWriter<Variant> variants() {
return VariantWriter.INSTANCE;
}

public static <T> ValueWriter<T> option(int nullIndex, ValueWriter<T> writer) {
return new OptionWriter<>(nullIndex, writer);
}
@@ -373,6 +382,82 @@ public void write(BigDecimal decimal, Encoder encoder) throws IOException {
}
}

private abstract static class VariantBinaryWriter<T> implements ValueWriter<T> {
private ByteBuffer reusedBuffer = ByteBuffer.allocate(2048).order(ByteOrder.LITTLE_ENDIAN);

protected abstract int sizeInBytes(T value);

protected abstract int writeTo(ByteBuffer buffer, int offset, T value);

@Override
public void write(T datum, Encoder encoder) throws IOException {
if (datum instanceof Serialized) {
encoder.writeBytes(((Serialized) datum).buffer());
} else {
encoder.writeBytes(serialize(datum));
}
}

private void ensureCapacity(int requiredSize) {
if (reusedBuffer.capacity() < requiredSize) {
int newCapacity = IOUtil.capacityFor(requiredSize);
this.reusedBuffer = ByteBuffer.allocate(newCapacity).order(ByteOrder.LITTLE_ENDIAN);
} else {
reusedBuffer.limit(requiredSize);
}
}

private ByteBuffer serialize(T value) {
ensureCapacity(sizeInBytes(value));
int size = writeTo(reusedBuffer, 0, value);
reusedBuffer.position(0);
reusedBuffer.limit(size);
return reusedBuffer;
}
}

private static class VariantMetadataWriter extends VariantBinaryWriter<VariantMetadata> {
@Override
protected int sizeInBytes(VariantMetadata metadata) {
return metadata.sizeInBytes();
}

@Override
protected int writeTo(ByteBuffer buffer, int offset, VariantMetadata metadata) {
return metadata.writeTo(buffer, offset);
}
}

private static class VariantValueWriter extends VariantBinaryWriter<VariantValue> {
@Override
protected int sizeInBytes(VariantValue value) {
return value.sizeInBytes();
}

@Override
protected int writeTo(ByteBuffer buffer, int offset, VariantValue value) {
return value.writeTo(buffer, offset);
}
}

private static class VariantWriter implements ValueWriter<Variant> {
private static final VariantWriter INSTANCE = new VariantWriter();

private final VariantMetadataWriter metadataWriter;
private final VariantValueWriter valueWriter;

private VariantWriter() {
this.metadataWriter = new VariantMetadataWriter();
this.valueWriter = new VariantValueWriter();
}

@Override
public void write(Variant variant, Encoder encoder) throws IOException {
metadataWriter.write(variant.metadata(), encoder);
valueWriter.write(variant.value(), encoder);
}
}

private static class OptionWriter<T> implements ValueWriter<T> {
private final int nullIndex;
private final int valueIndex;
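The corresponding write-side sketch, using the new ValueWriters.variants() entry point with a plain in-memory encoder (the sink and variable names are illustrative):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.iceberg.avro.ValueWriter;
import org.apache.iceberg.avro.ValueWriters;
import org.apache.iceberg.variants.Variant;

public class VariantWriteSketch {
  // Encodes one Variant as two consecutive Avro bytes values (metadata, then value),
  // matching what VariantWriter emits above.
  static byte[] writeVariant(Variant variant) throws IOException {
    ValueWriter<Variant> writer = ValueWriters.variants();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(variant, encoder);
    encoder.flush();
    return out.toByteArray();
  }
}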
68 changes: 68 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/VariantConversion.java
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.avro;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantValue;

public class VariantConversion extends Conversion<Variant> {
@Override
public Class<Variant> getConvertedType() {
return Variant.class;
}

@Override
public String getLogicalTypeName() {
return VariantLogicalType.NAME;
}

@Override
public Variant fromRecord(IndexedRecord record, Schema schema, LogicalType type) {
int metadataPos = schema.getField("metadata").pos();
int valuePos = schema.getField("value").pos();
VariantMetadata metadata =
VariantMetadata.from(((ByteBuffer) record.get(metadataPos)).order(ByteOrder.LITTLE_ENDIAN));
VariantValue value =
VariantValue.from(metadata, ((ByteBuffer) record.get(valuePos)).order(ByteOrder.LITTLE_ENDIAN));
return Variant.of(metadata, value);
}

@Override
public IndexedRecord toRecord(Variant variant, Schema schema, LogicalType type) {
int metadataPos = schema.getField("metadata").pos();
int valuePos = schema.getField("value").pos();
GenericRecord record = new GenericData.Record(schema);
ByteBuffer metadataBuffer =
ByteBuffer.allocate(variant.metadata().sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
variant.metadata().writeTo(metadataBuffer, 0);
record.put(metadataPos, metadataBuffer);
ByteBuffer valueBuffer =
ByteBuffer.allocate(variant.value().sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
variant.value().writeTo(valueBuffer, 0);
record.put(valuePos, valueBuffer);
return record;
}
}
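A hedged round-trip sketch for the conversion: the schema construction mirrors the field lookups in fromRecord/toRecord, and the sketch lives in org.apache.iceberg.avro on the assumption that VariantLogicalType is not public:

package org.apache.iceberg.avro;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.iceberg.variants.Variant;

class VariantConversionSketch {
  static Variant roundTrip(Variant variant) {
    // Assumed schema shape: the two-field record that toRecord/fromRecord expect.
    Schema schema =
        SchemaBuilder.record("variant")
            .fields()
            .requiredBytes("metadata")
            .requiredBytes("value")
            .endRecord();

    VariantConversion conversion = new VariantConversion();
    // toRecord serializes metadata and value into little-endian ByteBuffers;
    // fromRecord parses those buffers back into a Variant.
    IndexedRecord record = conversion.toRecord(variant, schema, VariantLogicalType.get());
    return conversion.fromRecord(record, schema, VariantLogicalType.get());
  }
}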
@@ -101,6 +101,12 @@ public ValueWriter<?> map(Schema map, ValueWriter<?> valueWriter) {
return ValueWriters.map(ValueWriters.strings(), valueWriter);
}

@Override
public ValueWriter<?> variant(
Schema variant, ValueWriter<?> metadataResult, ValueWriter<?> valueResult) {
return ValueWriters.variants();
}

@Override
public ValueWriter<?> primitive(Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();
@@ -125,6 +125,12 @@ public ValueReader<?> map(Type ignored, Schema map, ValueReader<?> valueReader)
return ValueReaders.map(ValueReaders.strings(), valueReader);
}

@Override
public ValueReader<?> variant(
Type partner, ValueReader<?> metadataReader, ValueReader<?> valueReader) {
return ValueReaders.variants(metadataReader, valueReader);
}

@Override
public ValueReader<?> primitive(Type partner, Schema primitive) {
LogicalType logicalType = primitive.getLogicalType();