Skip to content

Commit

Permalink
Merge pull request apache#267 from Parquet/handler_only_handle_ignore…
Browse files Browse the repository at this point in the history
…d_fields

handler only handle ignored field, exception during will be thrown as Sk...
  • Loading branch information
aniket486 committed Jan 3, 2014
2 parents 7dac815 + 622a400 commit 79cc35d
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import parquet.hadoop.ParquetOutputFormat;
import parquet.hadoop.util.ContextUtil;
import parquet.thrift.ReadWriteErrorHandler;
import parquet.thrift.FieldIgnoredHandler;

/**
* Output format that turns Thrift bytes into Parquet format using the thrift TProtocol layer
Expand Down Expand Up @@ -63,7 +63,7 @@ public ParquetThriftBytesOutputFormat() {
* @param buffered whether we should buffer each record
* @param errorHandler handle record corruption and schema incompatible exception
*/
public ParquetThriftBytesOutputFormat(TProtocolFactory protocolFactory, Class<? extends TBase<?, ?>> thriftClass, boolean buffered, ReadWriteErrorHandler errorHandler) {
public ParquetThriftBytesOutputFormat(TProtocolFactory protocolFactory, Class<? extends TBase<?, ?>> thriftClass, boolean buffered, FieldIgnoredHandler errorHandler) {
super(new ThriftBytesWriteSupport(protocolFactory, thriftClass, buffered, errorHandler));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import parquet.thrift.ParquetWriteProtocol;
import parquet.thrift.ProtocolPipe;
import parquet.thrift.ProtocolReadToWrite;
import parquet.thrift.ReadWriteErrorHandler;
import parquet.thrift.FieldIgnoredHandler;
import parquet.thrift.ThriftSchemaConverter;
import parquet.thrift.struct.ThriftType.StructType;

Expand Down Expand Up @@ -70,14 +70,14 @@ public static Class<TProtocolFactory> getTProtocolFactoryClass(Configuration con
private MessageType schema;
private StructType thriftStruct;
private ParquetWriteProtocol parquetWriteProtocol;
private final ReadWriteErrorHandler errorHandler;
private final FieldIgnoredHandler errorHandler;

public ThriftBytesWriteSupport() {
this.buffered = true;
this.errorHandler = null;
}

public ThriftBytesWriteSupport(TProtocolFactory protocolFactory, Class<? extends TBase<?, ?>> thriftClass, boolean buffered, ReadWriteErrorHandler errorHandler) {
public ThriftBytesWriteSupport(TProtocolFactory protocolFactory, Class<? extends TBase<?, ?>> thriftClass, boolean buffered, FieldIgnoredHandler errorHandler) {
super();
this.protocolFactory = protocolFactory;
this.thriftClass = thriftClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.thrift.TBase;
import org.apache.thrift.protocol.TProtocolFactory;

import parquet.thrift.ReadWriteErrorHandler;
import parquet.thrift.FieldIgnoredHandler;

/**
* To create a Parquet file from the Thrift binary of records
Expand Down Expand Up @@ -71,7 +71,7 @@ public ThriftToParquetFileWriter(
TaskAttemptContext taskAttemptContext,
TProtocolFactory protocolFactory,
Class<? extends TBase<?,?>> thriftClass,
ReadWriteErrorHandler errorHandler) throws IOException, InterruptedException {
FieldIgnoredHandler errorHandler) throws IOException, InterruptedException {
this(fileToCreate, taskAttemptContext, protocolFactory, thriftClass, true, errorHandler);
}

Expand All @@ -90,7 +90,7 @@ public ThriftToParquetFileWriter(
TProtocolFactory protocolFactory,
Class<? extends TBase<?,?>> thriftClass,
boolean buffered,
ReadWriteErrorHandler errorHandler) throws IOException, InterruptedException {
FieldIgnoredHandler errorHandler) throws IOException, InterruptedException {
this.taskAttemptContext = taskAttemptContext;
this.recordWriter = new ParquetThriftBytesOutputFormat(protocolFactory, thriftClass, buffered, errorHandler).getRecordWriter(taskAttemptContext, fileToCreate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
/**
* Class to read from one protocol in a buffer and then write to another one
* When there is an exception during reading, it's a skippable exception.
* When schema is not compatible, the {@link SkippableException} will be passed to {@link ReadWriteErrorHandler#handleSkippedCorruptedRecord(SkippableException)}.
* When schema is not compatible, the {@link SkippableException} will be thrown.
* <p/>
* When there are fields in the data that are not defined in the schema, the fields will be ignored and the handler will
* be notified through {@link ReadWriteErrorHandler#handleFieldIgnored(org.apache.thrift.protocol.TField)}
* and {@link parquet.thrift.ReadWriteErrorHandler#handleRecordHasFieldIgnored()}
* be notified through {@link FieldIgnoredHandler#handleFieldIgnored(org.apache.thrift.protocol.TField)}
* and {@link FieldIgnoredHandler#handleRecordHasFieldIgnored()}
*
* @author Julien Le Dem
*
Expand Down Expand Up @@ -105,14 +105,14 @@ public String toDebugString() {
}
};
//error handler is global
private final ReadWriteErrorHandler errorHandler;
private final FieldIgnoredHandler errorHandler;
private final StructType thriftType;

public BufferedProtocolReadToWrite(StructType thriftType) {
this(thriftType, null);
}

public BufferedProtocolReadToWrite(StructType thriftType, ReadWriteErrorHandler errorHandler) {
public BufferedProtocolReadToWrite(StructType thriftType, FieldIgnoredHandler errorHandler) {
super();
this.thriftType = thriftType;
this.errorHandler = errorHandler;
Expand All @@ -121,7 +121,7 @@ public BufferedProtocolReadToWrite(StructType thriftType, ReadWriteErrorHandler
/**
* Reads one record from in and writes it to out.
* Exceptions encountered during reading are treated as skippable exceptions,
* {@link ReadWriteErrorHandler} will be notified when registered.
* {@link FieldIgnoredHandler} will be notified when registered.
*
* @param in input protocol
* @param out output protocol
Expand All @@ -130,18 +130,13 @@ public BufferedProtocolReadToWrite(StructType thriftType, ReadWriteErrorHandler
@Override
public void readOne(TProtocol in, TProtocol out) throws TException {
List<Action> buffer = new LinkedList<Action>();
try {
boolean hasFieldsIgnored = readOneStruct(in, buffer, thriftType);
if (hasFieldsIgnored) {
notifyRecordHasFieldIgnored();
}
} catch (DecodingSchemaMismatchException e) {
notifySkippedSchemaMismatchedRecordWhenReading(e);
notifySkippedCorruptedRecord(new SkippableException(error("Error while reading", buffer), e));
return;
try{
boolean hasFieldsIgnored = readOneStruct(in, buffer, thriftType);
if (hasFieldsIgnored) {
notifyRecordHasFieldIgnored();
}
} catch (Exception e) {
notifySkippedCorruptedRecord(new SkippableException(error("Error while reading", buffer), e));
return;
throw new SkippableException(error("Error while reading", buffer), e);
}

try {
Expand All @@ -153,24 +148,12 @@ public void readOne(TProtocol in, TProtocol out) throws TException {
}
}

private void notifySkippedSchemaMismatchedRecordWhenReading(DecodingSchemaMismatchException e) {
if (errorHandler != null) {
errorHandler.handleSkipRecordDueToSchemaMismatch(e);
}
}

private void notifyRecordHasFieldIgnored() {
if (errorHandler != null) {
errorHandler.handleRecordHasFieldIgnored();
}
}

private void notifySkippedCorruptedRecord(SkippableException e) {
if (errorHandler != null) {
errorHandler.handleSkippedCorruptedRecord(e);
}
}

private void notifyIgnoredFieldsOfRecord(TField field) {
if (errorHandler != null) {
errorHandler.handleFieldIgnored(field);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,18 @@
import org.apache.thrift.protocol.TField;

/**
* Implements this class to handle errors encountered during reading and writing
* Implements this class to handle when fields get ignored in {@link BufferedProtocolReadToWrite}
* @author Tianshuo Deng
*/
public interface ReadWriteErrorHandler {
/**
* handle when a record can not be read due to an exception,
* in this case the record will be skipped and this method will be called with the exception that caused reading failure
*
* @param e
*/
void handleSkippedCorruptedRecord(SkippableException e);
public abstract class FieldIgnoredHandler {

/**
* handle when a record that contains fields that are ignored, meaning that the schema provided does not cover all the columns in data,
* the record will still be written but with fields that are not defined in the schema ignored.
* For each record, this method will be called at most once.
*/
void handleRecordHasFieldIgnored();
public void handleRecordHasFieldIgnored() {
}

/**
* handle when a field gets ignored,
Expand All @@ -29,11 +23,6 @@ public interface ReadWriteErrorHandler {
*
* @param field
*/
void handleFieldIgnored(TField field);

/**
* handle when there is a record that has incompatible schema
* @param e
*/
void handleSkipRecordDueToSchemaMismatch(DecodingSchemaMismatchException e);
public void handleFieldIgnored(TField field) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ public void testMapSet() throws Exception {
writeReadCompare(a);
}

private void writeReadCompare(TBase<?,?> a)
throws TException, InstantiationException, IllegalAccessException {
ProtocolPipe[] pipes = {new ProtocolReadToWrite(), new BufferedProtocolReadToWrite(new ThriftSchemaConverter().toStructType((Class<TBase<?,?>>)a.getClass()))};
private void writeReadCompare(TBase<?, ?> a)
throws TException, InstantiationException, IllegalAccessException {
ProtocolPipe[] pipes = {new ProtocolReadToWrite(), new BufferedProtocolReadToWrite(new ThriftSchemaConverter().toStructType((Class<TBase<?, ?>>)a.getClass()))};
for (ProtocolPipe p : pipes) {
final ByteArrayOutputStream in = new ByteArrayOutputStream();
final ByteArrayOutputStream out = new ByteArrayOutputStream();
a.write(protocol(in));
p.readOne(protocol(new ByteArrayInputStream(in.toByteArray())), protocol(out));
TBase<?,?> b = a.getClass().newInstance();
TBase<?, ?> b = a.getClass().newInstance();
b.read(protocol(new ByteArrayInputStream(out.toByteArray())));

assertEquals(p.getClass().getSimpleName(), a, b);
Expand All @@ -104,14 +104,7 @@ private void writeReadCompare(TBase<?,?> a)
@Test
public void testIncompatibleSchemaRecord() throws Exception {
//handler will rethrow the exception for verifying purpose
CountingErrorHandler countingHandler = new CountingErrorHandler() {
@Override
public void handleSkipRecordDueToSchemaMismatch(DecodingSchemaMismatchException e) {
super.handleSkipRecordDueToSchemaMismatch(e);
assertTrue(e.getMessage().contains("the data type does not match the expected thrift structure"));
assertTrue(e.getMessage().contains("got BOOL"));
}
};
CountingErrorHandler countingHandler = new CountingErrorHandler();

BufferedProtocolReadToWrite p = new BufferedProtocolReadToWrite(new ThriftSchemaConverter().toStructType(AddressBook.class), countingHandler);

Expand All @@ -121,43 +114,38 @@ public void handleSkipRecordDueToSchemaMismatch(DecodingSchemaMismatchException
true, false, (byte)8, (short)16, (int)32, (long)64, (double)1234, "string", "å", false,
ByteBuffer.wrap("a".getBytes()), new ArrayList<Byte>(), new ArrayList<Short>(), new ArrayList<Long>());
a.write(protocol(in));

try {
p.readOne(protocol(new ByteArrayInputStream(in.toByteArray())), protocol(out));
assertEquals(1, countingHandler.corruptedCount);
assertEquals(1, countingHandler.schemaMismatchCount);
} catch (SkippableException e) {
Throwable cause = e.getCause();
assertTrue(cause instanceof DecodingSchemaMismatchException);
assertTrue(cause.getMessage().contains("the data type does not match the expected thrift structure"));
assertTrue(cause.getMessage().contains("got BOOL"));
}
assertEquals(0, countingHandler.recordCountOfMissingFields);
assertEquals(0, countingHandler.fieldIgnoredCount);
}

/**
* When enum value in data has an undefined index, it's considered as corrupted record and will be skipped.
* Handler will be notified
*
* @throws Exception
*/
@Test
public void testEnumMissingSchema() throws Exception {
CountingErrorHandler countingHandler = new CountingErrorHandler() {
@Override
public void handleSkippedCorruptedRecord(SkippableException e) {
super.handleSkippedCorruptedRecord(e);
}

@Override
public void handleSkipRecordDueToSchemaMismatch(DecodingSchemaMismatchException e) {
super.handleSkipRecordDueToSchemaMismatch(e); //To change body of overridden methods use File | Settings | File Templates.
assertTrue(e.getMessage().contains("can not find index 4 in enum"));
}
};
CountingErrorHandler countingHandler = new CountingErrorHandler();
BufferedProtocolReadToWrite p = new BufferedProtocolReadToWrite(new ThriftSchemaConverter().toStructType(StructWithEnum.class), countingHandler);
final ByteArrayOutputStream in = new ByteArrayOutputStream();
final ByteArrayOutputStream out = new ByteArrayOutputStream();
StructWithMoreEnum moreEnum = new StructWithMoreEnum(NumberEnumWithMoreValue.FOUR);
moreEnum.write(protocol(in));

p.readOne(protocol(new ByteArrayInputStream(in.toByteArray())), protocol(out));
assertEquals(1, countingHandler.corruptedCount);
assertEquals(1, countingHandler.schemaMismatchCount);
try {
p.readOne(protocol(new ByteArrayInputStream(in.toByteArray())), protocol(out));
} catch (SkippableException e) {
Throwable cause = e.getCause();
assertTrue(cause instanceof DecodingSchemaMismatchException);
assertTrue(cause.getMessage().contains("can not find index 4 in enum"));
}
assertEquals(0, countingHandler.recordCountOfMissingFields);
assertEquals(0, countingHandler.fieldIgnoredCount);
}
Expand Down Expand Up @@ -193,10 +181,8 @@ public void handleFieldIgnored(TField field) {
structForRead.readOne(protocol(new ByteArrayInputStream(in.toByteArray())), protocol(out));

//record will be read without extra field
assertEquals(0, countingHandler.corruptedCount);
assertEquals(1, countingHandler.recordCountOfMissingFields);
assertEquals(1, countingHandler.fieldIgnoredCount);
assertEquals(0, countingHandler.schemaMismatchCount);

StructV4WithExtracStructField b = StructV4WithExtracStructField.class.newInstance();
b.read(protocol(new ByteArrayInputStream(out.toByteArray())));
Expand Down Expand Up @@ -227,10 +213,8 @@ public void handleFieldIgnored(TField field) {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
structForRead.readOne(protocol(new ByteArrayInputStream(in.toByteArray())), protocol(out));

assertEquals(0, countingHandler.corruptedCount);
assertEquals(1, countingHandler.recordCountOfMissingFields);
assertEquals(1, countingHandler.fieldIgnoredCount);
assertEquals(0, countingHandler.schemaMismatchCount);
}

private TCompactProtocol protocol(OutputStream to) {
Expand All @@ -241,16 +225,9 @@ private TCompactProtocol protocol(InputStream from) {
return new TCompactProtocol(new TIOStreamTransport(from));
}

class CountingErrorHandler implements ReadWriteErrorHandler {
int corruptedCount = 0;
class CountingErrorHandler extends FieldIgnoredHandler {
int fieldIgnoredCount = 0;
int recordCountOfMissingFields = 0;
int schemaMismatchCount = 0;

@Override
public void handleSkippedCorruptedRecord(SkippableException e) {
corruptedCount++;
}

@Override
public void handleRecordHasFieldIgnored() {
Expand All @@ -261,10 +238,5 @@ public void handleRecordHasFieldIgnored() {
public void handleFieldIgnored(TField field) {
fieldIgnoredCount++;
}

@Override
public void handleSkipRecordDueToSchemaMismatch(DecodingSchemaMismatchException e) {
schemaMismatchCount++;
}
}
}

0 comments on commit 79cc35d

Please sign in to comment.