Skip to content

Commit

Permalink
Merge pull request apache#399 from egonina/stats
Browse files Browse the repository at this point in the history
Fixed bug where column statistics were not reset after writePage; added unit tests for readFooter
  • Loading branch information
tsdeng committed May 22, 2014
2 parents b70509d + 54f9b10 commit fd8d18f
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 147 deletions.
6 changes: 0 additions & 6 deletions parquet-column/src/main/java/parquet/column/ColumnWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,5 @@ public interface ColumnWriter {
* @return the number of bytes of memory used to buffer the current data
*/
long getBufferedSizeInMemory();

/**
* get current statistics for the column
* @return the Statistics for the column
*/
Statistics getColumnStatistics();
}

Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,25 @@ public ColumnWriterImpl(
this.pageSizeThreshold = pageSizeThreshold;
// initial check of memory usage. So that we have enough data to make an initial prediction
this.valueCountForNextSizeCheck = INITIAL_COUNT_FOR_SIZE_CHECK;
this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
resetStatistics();

ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), initialSizePerCol);
this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), initialSizePerCol);
this.dataColumn = parquetProps.getValuesWriter(path, initialSizePerCol);
}

private void initStatistics() {
this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
}

private void log(Object value, int r, int d) {
LOG.debug(path + " " + value + " r:" + r + " d:" + d);
}

  /**
   * Replaces the current statistics with a fresh, empty Statistics instance
   * for this column's primitive type. Called from the constructor and again
   * after each page is written, so the accumulated statistics cover only the
   * current page rather than the whole column.
   */
  private void resetStatistics() {
    this.statistics = Statistics.getStatsBasedOnType(this.path.getType());
  }

/**
* Counts how many values have been written and checks the memory usage to flush the page when we reach the page threshold.
Expand Down Expand Up @@ -151,11 +158,7 @@ private void writePage() {
definitionLevelColumn.reset();
dataColumn.reset();
valueCount = 0;
}

@Override
public Statistics getColumnStatistics() {
return statistics;
resetStatistics();
}

@Override
Expand Down
125 changes: 0 additions & 125 deletions parquet-column/src/test/java/parquet/io/TestColumnIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package parquet.io;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static parquet.example.Paper.pr1;
import static parquet.example.Paper.pr2;
Expand Down Expand Up @@ -51,12 +50,6 @@
import parquet.column.impl.ColumnWriteStoreImpl;
import parquet.column.page.PageReadStore;
import parquet.column.page.mem.MemPageStore;
import parquet.column.statistics.BinaryStatistics;
import parquet.column.statistics.BooleanStatistics;
import parquet.column.statistics.DoubleStatistics;
import parquet.column.statistics.FloatStatistics;
import parquet.column.statistics.IntStatistics;
import parquet.column.statistics.LongStatistics;
import parquet.example.data.Group;
import parquet.example.data.GroupFactory;
import parquet.example.data.GroupWriter;
Expand Down Expand Up @@ -375,69 +368,6 @@ public void testOneOfEach() {
testSchema(oneOfEachSchema, Arrays.asList(g1));
}

  @Test
  public void testStatisticsWriteToColumn() {
    // Builds four rows covering every primitive type in the "oneOfEach"
    // schema, writes them through the column writers, and verifies that each
    // column's accumulated statistics match hand-computed min/max values.
    MessageType oneOfEachSchema = MessageTypeParser.parseMessageType(oneOfEach);
    GroupFactory gf = new SimpleGroupFactory(oneOfEachSchema);
    // Base timestamp for the INT96 (NanoTime) column; the *1000 scaling is
    // presumably to move from millis toward a finer precision — TODO confirm
    // the intended units.
    long currentTime = System.currentTimeMillis() * 1000;
    Group g1 = gf.newGroup()
    .append("a", 1l)
    .append("b", 2)
    .append("c", 3.0f)
    .append("d", 4.0d)
    .append("e", true)
    .append("f", Binary.fromString("6"))
    .append("g", new NanoTime(12, currentTime));
    Group g2 = gf.newGroup()
    .append("a", 6l)
    .append("b", 9)
    .append("c", 10.0f)
    .append("d", 40.0d)
    .append("e", false)
    .append("f", Binary.fromString("hello"))
    .append("g", new NanoTime(123, currentTime));
    Group g3 = gf.newGroup()
    .append("a", -4l)
    .append("b", 100)
    .append("c", 23.4f)
    .append("d", -940.0d)
    .append("e", false)
    .append("f", Binary.fromString("world"))
    .append("g", new NanoTime(1234, currentTime));
    Group g4 = gf.newGroup()
    .append("a", 90l)
    .append("b", 3)
    .append("c", 234.0f)
    .append("d", 4.0d)
    .append("e", true)
    .append("f", Binary.fromString(""))
    .append("g", new NanoTime(1, currentTime));

    List<Group> groupsList = new ArrayList<Group>();
    groupsList.add(g1);
    groupsList.add(g2);
    groupsList.add(g3);
    groupsList.add(g4);

    // Expected per-column statistics, derived by hand from the four groups
    // above (min/max across g1..g4 for each field).
    IntStatistics intStats = new IntStatistics();
    intStats.setMinMax(2, 100);
    LongStatistics longStats = new LongStatistics();
    longStats.setMinMax(-4l, 90l);
    FloatStatistics floatStats = new FloatStatistics();
    floatStats.setMinMax(3.0f, 234.0f);
    DoubleStatistics doubleStats = new DoubleStatistics();
    doubleStats.setMinMax(-940.0d, 40.0d);
    BinaryStatistics binaryStats = new BinaryStatistics();
    binaryStats.setMinMax(Binary.fromString(""), Binary.fromString("world"));
    BooleanStatistics boolStats = new BooleanStatistics();
    boolStats.setMinMax(false, true);
    // INT96 values are compared in their binary (NanoTime) encoding.
    BinaryStatistics int96Stats = new BinaryStatistics();
    int96Stats.setMinMax(new NanoTime(1, currentTime).toBinary(), new NanoTime(1234, currentTime).toBinary());

    testStatisticsWriteToColumn(oneOfEachSchema, groupsList, intStats, longStats,
                                floatStats, doubleStats, binaryStats, boolStats, int96Stats);
  }

@Test
public void testRequiredOfRequired() {
MessageType reqreqSchema = MessageTypeParser.parseMessageType(
Expand Down Expand Up @@ -545,56 +475,6 @@ private void testSchema(MessageType messageSchema, List<Group> groups) {
}
}

  /**
   * Writes the given groups through a fresh column write store, flushes, and
   * asserts that each column writer's statistics equal the caller-supplied
   * expected statistics for the corresponding primitive type.
   *
   * @param messageSchema schema describing the groups being written
   * @param groups rows to write before checking statistics
   * @param intStats expected statistics for INT32 columns
   * @param longStats expected statistics for INT64 columns
   * @param floatStats expected statistics for FLOAT columns
   * @param doubleStats expected statistics for DOUBLE columns
   * @param binaryStats expected statistics for BINARY columns
   * @param boolStats expected statistics for BOOLEAN columns
   * @param int96Stats expected statistics for INT96 columns (binary-encoded)
   */
  private void testStatisticsWriteToColumn(MessageType messageSchema, List<Group> groups,
                                           IntStatistics intStats, LongStatistics longStats,
                                           FloatStatistics floatStats, DoubleStatistics doubleStats,
                                           BinaryStatistics binaryStats, BooleanStatistics boolStats,
                                           BinaryStatistics int96Stats){
    MemPageStore memPageStore = new MemPageStore(groups.size());
    ColumnWriteStoreImpl columns = newColumnWriteStore(memPageStore);

    ColumnIOFactory columnIOFactory = new ColumnIOFactory(true);
    MessageColumnIO columnIO = columnIOFactory.getColumnIO(messageSchema);
    log(columnIO);

    // Write groups.
    GroupWriter groupWriter =
        new GroupWriter(columnIO.getRecordWriter(columns), messageSchema);
    for (Group group : groups) {
      groupWriter.write(group);
    }
    columns.flush();

    // Verify statistics for each type of column
    Set<ColumnDescriptor> columnPaths = columns.getColumnDescriptors();
    for (ColumnDescriptor c : columnPaths) {
      ColumnWriter cw = columns.getColumnWriter(c);
      switch(c.getType()) {
        case INT32:
          assertTrue(((IntStatistics)cw.getColumnStatistics()).equals(intStats));
          break;
        case INT64:
          assertTrue(((LongStatistics)cw.getColumnStatistics()).equals(longStats));
          break;
        case FLOAT:
          assertTrue(((FloatStatistics)cw.getColumnStatistics()).equals(floatStats));
          break;
        case DOUBLE:
          assertTrue(((DoubleStatistics)cw.getColumnStatistics()).equals(doubleStats));
          break;
        case BINARY:
          assertTrue(((BinaryStatistics)cw.getColumnStatistics()).equals(binaryStats));
          break;
        case BOOLEAN:
          assertTrue(((BooleanStatistics)cw.getColumnStatistics()).equals(boolStats));
          break;
        case INT96:
          assertTrue(((BinaryStatistics)cw.getColumnStatistics()).equals(int96Stats));
          break;
      }
    }
  }

private RecordReaderImplementation<Group> getRecordReader(MessageColumnIO columnIO, MessageType schema, PageReadStore pageReadStore) {
RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);

Expand Down Expand Up @@ -748,11 +628,6 @@ public void write(double value, int repetitionLevel, int definitionLevel) {
validate(value, repetitionLevel, definitionLevel);
}

@Override
public Statistics getColumnStatistics() {
throw new UnsupportedOperationException();
}

@Override
public void flush() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public Encoding getEncoding(parquet.column.Encoding encoding) {
return Encoding.valueOf(encoding.name());
}

public Statistics toParquetStatistics(parquet.column.statistics.Statistics statistics) {
public static Statistics toParquetStatistics(parquet.column.statistics.Statistics statistics) {
Statistics stats = new Statistics();
if (!statistics.isEmpty()) {
stats.setMax(statistics.getMaxBytes());
Expand All @@ -239,7 +239,7 @@ public Statistics toParquetStatistics(parquet.column.statistics.Statistics stati
return stats;
}

public parquet.column.statistics.Statistics fromParquetStatistics(Statistics statistics, PrimitiveTypeName type) {
public static parquet.column.statistics.Statistics fromParquetStatistics(Statistics statistics, PrimitiveTypeName type) {
// create stats object based on the column type
parquet.column.statistics.Statistics stats = parquet.column.statistics.Statistics.getStatsBasedOnType(type);
// If there was no statistics written to the footer, create an empty Statistics object and return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
import parquet.schema.PrimitiveType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
import parquet.format.Statistics;
import parquet.format.converter.ParquetMetadataConverter;

public class TestParquetFileWriter {
private static final Log LOG = Log.getLog(TestParquetFileWriter.class);
Expand Down Expand Up @@ -153,6 +156,22 @@ public void testWriteRead() throws Exception {
PrintFooter.main(new String[] {path.toString()});
}

@Test
public void testConvertToThriftStatistics() throws Exception {
long[] longArray = new long[] {39L, 99L, 12L, 1000L, 65L, 542L, 2533461316L, -253346131996L, Long.MAX_VALUE, Long.MIN_VALUE};
LongStatistics parquetMRstats = new LongStatistics();

for (long l: longArray) {
parquetMRstats.updateStats(l);
}
Statistics thriftStats = parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(parquetMRstats);
LongStatistics convertedBackStats = (LongStatistics)parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(thriftStats, PrimitiveTypeName.INT64);

assertEquals(parquetMRstats.getMax(), convertedBackStats.getMax());
assertEquals(parquetMRstats.getMin(), convertedBackStats.getMin());
assertEquals(parquetMRstats.getNumNulls(), convertedBackStats.getNumNulls());
}

@Test
public void testWriteReadStatistics() throws Exception {

Expand Down
Loading

0 comments on commit fd8d18f

Please sign in to comment.