Merge pull request apache#333 from Parquet/compress_schemas_in_split
Compress schemas in split
aniket486 committed Mar 24, 2014
2 parents 0b5116a + 4246d18 commit 9fdafc0
Showing 5 changed files with 157 additions and 10 deletions.
2 changes: 1 addition & 1 deletion parquet-column/pom.xml
@@ -31,7 +31,7 @@
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.7</version>
<version>1.5</version>
<scope>compile</scope>
</dependency>
<dependency>
2 changes: 1 addition & 1 deletion parquet-encoding/pom.xml
@@ -31,7 +31,7 @@
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.7</version>
<version>1.5</version>
<scope>compile</scope>
</dependency>
</dependencies>
46 changes: 46 additions & 0 deletions parquet-hadoop/pom.xml
@@ -81,6 +81,17 @@
<artifactId>enforcer-rule</artifactId>
<version>0.9.17</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<!-- 1.6 is almost 4x bigger. 1.4 has different chunking behavior.
So be careful with this versioning.
Read http://commons.apache.org/proper/commons-codec/changes-report.html
Due to Hadoop et al also using god knows
what version of this lib, we mvn-shade it here.
-->
<version>1.5</version>
</dependency>
</dependencies>
<executions>
<execution>
@@ -115,6 +126,41 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration combine.self="override">
<minimizeJar>true</minimizeJar>
<createSourcesJar>false</createSourcesJar>
<artifactSet>
<includes>
<include>commons-codec:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>commons-codec:*</artifact>
<includes>
<include>**</include>
</includes>
<excludes>
<exclude>org/apache/commons/codec/language/*</exclude>
<exclude>org/apache/commons/codec/net/*</exclude>
</excludes>
</filter>
</filters>
<relocations>
<relocation>
<pattern>org.apache.commons.codec</pattern>
<shadedPattern>parquet.org.apache.commons.codec</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
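
The shade configuration above bundles a relocated copy of commons-codec 1.5 into the parquet-hadoop jar, so the Base64 calls added below cannot collide with whatever commons-codec version Hadoop or other jobs put on the classpath. A minimal sketch of how the relocation could be sanity-checked against the shaded jar; the relocated package name follows from the <relocation> pattern above, and the ShadeCheck class itself is illustrative, not part of this commit:

// Illustrative only: run against the shaded parquet-hadoop jar to confirm the
// relocated codec classes are present under the parquet.* prefix.
public class ShadeCheck {
  public static void main(String[] args) throws ClassNotFoundException {
    // Relocated by maven-shade-plugin: org.apache.commons.codec -> parquet.org.apache.commons.codec
    Class.forName("parquet.org.apache.commons.codec.binary.Base64");
    // The language/* and net/* packages are filtered out above, so e.g.
    // parquet.org.apache.commons.codec.language.Soundex should NOT be found.
    System.out.println("relocated commons-codec Base64 found");
  }
}
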
72 changes: 64 additions & 8 deletions parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
@@ -15,11 +15,13 @@
*/
package parquet.hadoop;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -28,12 +30,16 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import parquet.Log;
import parquet.column.Encoding;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -49,6 +55,8 @@
*/
public class ParquetInputSplit extends FileSplit implements Writable {

private static final Log LOG = Log.getLog(ParquetInputSplit.class);

private List<BlockMetaData> blocks;
private String requestedSchema;
private String fileSchema;
@@ -141,8 +149,8 @@ public void readFields(DataInput in) throws IOException {
for (int i = 0; i < blocksSize; i++) {
blocks.add(readBlock(in));
}
this.requestedSchema = Text.readString(in);
this.fileSchema = Text.readString(in);
this.requestedSchema = decompressString(Text.readString(in));
this.fileSchema = decompressString(Text.readString(in));
this.extraMetadata = readKeyValues(in);
this.readSupportMetadata = readKeyValues(in);
}
@@ -157,12 +165,60 @@ public void write(DataOutput out) throws IOException {
for (BlockMetaData block : blocks) {
writeBlock(out, block);
}
Text.writeString(out, requestedSchema);
Text.writeString(out, fileSchema);
Text.writeString(out, compressString(requestedSchema));
Text.writeString(out, compressString(fileSchema));
writeKeyValues(out, extraMetadata);
writeKeyValues(out, readSupportMetadata);
}

String compressString(String str) {
ByteArrayOutputStream obj = new ByteArrayOutputStream();
GZIPOutputStream gzip;
try {
gzip = new GZIPOutputStream(obj);
gzip.write(str.getBytes("UTF-8"));
gzip.close();
} catch (IOException e) {
// Not really sure how we can get here. I guess the best thing to do is to croak.
LOG.error("Unable to gzip InputSplit string " + str, e);
throw new RuntimeException("Unable to gzip InputSplit string", e);
}
String compressedStr = Base64.encodeBase64String(obj.toByteArray());
return compressedStr;
}

String decompressString(String str) {
byte[] decoded = Base64.decodeBase64(str);
ByteArrayInputStream obj = new ByteArrayInputStream(decoded);
GZIPInputStream gzip = null;
String outStr = "";
try {
gzip = new GZIPInputStream(obj);
BufferedReader reader = new BufferedReader(new InputStreamReader(gzip, "UTF-8"));
char[] buffer = new char[1024];
int n = 0;
StringBuilder sb = new StringBuilder();
while (-1 != (n = reader.read(buffer))) {
sb.append(buffer, 0, n);
}
outStr = sb.toString();
} catch (IOException e) {
// Not really sure how we can get here. I guess the best thing to do is to croak.
LOG.error("Unable to uncompress InputSplit string " + str, e);
throw new RuntimeException("Unable to uncompress InputSplit String", e);
} finally {
if (null != gzip) {
try {
gzip.close();
} catch (IOException e) {
LOG.error("Unable to uncompress InputSplit string " + str, e);
throw new RuntimeException("Unable to uncompress InputSplit String", e);
}
}
}
return outStr;
}

private BlockMetaData readBlock(DataInput in) throws IOException {
final BlockMetaData block = new BlockMetaData();
int size = in.readInt();
@@ -251,7 +307,7 @@ private void writeKeyValues(DataOutput out, Map<String, String> map) throws IOException {
}
}
}


@Override
public String toString() {
@@ -260,7 +316,7 @@ public String toString() {
hosts = getLocations();
}catch(Exception ignore){} // IOException/InterruptedException could be thrown

return this.getClass().getSimpleName() + "{" +
return this.getClass().getSimpleName() + "{" +
"part: " + getPath()
+ " start: " + getStart()
+ " length: " + getLength()
45 changes: 45 additions & 0 deletions parquet-hadoop/src/test/java/parquet/hadoop/TestParquetInputSplit.java
@@ -0,0 +1,45 @@
/**
* Copyright 2012 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package parquet.hadoop;
import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class TestParquetInputSplit {

@Test
public void testStringCompression() {
String[] strings = {"this is a string",
"this is a string with a \n newline",
"a:chararray, b:{t:(c:chararray, d:chararray)}",
"message pig_schema {\n" +
" optional binary a;\n" +
" optional group b {\n" +
" repeated group t {\n" +
" optional binary c;\n" +
" optional binary d;\n" +
" }\n" +
" }\n" +
"}\n"
};
ParquetInputSplit split = new ParquetInputSplit();
for (String s : strings) {
String cs = split.compressString(s);
String uncs = split.decompressString(cs);
assertEquals("strings should be same after decompressing", s, uncs);
}
}
}
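
The test above only verifies that the gzip + Base64 round trip is lossless. The point of the change is the size of the serialized split: large Pig/Parquet schema strings are highly repetitive and compress well before being written via Text.writeString. A rough sketch of how one could eyeball that effect using the new package-private helpers; the SplitCompressionDemo class and the generated schema are made up for illustration, not part of this commit:

package parquet.hadoop;   // needed to reach the package-private helpers

public class SplitCompressionDemo {
  public static void main(String[] args) {
    ParquetInputSplit split = new ParquetInputSplit();
    // Build a wide, repetitive schema string of the kind Pig/Parquet jobs produce.
    StringBuilder schema = new StringBuilder("message pig_schema {\n");
    for (int i = 0; i < 500; i++) {
      schema.append("  optional binary col_").append(i).append(";\n");
    }
    schema.append("}\n");
    String raw = schema.toString();
    String packed = split.compressString(raw);         // gzip, then Base64-encode
    String restored = split.decompressString(packed);  // Base64-decode, then gunzip
    System.out.println(raw.length() + " chars raw -> " + packed.length() + " chars encoded");
    assert raw.equals(restored);
  }
}
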
