apacheGH-43790: [Go][Parquet] Add support for LZ4_RAW compression codec (apache#43835)

### Rationale for this change

Fixes: apache#43790

The LZ4 compression codec for Parquet is no longer ambiguous, as it has been superseded by the [LZ4_RAW](https://github.com/apache/parquet-format/blob/master/Compression.md#lz4_raw) spec.

### What changes are included in this PR?

- Add `LZ4Raw` compression codec (a brief usage sketch follows this list)
- Split out `StreamingCodec` methods from core `Codec` interface
- Various conformance/roundtrip tests
- Set of benchmarks for reading/writing an Arrow table to/from Parquet, using each compression codec
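
A minimal usage sketch (not part of this diff) showing how a user opts into the new codec when building writer properties. `parquet.WithCompression` and `compress.Codecs.Lz4Raw` appear in the roundtrip test further down; the `v18` import path is an assumption and may need adjusting for your Arrow release:

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v18/parquet"
	"github.com/apache/arrow/go/v18/parquet/compress"
)

func main() {
	// Writer properties selecting the new LZ4_RAW codec. Pass these to
	// file.NewParquetWriter via file.WithWriterProps(props), or to the
	// pqarrow writers when round-tripping Arrow tables.
	props := parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Lz4Raw))

	fmt.Printf("%T configured for LZ4_RAW\n", props)
}
```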

### Are these changes tested?

Yes

### Are there any user-facing changes?

- New codec `LZ4Raw` is available
- `Codec` interface no longer provides the following methods, which are now part of `StreamingCodec` (see the sketch below):
  - `NewReader`
  - `NewWriter`
  - `NewWriterLevel`

* GitHub Issue: apache#43790
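
For downstream code, the practical impact of the `StreamingCodec` split is one extra type assertion. A minimal sketch, mirroring the pattern in the updated `compress_test.go` rather than code taken from this diff, using the Gzip codec (which exposes a streaming API); the `v18` import path is an assumption:

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/apache/arrow/go/v18/parquet/compress"
)

func main() {
	codec, err := compress.GetCodec(compress.Codecs.Gzip)
	if err != nil {
		panic(err)
	}

	// The streaming methods moved off Codec, so callers assert to StreamingCodec first.
	sc, ok := codec.(compress.StreamingCodec)
	if !ok {
		panic("codec does not expose a streaming API")
	}

	var buf bytes.Buffer
	w := sc.NewWriter(&buf)
	if _, err := w.Write([]byte("hello parquet")); err != nil {
		panic(err)
	}
	w.Close()

	r := sc.NewReader(&buf)
	out, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // hello parquet
}
```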

Authored-by: Joel Lubinitsky <[email protected]>
Signed-off-by: Joel Lubinitsky <[email protected]>
joellubi authored Aug 27, 2024
1 parent b836662 commit 6502f0e
Showing 6 changed files with 380 additions and 12 deletions.
22 changes: 14 additions & 8 deletions go/parquet/compress/compress.go
@@ -49,8 +49,9 @@ var Codecs = struct {
Brotli Compression
// LZ4 unsupported in this library due to problematic issues between the Hadoop LZ4 spec vs regular lz4
// see: http://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3CCAAri41v24xuA8MGHLDvgSnE+7AAgOhiEukemW_oPNHMvfMmrWw@mail.gmail.com%3E
Lz4 Compression
Zstd Compression
Lz4 Compression
Zstd Compression
Lz4Raw Compression
}{
Uncompressed: Compression(parquet.CompressionCodec_UNCOMPRESSED),
Snappy: Compression(parquet.CompressionCodec_SNAPPY),
@@ -59,17 +60,12 @@ var Codecs = struct {
Brotli: Compression(parquet.CompressionCodec_BROTLI),
Lz4: Compression(parquet.CompressionCodec_LZ4),
Zstd: Compression(parquet.CompressionCodec_ZSTD),
Lz4Raw: Compression(parquet.CompressionCodec_LZ4_RAW),
}

// Codec is an interface which is implemented for each compression type in order to make the interactions easy to
// implement. Most consumers won't be calling GetCodec directly.
type Codec interface {
// NewReader provides a reader that wraps a stream with compressed data to stream the uncompressed data
NewReader(io.Reader) io.ReadCloser
// NewWriter provides a wrapper around a write stream to compress data before writing it.
NewWriter(io.Writer) io.WriteCloser
// NewWriterLevel is like NewWriter but allows specifying the compression level
NewWriterLevel(io.Writer, int) (io.WriteCloser, error)
// Encode encodes a block of data given by src and returns the compressed block. dst should be either nil
// or sized large enough to fit the compressed block (use CompressBound to allocate). dst and src should not
// overlap since some of the compression types don't allow it.
@@ -90,6 +86,16 @@ type Codec interface {
Decode(dst, src []byte) []byte
}

// StreamingCodec is an interface that may be implemented for compression codecs that expose a streaming API.
type StreamingCodec interface {
// NewReader provides a reader that wraps a stream with compressed data to stream the uncompressed data
NewReader(io.Reader) io.ReadCloser
// NewWriter provides a wrapper around a write stream to compress data before writing it.
NewWriter(io.Writer) io.WriteCloser
// NewWriterLevel is like NewWriter but allows specifying the compression level
NewWriterLevel(io.Writer, int) (io.WriteCloser, error)
}

var codecs = map[Compression]Codec{}

// RegisterCodec adds or overrides a codec implementation for a given compression algorithm.
8 changes: 5 additions & 3 deletions go/parquet/compress/compress_test.go
@@ -66,8 +66,8 @@ func TestCompressDataOneShot(t *testing.T) {
{compress.Codecs.Gzip},
{compress.Codecs.Brotli},
{compress.Codecs.Zstd},
{compress.Codecs.Lz4Raw},
// {compress.Codecs.Lzo},
// {compress.Codecs.Lz4},
}

for _, tt := range tests {
@@ -107,9 +107,11 @@ func TestCompressReaderWriter(t *testing.T) {
var buf bytes.Buffer
codec, err := compress.GetCodec(tt.c)
assert.NoError(t, err)
streamingCodec, ok := codec.(compress.StreamingCodec)
assert.True(t, ok)
data := makeRandomData(RandomDataSize)

wr := codec.NewWriter(&buf)
wr := streamingCodec.NewWriter(&buf)

const chunkSize = 1111
input := data
@@ -129,7 +131,7 @@ }
}
wr.Close()

rdr := codec.NewReader(&buf)
rdr := streamingCodec.NewReader(&buf)
out, err := io.ReadAll(rdr)
assert.NoError(t, err)
assert.Exactly(t, data, out)
66 changes: 66 additions & 0 deletions go/parquet/compress/lz4_raw.go
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package compress

import (
"sync"

"github.com/pierrec/lz4/v4"
)

// lz4.Compressor is not goroutine-safe, so we use a pool to amortize the cost
// of allocating a new one for each call to Encode().
var compressorPool = sync.Pool{New: func() interface{} { return new(lz4.Compressor) }}

func compressBlock(src, dst []byte) (int, error) {
c := compressorPool.Get().(*lz4.Compressor)
defer compressorPool.Put(c)
return c.CompressBlock(src, dst)
}

type lz4RawCodec struct{}

func (c lz4RawCodec) Encode(dst, src []byte) []byte {
n, err := compressBlock(src, dst[:cap(dst)])
if err != nil {
panic(err)
}

return dst[:n]
}

func (c lz4RawCodec) EncodeLevel(dst, src []byte, _ int) []byte {
// the lz4 block implementation does not allow level to be set
return c.Encode(dst, src)
}

func (lz4RawCodec) Decode(dst, src []byte) []byte {
n, err := lz4.UncompressBlock(src, dst)
if err != nil {
panic(err)
}

return dst[:n]
}

func (c lz4RawCodec) CompressBound(len int64) int64 {
return int64(lz4.CompressBlockBound(int(len)))
}

func init() {
RegisterCodec(Codecs.Lz4Raw, lz4RawCodec{})
}
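
A one-shot roundtrip through the new codec might look like the sketch below (not part of the diff). It assumes `CompressBound` is part of the `Codec` interface, as the interface's `Encode` doc comment implies, and again assumes the `v18` import path:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/apache/arrow/go/v18/parquet/compress"
)

func main() {
	codec, err := compress.GetCodec(compress.Codecs.Lz4Raw)
	if err != nil {
		panic(err)
	}

	src := bytes.Repeat([]byte("lz4_raw block "), 64)

	// Size dst with CompressBound, as the Codec.Encode doc comment advises.
	dst := make([]byte, codec.CompressBound(int64(len(src))))
	compressed := codec.Encode(dst, src)

	// LZ4_RAW blocks carry no framing, so the caller must know the
	// uncompressed size up front (Parquet stores it in the page header).
	roundtrip := codec.Decode(make([]byte, len(src)), compressed)

	fmt.Println(bytes.Equal(src, roundtrip), len(src), "->", len(compressed))
}
```
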
127 changes: 127 additions & 0 deletions go/parquet/file/file_reader_test.go
@@ -644,3 +644,130 @@ func TestDeltaBinaryPackedMultipleBatches(t *testing.T) {

require.Equalf(t, size, totalRows, "Expected %d rows, but got %d rows", size, totalRows)
}

// Test read file lz4_raw_compressed.parquet
// Contents documented at https://github.com/apache/parquet-testing/commit/ddd898958803cb89b7156c6350584d1cda0fe8de
func TestLZ4RawFileRead(t *testing.T) {
dir := os.Getenv("PARQUET_TEST_DATA")
if dir == "" {
t.Skip("no path supplied with PARQUET_TEST_DATA")
}
require.DirExists(t, dir)

props := parquet.NewReaderProperties(memory.DefaultAllocator)
fileReader, err := file.OpenParquetFile(path.Join(dir, "lz4_raw_compressed.parquet"),
false, file.WithReadProps(props))
require.NoError(t, err)
defer fileReader.Close()

nRows := 4
nCols := 3
require.Equal(t, 1, fileReader.NumRowGroups())
rgr := fileReader.RowGroup(0)
require.EqualValues(t, nRows, rgr.NumRows())
require.EqualValues(t, nCols, rgr.NumColumns())

rdr, err := rgr.Column(0)
require.NoError(t, err)

rowsInt64, ok := rdr.(*file.Int64ColumnChunkReader)
require.True(t, ok)

valsInt64 := make([]int64, nRows)
total, read, err := rowsInt64.ReadBatch(int64(nRows), valsInt64, nil, nil)
require.NoError(t, err)
require.Equal(t, int64(nRows), total)
require.Equal(t, nRows, read)

expectedValsInt64 := []int64{
1593604800,
1593604800,
1593604801,
1593604801,
}
require.Equal(t, expectedValsInt64, valsInt64)

rdr, err = rgr.Column(1)
require.NoError(t, err)

rowsByteArray, ok := rdr.(*file.ByteArrayColumnChunkReader)
require.True(t, ok)

valsByteArray := make([]parquet.ByteArray, nRows)
total, read, err = rowsByteArray.ReadBatch(int64(nRows), valsByteArray, nil, nil)
require.NoError(t, err)
require.Equal(t, int64(nRows), total)
require.Equal(t, nRows, read)

expectedValsByteArray := []parquet.ByteArray{
[]byte("abc"),
[]byte("def"),
[]byte("abc"),
[]byte("def"),
}
require.Equal(t, expectedValsByteArray, valsByteArray)

rdr, err = rgr.Column(2)
require.NoError(t, err)

rowsFloat64, ok := rdr.(*file.Float64ColumnChunkReader)
require.True(t, ok)

valsFloat64 := make([]float64, nRows)
total, read, err = rowsFloat64.ReadBatch(int64(nRows), valsFloat64, nil, nil)
require.NoError(t, err)
require.Equal(t, int64(nRows), total)
require.Equal(t, nRows, read)

expectedValsFloat64 := []float64{
42.0,
7.7,
42.125,
7.7,
}
require.Equal(t, expectedValsFloat64, valsFloat64)
}

// Test read file lz4_raw_compressed_larger.parquet
// Contents documented at https://github.com/apache/parquet-testing/commit/ddd898958803cb89b7156c6350584d1cda0fe8de
func TestLZ4RawLargerFileRead(t *testing.T) {
dir := os.Getenv("PARQUET_TEST_DATA")
if dir == "" {
t.Skip("no path supplied with PARQUET_TEST_DATA")
}
require.DirExists(t, dir)

props := parquet.NewReaderProperties(memory.DefaultAllocator)
fileReader, err := file.OpenParquetFile(path.Join(dir, "lz4_raw_compressed_larger.parquet"),
false, file.WithReadProps(props))
require.NoError(t, err)
defer fileReader.Close()

nRows := 10000
nCols := 1
require.Equal(t, 1, fileReader.NumRowGroups())
rgr := fileReader.RowGroup(0)
require.EqualValues(t, nRows, rgr.NumRows())
require.EqualValues(t, nCols, rgr.NumColumns())

rdr, err := rgr.Column(0)
require.NoError(t, err)

rows, ok := rdr.(*file.ByteArrayColumnChunkReader)
require.True(t, ok)

vals := make([]parquet.ByteArray, nRows)
total, read, err := rows.ReadBatch(int64(nRows), vals, nil, nil)
require.NoError(t, err)
require.Equal(t, int64(nRows), total)
require.Equal(t, nRows, read)

expectedValsHead := []parquet.ByteArray{
[]byte("c7ce6bef-d5b0-4863-b199-8ea8c7fb117b"),
[]byte("e8fb9197-cb9f-4118-b67f-fbfa65f61843"),
[]byte("885136e1-0aa1-4fdb-8847-63d87b07c205"),
[]byte("ce7b2019-8ebe-4906-a74d-0afa2409e5df"),
[]byte("a9ee2527-821b-4b71-a926-03f73c3fc8b7"),
}
require.Equal(t, expectedValsHead, vals[:len(expectedValsHead)])
}
58 changes: 57 additions & 1 deletion go/parquet/file/file_writer_test.go
@@ -260,7 +260,7 @@ func (t *SerializeTestSuite) TestSmallFile() {
compress.Codecs.Brotli,
compress.Codecs.Gzip,
compress.Codecs.Zstd,
// compress.Codecs.Lz4,
compress.Codecs.Lz4Raw,
// compress.Codecs.Lzo,
}
for _, c := range codecs {
Expand Down Expand Up @@ -540,3 +540,59 @@ func TestBatchedByteStreamSplitFileRoundtrip(t *testing.T) {

require.NoError(t, rdr.Close())
}

func TestLZ4RawFileRoundtrip(t *testing.T) {
input := []int64{
-1, 0, 1, 2, 3, 4, 5, 123456789, -123456789,
}

size := len(input)

field, err := schema.NewPrimitiveNodeLogical("int64", parquet.Repetitions.Required, nil, parquet.Types.Int64, 0, 1)
require.NoError(t, err)

schema, err := schema.NewGroupNode("test", parquet.Repetitions.Required, schema.FieldList{field}, 0)
require.NoError(t, err)

sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
writer := file.NewParquetWriter(sink, schema, file.WithWriterProps(parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Lz4Raw))))

rgw := writer.AppendRowGroup()
cw, err := rgw.NextColumn()
require.NoError(t, err)

i64ColumnWriter, ok := cw.(*file.Int64ColumnChunkWriter)
require.True(t, ok)

nVals, err := i64ColumnWriter.WriteBatch(input, nil, nil)
require.NoError(t, err)
require.EqualValues(t, size, nVals)

require.NoError(t, cw.Close())
require.NoError(t, rgw.Close())
require.NoError(t, writer.Close())

rdr, err := file.NewParquetReader(bytes.NewReader(sink.Bytes()))
require.NoError(t, err)

require.Equal(t, 1, rdr.NumRowGroups())
require.EqualValues(t, size, rdr.NumRows())

rgr := rdr.RowGroup(0)
cr, err := rgr.Column(0)
require.NoError(t, err)

i64ColumnReader, ok := cr.(*file.Int64ColumnChunkReader)
require.True(t, ok)

output := make([]int64, size)

total, valuesRead, err := i64ColumnReader.ReadBatch(int64(size), output, nil, nil)
require.NoError(t, err)
require.EqualValues(t, size, total)
require.EqualValues(t, size, valuesRead)

require.Equal(t, input, output)

require.NoError(t, rdr.Close())
}