[feat]: Packed reader multi files test (#161)
related: #158

---------

Signed-off-by: shaoting-huang <[email protected]>
shaoting-huang authored Jan 13, 2025
1 parent f51fd09 commit e19535c
Showing 14 changed files with 136 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -13,7 +13,7 @@ on:
jobs:
test:
name: Test
runs-on: ubuntu-latest
runs-on: ubuntu-22.04

steps:
- name: Checkout code
2 changes: 1 addition & 1 deletion .github/workflows/cpp-ci.yml
@@ -11,7 +11,7 @@ on:
- '!go/**'
jobs:
unittest:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3

16 changes: 9 additions & 7 deletions cpp/include/milvus-storage/common/macro.h
@@ -25,16 +25,18 @@ namespace milvus_storage {
#undef RETURN_NOT_OK
#define RETURN_NOT_OK(status) \
do { \
if (!(status).ok()) { \
return status; \
auto _s = (status); \
if (!_s.ok()) { \
return _s; \
} \
} while (false)

#define RETURN_ARROW_NOT_OK(status) \
do { \
if (!(status).ok()) { \
return Status::ArrowError((status).ToString()); \
} \
#define RETURN_ARROW_NOT_OK(status) \
do { \
auto _s = (status); \
if (!_s.ok()) { \
return Status::ArrowError((_s).ToString()); \
} \
} while (false)

#define RETURN_ARROW_NOT_OK_WITH_PREFIX(msg, staus) \
6 changes: 6 additions & 0 deletions cpp/include/milvus-storage/packed/reader.h
@@ -53,6 +53,12 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
arrow::Status Close() override;

private:
void initialize(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> schema,
const std::set<int>& needed_columns,
const int64_t buffer_size);

Status initializeColumnOffsets(arrow::fs::FileSystem& fs, const std::set<int>& needed_columns, size_t num_fields);
// Advance buffer to fill the expected buffer size
arrow::Status advanceBuffer();
23 changes: 21 additions & 2 deletions cpp/include/milvus-storage/packed/reader_c.h
@@ -24,15 +24,34 @@ typedef void* CPackedReader;
typedef void* CArrowArray;
typedef void* CArrowSchema;

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out);

/**
* @brief Open a packed reader to read needed columns in the specified path.
*
* @param path The root path of the packed files to read.
* @param schema The original schema of data.
* @param buffer_size The max buffer size of the packed reader.
* @param c_packed_reader The output pointer of the packed reader.
*/
int NewPackedReader(const char* path,
struct ArrowSchema* schema,
const int64_t buffer_size,
CPackedReader* c_packed_reader);

/**
* @brief Read the next record batch from the packed reader.
* By default, each returned batch holds at most 1024 rows.
*
* @param c_packed_reader The packed reader to read.
* @param out_array The output pointer of the arrow array.
* @param out_schema The output pointer of the arrow schema.
*/
int ReadNext(CPackedReader c_packed_reader, CArrowArray* out_array, CArrowSchema* out_schema);

/**
* @brief Close the packed reader and release the resources.
*
* @param c_packed_reader The packed reader to close.
*/
int CloseReader(CPackedReader c_packed_reader);

#ifdef __cplusplus
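For reference, the entry points documented above are what the Go wrappers exported in this commit (NewPackedReader, ReadNext, Close in go/packed) call through cgo. A minimal usage sketch of the Go side, assuming the module import path and the /tmp location used by the tests:

package main

import (
    "fmt"
    "log"

    "github.com/apache/arrow/go/v12/arrow"
    "github.com/milvus-io/milvus-storage/go/packed" // import path assumed for illustration
)

func main() {
    schema := arrow.NewSchema([]arrow.Field{
        {Name: "a", Type: arrow.PrimitiveTypes.Int32},
        {Name: "b", Type: arrow.PrimitiveTypes.Int64},
        {Name: "c", Type: arrow.BinaryTypes.String},
    }, nil)

    // Open the packed files under /tmp with a 10MB read buffer.
    reader, err := packed.NewPackedReader("/tmp", schema, 10*1024*1024)
    if err != nil {
        log.Fatal(err)
    }
    defer reader.Close()

    var rows int64
    for {
        rec, err := reader.ReadNext()
        if err != nil {
            log.Fatal(err)
        }
        if rec == nil { // a nil record signals end of data
            break
        }
        rows += rec.NumRows() // each batch holds at most 1024 rows by default
        rec.Release()
    }
    fmt.Println("total rows:", rows)
}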
2 changes: 0 additions & 2 deletions cpp/src/packed/column_group.cpp
@@ -17,8 +17,6 @@
#include <arrow/table.h>
#include "common/status.h"

using namespace std;

namespace milvus_storage {

ColumnGroup::ColumnGroup(GroupId group_id, const std::vector<int>& origin_column_indices)
8 changes: 8 additions & 0 deletions cpp/src/packed/reader.cpp
@@ -40,6 +40,14 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
row_limit_(0),
absolute_row_position_(0),
read_count_(0) {
initialize(fs, file_path_, schema_, needed_columns, buffer_size);
}

void PackedRecordBatchReader::initialize(arrow::fs::FileSystem& fs,
const std::string& file_path,
const std::shared_ptr<arrow::Schema> schema,
const std::set<int>& needed_columns,
const int64_t buffer_size) {
auto status = initializeColumnOffsets(fs, needed_columns, schema->num_fields());
if (!status.ok()) {
throw std::runtime_error(status.ToString());
26 changes: 0 additions & 26 deletions cpp/src/packed/reader_c.cpp
@@ -23,32 +23,6 @@
#include <arrow/status.h>
#include <memory>

int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) {
auto truePath = std::string(path);
auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
auto conf = milvus_storage::StorageConfig();
conf.uri = "file:///tmp/";
auto r = factory->BuildFileSystem(conf, &truePath);
if (!r.ok()) {
LOG_STORAGE_ERROR_ << "Error building filesystem: " << path;
return -2;
}
auto trueFs = r.value();
auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
std::set<int> needed_columns;
for (int i = 0; i < trueSchema->num_fields(); i++) {
needed_columns.emplace(i);
}
auto reader =
std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, needed_columns, buffer_size);
auto status = ExportRecordBatchReader(reader, out);
if (!status.ok()) {
LOG_STORAGE_ERROR_ << "Error exporting record batch reader" << status.ToString();
return static_cast<int>(status.code());
}
return 0;
}

int NewPackedReader(const char* path,
struct ArrowSchema* schema,
const int64_t buffer_size,
4 changes: 2 additions & 2 deletions cpp/src/packed/writer.cpp
@@ -97,13 +97,13 @@ Status PackedRecordBatchWriter::writeWithSplitIndex(const std::shared_ptr<arrow:
// Flush column groups until there's enough room for the new column groups
// to ensure that memory usage stays strictly below the limit
while (current_memory_usage_ + next_batch_size >= memory_limit_ && !max_heap_.empty()) {
LOG_STORAGE_DEBUG_ << "Current memory usage: " << current_memory_usage_
LOG_STORAGE_DEBUG_ << "Current memory usage: " << current_memory_usage_ / 1024 / 1024 << " MB, "
<< ", flushing column group: " << max_heap_.top().first;
auto max_group = max_heap_.top();
max_heap_.pop();
current_memory_usage_ -= max_group.second;

ColumnGroupWriter* writer = group_writers_[max_group.first].get();
max_heap_.pop();
RETURN_NOT_OK(writer->Flush());
}

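The loop above evicts the column group with the largest buffered size until the incoming batch fits under the memory limit. A minimal Go sketch of that eviction strategy, with made-up group IDs and sizes (the real writer tracks per-ColumnGroupWriter buffer sizes and flushes them through the corresponding writer):

package main

import (
    "container/heap"
    "fmt"
)

// group is a buffered column group: its id and the bytes it currently holds.
type group struct {
    id   int
    size int64
}

// maxHeap orders groups by buffered size, largest first.
type maxHeap []group

func (h maxHeap) Len() int           { return len(h) }
func (h maxHeap) Less(i, j int) bool { return h[i].size > h[j].size }
func (h maxHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *maxHeap) Push(x interface{}) { *h = append(*h, x.(group)) }

func (h *maxHeap) Pop() interface{} {
    old := *h
    g := old[len(old)-1]
    *h = old[:len(old)-1]
    return g
}

// flushUntilFits pops the largest buffered groups until the next batch fits
// under the memory limit, mirroring the while loop in writer.cpp.
func flushUntilFits(h *maxHeap, current *int64, nextBatch, limit int64, flush func(id int)) {
    for *current+nextBatch >= limit && h.Len() > 0 {
        g := heap.Pop(h).(group) // group with the largest buffered size
        *current -= g.size
        flush(g.id)
    }
}

func main() {
    h := &maxHeap{{id: 0, size: 600}, {id: 1, size: 300}, {id: 2, size: 100}}
    heap.Init(h)

    current := int64(1000) // bytes currently buffered across all groups
    flushUntilFits(h, &current, 200, 1024, func(id int) {
        fmt.Println("flushing column group", id)
    })
    fmt.Println("buffered bytes after flush:", current)
}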
4 changes: 0 additions & 4 deletions cpp/test/packed/packed_integration_test.cpp
@@ -27,8 +27,6 @@ TEST_F(PackedIntegrationTest, TestOneFile) {
}
EXPECT_TRUE(writer.Close().ok());

std::vector<std::string> paths = {file_path_ + "/0"};

std::set<int> needed_columns = {0, 1, 2};

PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_);
@@ -47,8 +45,6 @@ TEST_F(PackedIntegrationTest, TestSplitColumnGroup) {
}
EXPECT_TRUE(writer.Close().ok());

std::vector<std::string> paths = {file_path_ + "/0", file_path_ + "/1"};

std::set<int> needed_columns = {0, 1, 2};

PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_);
2 changes: 1 addition & 1 deletion go/Makefile
@@ -22,7 +22,7 @@ test:
LD_LIBRARY_PATH=$(MILVUS_STORAGE_LD_DIR):$$LD_LIBRARY_PATH \
CGO_CFLAGS="$(CPPFLAGS)" \
CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" \
go test -count=1 -timeout 30s ./...
go test -count=1 -timeout 30s ./... -gcflags "all=-N -l" -o gdb/

proto:
mkdir -p proto/manifest_proto
6 changes: 3 additions & 3 deletions go/packed/packed_reader.go
@@ -31,7 +31,7 @@ import (
"github.com/apache/arrow/go/v12/arrow/cdata"
)

func newPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) {
func NewPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) {
var cas cdata.CArrowSchema
cdata.ExportArrowSchema(schema, &cas)
cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
@@ -49,7 +49,7 @@ func newPackedReader(path string, schema *arrow.Schema, bufferSize int) (*Packed
return &PackedReader{cPackedReader: cPackedReader, schema: schema}, nil
}

func (pr *PackedReader) readNext() (arrow.Record, error) {
func (pr *PackedReader) ReadNext() (arrow.Record, error) {
var cArr C.CArrowArray
var cSchema C.CArrowSchema
status := C.ReadNext(pr.cPackedReader, &cArr, &cSchema)
@@ -73,7 +73,7 @@ func (pr *PackedReader) readNext() (arrow.Record, error) {
return recordBatch, nil
}

func (pr *PackedReader) close() error {
func (pr *PackedReader) Close() error {
status := C.CloseReader(pr.cPackedReader)
if status != 0 {
return errors.New("PackedReader: failed to close file")
87 changes: 81 additions & 6 deletions go/packed/packed_test.go
@@ -21,9 +21,10 @@ import (
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/rand"
)

func TestPacked(t *testing.T) {
func TestPackedOneFile(t *testing.T) {
batches := 100
schema := arrow.NewSchema([]arrow.Field{
{Name: "a", Type: arrow.PrimitiveTypes.Int32},
@@ -53,19 +54,93 @@
defer rec.Release()
path := "/tmp"
bufferSize := 10 * 1024 * 1024 // 10MB
pw, err := newPackedWriter(path, schema, bufferSize)
pw, err := NewPackedWriter(path, schema, bufferSize)
assert.NoError(t, err)
for i := 0; i < batches; i++ {
err = pw.writeRecordBatch(rec)
err = pw.WriteRecordBatch(rec)
assert.NoError(t, err)
}
err = pw.close()
err = pw.Close()
assert.NoError(t, err)

reader, err := newPackedReader(path, schema, bufferSize)
reader, err := NewPackedReader(path, schema, bufferSize)
assert.NoError(t, err)
rr, err := reader.readNext()
rr, err := reader.ReadNext()
assert.NoError(t, err)
defer rr.Release()
assert.Equal(t, int64(3*batches), rr.NumRows())
}

func TestPackedMultiFiles(t *testing.T) {
batches := 1000
schema := arrow.NewSchema([]arrow.Field{
{Name: "a", Type: arrow.PrimitiveTypes.Int32},
{Name: "b", Type: arrow.PrimitiveTypes.Int64},
{Name: "c", Type: arrow.BinaryTypes.String},
}, nil)

b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
strLen := 1000
arrLen := 30
defer b.Release()
for idx := range schema.Fields() {
switch idx {
case 0:
values := make([]int32, arrLen)
for i := 0; i < arrLen; i++ {
values[i] = int32(i + 1)
}
b.Field(idx).(*array.Int32Builder).AppendValues(values, nil)
case 1:
values := make([]int64, arrLen)
for i := 0; i < arrLen; i++ {
values[i] = int64(i + 1)
}
b.Field(idx).(*array.Int64Builder).AppendValues(values, nil)
case 2:
values := make([]string, arrLen)
for i := 0; i < arrLen; i++ {
values[i] = randomString(strLen)
}
b.Field(idx).(*array.StringBuilder).AppendValues(values, nil)
}
}
rec := b.NewRecord()
defer rec.Release()
path := "/tmp"
bufferSize := 10 * 1024 * 1024 // 10MB
pw, err := NewPackedWriter(path, schema, bufferSize)
assert.NoError(t, err)
for i := 0; i < batches; i++ {
err = pw.WriteRecordBatch(rec)
assert.NoError(t, err)
}
err = pw.Close()
assert.NoError(t, err)

reader, err := NewPackedReader(path, schema, bufferSize)
assert.NoError(t, err)
var rows int64 = 0
var rr arrow.Record
for {
rr, err = reader.ReadNext()
assert.NoError(t, err)
if rr == nil {
// end of file
break
}

rows += rr.NumRows()
}

assert.Equal(t, int64(arrLen*batches), rows)
}

func randomString(length int) string {
const charset = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
result := make([]byte, length)
for i := range result {
result[i] = charset[rand.Intn(len(charset))]
}
return string(result)
}
6 changes: 3 additions & 3 deletions go/packed/packed_writer.go
@@ -31,7 +31,7 @@ import (
"github.com/apache/arrow/go/v12/arrow/cdata"
)

func newPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) {
func NewPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) {
var cas cdata.CArrowSchema
cdata.ExportArrowSchema(schema, &cas)
cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
@@ -49,7 +49,7 @@ func newPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*Packed
return &PackedWriter{cPackedWriter: cPackedWriter}, nil
}

func (pw *PackedWriter) writeRecordBatch(recordBatch arrow.Record) error {
func (pw *PackedWriter) WriteRecordBatch(recordBatch arrow.Record) error {
var caa cdata.CArrowArray
var cas cdata.CArrowSchema

@@ -66,7 +66,7 @@ func (pw *PackedWriter) writeRecordBatch(recordBatch arrow.Record) error {
return nil
}

func (pw *PackedWriter) close() error {
func (pw *PackedWriter) Close() error {
status := C.CloseWriter(pw.cPackedWriter)
if status != 0 {
return errors.New("PackedWriter: failed to close file")
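The write side mirrors the reader: this commit exports NewPackedWriter, WriteRecordBatch, and Close, which the new multi-file test drives. A condensed sketch, again assuming the module import path and a /tmp destination:

package main

import (
    "log"

    "github.com/apache/arrow/go/v12/arrow"
    "github.com/apache/arrow/go/v12/arrow/array"
    "github.com/apache/arrow/go/v12/arrow/memory"
    "github.com/milvus-io/milvus-storage/go/packed" // import path assumed for illustration
)

func main() {
    schema := arrow.NewSchema([]arrow.Field{
        {Name: "a", Type: arrow.PrimitiveTypes.Int32},
    }, nil)

    // Build a small record batch to write.
    b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
    defer b.Release()
    b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3}, nil)
    rec := b.NewRecord()
    defer rec.Release()

    // Write the batch into packed files under /tmp with a 10MB buffer,
    // then close the writer to flush the remaining column groups.
    pw, err := packed.NewPackedWriter("/tmp", schema, 10*1024*1024)
    if err != nil {
        log.Fatal(err)
    }
    if err := pw.WriteRecordBatch(rec); err != nil {
        log.Fatal(err)
    }
    if err := pw.Close(); err != nil {
        log.Fatal(err)
    }
}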
