Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@

public class GenericDatumReaderExt<T> extends GenericDatumReader<T> {

private final Schema writer;
private final Schema reader;
private Schema writer;
private Schema reader;

public GenericDatumReaderExt(Schema writer, Schema reader) {
super(writer, reader);
this.writer = writer;
this.reader = reader;
}

/**
* {@inheritDoc}
*/
@SuppressWarnings("unchecked")
@Override
public T read(T reuse, Decoder in) throws IOException {
Expand All @@ -35,6 +38,27 @@ public T read(T reuse, Decoder in) throws IOException {
return result;
}

/**
* {@inheritDoc}
*/
@Override
public void setSchema(Schema writer) {
super.setSchema(writer);
this.writer = writer;
if (this.reader == null) {
this.reader = writer;
}
}

/**
* {@inheritDoc}
*/
@Override
public void setExpected(Schema reader) throws IOException {
super.setExpected(reader);
this.reader = reader;
}

private Object read(Object old, Schema expected,
CachedResolvingDecoder in) throws IOException {
switch (expected.getType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,39 @@

public class SpecificDatumReaderExt<T> extends SpecificDatumReader<T> {

private final Schema writer;
private final Schema reader;
private Schema writer;
private Schema reader;

public SpecificDatumReaderExt(Schema writer, Schema reader) {
super(writer, reader);
this.writer = writer;
this.reader = reader;
}

/**
* {@inheritDoc}
*/
@Override
public void setSchema(Schema writer) {
super.setSchema(writer);
this.writer = writer;
if (this.reader == null) {
this.reader = writer;
}
}

/**
* {@inheritDoc}
*/
@Override
public void setExpected(Schema reader) throws IOException {
super.setExpected(reader);
this.reader = reader;
}

/**
* {@inheritDoc}
*/
@SuppressWarnings("unchecked")
@Override
public T read(T reuse, Decoder in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,11 @@ public Symbol generate(
break;
case NULL:
case BOOLEAN:
break;
case INT:
switch (writerType) {
case LONG:
return Symbol.IntLongAdjustAction.INSTANCE;
default:
break;
}
if (writerType == Schema.Type.LONG) {
return Symbol.IntLongAdjustAction.INSTANCE;
}
case STRING:
case FLOAT:
case BYTES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@

package com.linkedin.avroutil1.compatibility.avro15.backports;

import com.linkedin.avroutil1.compatibility.avro15.codec.CachedResolvingDecoder;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.Decoder;

import java.io.IOException;


/**
Expand All @@ -18,7 +23,132 @@
*/
public class GenericDatumReaderExt<T> extends GenericDatumReader<T> {

private Schema writer;
private Schema reader;

public GenericDatumReaderExt(Schema writer, Schema reader, GenericData genericData) {
super(writer, reader, genericData);
this.writer = writer;
this.reader = reader;
}

/**
* {@inheritDoc}
*/
@Override
public void setExpected(Schema reader) throws IOException {
super.setExpected(reader);
this.reader = reader;
}

/**
* {@inheritDoc}
*/
@Override
public void setSchema(Schema writer) {
super.setSchema(writer);
this.writer = writer;
if (reader == null) {
this.reader = writer;
}
}

/**
* {@inheritDoc}
*/
@SuppressWarnings("unchecked")
@Override
public T read(T reuse, Decoder in) throws IOException {
Comment thread
karthikrg marked this conversation as resolved.
CachedResolvingDecoder resolver = new CachedResolvingDecoder(writer, reader, in);
resolver.init(in);
T result = (T) read(reuse, reader, resolver);
resolver.drain();
return result;
}

private Object read(Object old, Schema expected,
CachedResolvingDecoder in) throws IOException {
switch (expected.getType()) {
case RECORD:
return readRecord(old, expected, in);
case ENUM:
return readEnum(expected, in);
case ARRAY:
return readArray(old, expected, in);
case MAP:
return readMap(old, expected, in);
case UNION:
return read(old, expected.getTypes().get(in.readIndex()), in);
case FIXED:
return readFixed(old, expected, in);
case STRING:
return readString(old, expected, in);
case BYTES:
return readBytes(old, in);
case INT:
return readInt(old, expected, in);
case LONG:
return in.readLong();
case FLOAT:
return in.readFloat();
case DOUBLE:
return in.readDouble();
case BOOLEAN:
return in.readBoolean();
case NULL:
in.readNull();
return null;
default:
throw new AvroRuntimeException("Unknown type: " + expected);
}
}

private Object readRecord(Object old, Schema expected,
CachedResolvingDecoder in) throws IOException {
Object record = newRecord(old, expected);
final GenericData data = getData();

for (Schema.Field f : in.readFieldOrder()) {
int pos = f.pos();
String name = f.name();
Object oldDatum = (old != null) ? data.getField(record, name, pos) : null;
data.setField(record, name, pos, read(oldDatum, f.schema(), in));
}

return record;
}

private Object readArray(Object old, Schema expected,
CachedResolvingDecoder in) throws IOException {
Schema expectedType = expected.getElementType();
long l = in.readArrayStart();
long base = 0;
if (l > 0) {
Object array = newArray(old, (int) l, expected);
do {
for (long i = 0; i < l; i++) {
addToArray(array, base + i, read(peekArray(array), expectedType, in));
}
base += l;
} while ((l = in.arrayNext()) > 0);
return array;
} else {
return newArray(old, 0, expected);
}
}

private Object readMap(Object old, Schema expected,
CachedResolvingDecoder in) throws IOException {
Schema eValue = expected.getValueType();
long l = in.readMapStart();
Object map = newMap(old, (int) l);
if (l > 0) {
do {
for (int i = 0; i < l; i++) {
addToMap(map, readString(null, in), read(null, eValue, in));
}
} while ((l = in.mapNext()) > 0);
}
return map;
}
}
Loading