diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index fdd8d388..31543aec 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -13,7 +13,7 @@ on:
 jobs:
   test:
     name: Test
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     steps:
       - name: Checkout code
diff --git a/.github/workflows/cpp-ci.yml b/.github/workflows/cpp-ci.yml
index b2634c22..599e0f05 100644
--- a/.github/workflows/cpp-ci.yml
+++ b/.github/workflows/cpp-ci.yml
@@ -11,7 +11,7 @@ on:
       - '!go/**'
 jobs:
   unittest:
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
    steps:
      - uses: actions/checkout@v3
diff --git a/cpp/include/milvus-storage/common/macro.h b/cpp/include/milvus-storage/common/macro.h
index afb1488e..b2bad43c 100644
--- a/cpp/include/milvus-storage/common/macro.h
+++ b/cpp/include/milvus-storage/common/macro.h
@@ -25,16 +25,18 @@ namespace milvus_storage {
 #undef RETURN_NOT_OK
 #define RETURN_NOT_OK(status) \
   do {                        \
-    if (!(status).ok()) {     \
-      return status;          \
+    auto _s = (status);       \
+    if (!_s.ok()) {           \
+      return _s;              \
     }                         \
   } while (false)
 
-#define RETURN_ARROW_NOT_OK(status)                   \
-  do {                                                \
-    if (!(status).ok()) {                             \
-      return Status::ArrowError((status).ToString()); \
-    }                                                 \
+#define RETURN_ARROW_NOT_OK(status)                \
+  do {                                             \
+    auto _s = (status);                            \
+    if (!_s.ok()) {                                \
+      return Status::ArrowError((_s).ToString());  \
+    }                                              \
   } while (false)
 
 #define RETURN_ARROW_NOT_OK_WITH_PREFIX(msg, staus) \
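Aside on the macro change above: the old expansion mentioned its `status` argument twice, so `RETURN_NOT_OK(SomeFallibleCall())` re-executed the call on the failure path. Binding the result to a local `_s` evaluates the argument exactly once. A minimal, self-contained sketch of the hazard; `Status`, `DoWrite`, and the `_OLD`/`_NEW` macro names are simplified stand-ins, not the real milvus_storage types:

```cpp
#include <cstdio>

struct Status {
  bool ok_;
  bool ok() const { return ok_; }
};

static int call_count = 0;

// A side-effecting operation that returns a Status (always fails here,
// to exercise the early-return path).
Status DoWrite() {
  ++call_count;
  return Status{false};
}

// Old expansion: the argument expression appears (and runs) twice on failure.
#define RETURN_NOT_OK_OLD(status) \
  do {                            \
    if (!(status).ok()) {         \
      return status;              \
    }                             \
  } while (false)

// New expansion: the argument is evaluated exactly once.
#define RETURN_NOT_OK_NEW(status) \
  do {                            \
    auto _s = (status);           \
    if (!_s.ok()) {               \
      return _s;                  \
    }                             \
  } while (false)

Status Old() { RETURN_NOT_OK_OLD(DoWrite()); return Status{true}; }
Status New() { RETURN_NOT_OK_NEW(DoWrite()); return Status{true}; }

int main() {
  Old();
  std::printf("old macro: DoWrite ran %d times\n", call_count);  // prints 2
  call_count = 0;
  New();
  std::printf("new macro: DoWrite ran %d times\n", call_count);  // prints 1
}
```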
diff --git a/cpp/include/milvus-storage/packed/reader.h b/cpp/include/milvus-storage/packed/reader.h
index 087dafee..2bd60277 100644
--- a/cpp/include/milvus-storage/packed/reader.h
+++ b/cpp/include/milvus-storage/packed/reader.h
@@ -53,6 +53,12 @@ class PackedRecordBatchReader : public arrow::RecordBatchReader {
   arrow::Status Close() override;
 
   private:
+  void initialize(arrow::fs::FileSystem& fs,
+                  const std::string& file_path,
+                  const std::shared_ptr<arrow::Schema> schema,
+                  const std::set<int>& needed_columns,
+                  const int64_t buffer_size);
+
   Status initializeColumnOffsets(arrow::fs::FileSystem& fs, const std::set<int>& needed_columns, size_t num_fields);
   // Advance buffer to fill the expected buffer size
   arrow::Status advanceBuffer();
diff --git a/cpp/include/milvus-storage/packed/reader_c.h b/cpp/include/milvus-storage/packed/reader_c.h
index b1f29f16..14c63958 100644
--- a/cpp/include/milvus-storage/packed/reader_c.h
+++ b/cpp/include/milvus-storage/packed/reader_c.h
@@ -24,15 +24,34 @@ typedef void* CPackedReader;
 typedef void* CArrowArray;
 typedef void* CArrowSchema;
 
-int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out);
-
+/**
+ * @brief Open a packed reader to read needed columns in the specified path.
+ *
+ * @param path The root path of the packed files to read.
+ * @param schema The original schema of data.
+ * @param buffer_size The max buffer size of the packed reader.
+ * @param c_packed_reader The output pointer of the packed reader.
+ */
 int NewPackedReader(const char* path,
                     struct ArrowSchema* schema,
                     const int64_t buffer_size,
                     CPackedReader* c_packed_reader);
 
+/**
+ * @brief Read the next record batch from the packed reader.
+ * By default, the maximum returned batch is 1024 rows.
+ *
+ * @param c_packed_reader The packed reader to read.
+ * @param out_array The output pointer of the arrow array.
+ * @param out_schema The output pointer of the arrow schema.
+ */
 int ReadNext(CPackedReader c_packed_reader, CArrowArray* out_array, CArrowSchema* out_schema);
 
+/**
+ * @brief Close the packed reader and release the resources.
+ *
+ * @param c_packed_reader The packed reader to close.
+ */
 int CloseReader(CPackedReader c_packed_reader);
 
 #ifdef __cplusplus
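For context, a minimal usage sketch of how the three new C functions fit together. Two assumptions not confirmed by this diff: the functions return 0 on success, and `ReadNext` signals end-of-stream by leaving `*out_array` null (the new Go test reads until `ReadNext` yields a nil record, which suggests this convention). The include path and error handling are illustrative only:

```cpp
#include <cstdint>
#include <cstdio>

#include "milvus-storage/packed/reader_c.h"  // assumed include path

int ReadAllBatches(const char* path, struct ArrowSchema* schema) {
  CPackedReader reader = nullptr;
  const int64_t buffer_size = 10 * 1024 * 1024;  // 10 MB, as in the tests

  if (NewPackedReader(path, schema, buffer_size, &reader) != 0) {
    std::fprintf(stderr, "failed to open packed reader at %s\n", path);
    return -1;
  }

  for (;;) {
    CArrowArray out_array = nullptr;
    CArrowSchema out_schema = nullptr;
    if (ReadNext(reader, &out_array, &out_schema) != 0) {
      CloseReader(reader);
      return -1;
    }
    if (out_array == nullptr) {  // assumed end-of-stream convention
      break;
    }
    // Import the batch via the Arrow C data interface and consume it here.
  }

  return CloseReader(reader);  // release native resources
}
```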
diff --git a/cpp/src/packed/column_group.cpp b/cpp/src/packed/column_group.cpp
index e0887341..84c0ac50 100644
--- a/cpp/src/packed/column_group.cpp
+++ b/cpp/src/packed/column_group.cpp
@@ -17,8 +17,6 @@
 #include <arrow/record_batch.h>
 #include "common/status.h"
 
-using namespace std;
-
 namespace milvus_storage {
 
 ColumnGroup::ColumnGroup(GroupId group_id, const std::vector<int>& origin_column_indices)
diff --git a/cpp/src/packed/reader.cpp b/cpp/src/packed/reader.cpp
index 83c4334f..de1706ac 100644
--- a/cpp/src/packed/reader.cpp
+++ b/cpp/src/packed/reader.cpp
@@ -40,6 +40,14 @@ PackedRecordBatchReader::PackedRecordBatchReader(arrow::fs::FileSystem& fs,
       row_limit_(0),
       absolute_row_position_(0),
       read_count_(0) {
+  initialize(fs, file_path_, schema_, needed_columns, buffer_size);
+}
+
+void PackedRecordBatchReader::initialize(arrow::fs::FileSystem& fs,
+                                         const std::string& file_path,
+                                         const std::shared_ptr<arrow::Schema> schema,
+                                         const std::set<int>& needed_columns,
+                                         const int64_t buffer_size) {
   auto status = initializeColumnOffsets(fs, needed_columns, schema->num_fields());
   if (!status.ok()) {
     throw std::runtime_error(status.ToString());
diff --git a/cpp/src/packed/reader_c.cpp b/cpp/src/packed/reader_c.cpp
index 9892e63b..0d5fa716 100644
--- a/cpp/src/packed/reader_c.cpp
+++ b/cpp/src/packed/reader_c.cpp
@@ -23,32 +23,6 @@
 #include <arrow/c/bridge.h>
 #include <arrow/filesystem/filesystem.h>
 
-int Open(const char* path, struct ArrowSchema* schema, const int64_t buffer_size, struct ArrowArrayStream* out) {
-  auto truePath = std::string(path);
-  auto factory = std::make_shared<milvus_storage::FileSystemFactory>();
-  auto conf = milvus_storage::StorageConfig();
-  conf.uri = "file:///tmp/";
-  auto r = factory->BuildFileSystem(conf, &truePath);
-  if (!r.ok()) {
-    LOG_STORAGE_ERROR_ << "Error building filesystem: " << path;
-    return -2;
-  }
-  auto trueFs = r.value();
-  auto trueSchema = arrow::ImportSchema(schema).ValueOrDie();
-  std::set<int> needed_columns;
-  for (int i = 0; i < trueSchema->num_fields(); i++) {
-    needed_columns.emplace(i);
-  }
-  auto reader = std::make_shared<milvus_storage::PackedRecordBatchReader>(*trueFs, path, trueSchema, needed_columns,
-                                                                          buffer_size);
-  auto status = ExportRecordBatchReader(reader, out);
-  if (!status.ok()) {
-    LOG_STORAGE_ERROR_ << "Error exporting record batch reader" << status.ToString();
-    return static_cast<int>(status.code());
-  }
-  return 0;
-}
-
 int NewPackedReader(const char* path,
                     struct ArrowSchema* schema,
                     const int64_t buffer_size,
diff --git a/cpp/src/packed/writer.cpp b/cpp/src/packed/writer.cpp
index c6d00672..1522cda4 100644
--- a/cpp/src/packed/writer.cpp
+++ b/cpp/src/packed/writer.cpp
@@ -97,13 +97,13 @@ Status PackedRecordBatchWriter::writeWithSplitIndex(const std::shared_ptr<arrow
   while (current_memory_usage_ >= memory_limit_ && !max_heap_.empty()) {
-    LOG_STORAGE_DEBUG_ << "Current memory usage: " << current_memory_usage_
+    LOG_STORAGE_DEBUG_ << "Current memory usage: " << current_memory_usage_ / 1024 / 1024 << " MB, "
                        << ", flushing column group: " << max_heap_.top().first;
     auto max_group = max_heap_.top();
+    max_heap_.pop();
     current_memory_usage_ -= max_group.second;
     ColumnGroupWriter* writer = group_writers_[max_group.first].get();
-    max_heap_.pop();
     RETURN_NOT_OK(writer->Flush());
   }
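On the writer.cpp reorder: popping the heap immediately after copying its top means the flush accounting is finished before the fallible `Flush()` runs, so later statements in the loop body can never touch a heap entry that has already been charged against `current_memory_usage_`. A standalone sketch of the pattern with toy numbers and stand-in types, not the writer's real classes:

```cpp
#include <cstddef>
#include <cstdio>
#include <queue>
#include <utility>
#include <vector>

using GroupEntry = std::pair<int, size_t>;  // (group id, buffered bytes)

// Max-heap ordered by buffered bytes, mirroring max_heap_ in the writer.
struct BySize {
  bool operator()(const GroupEntry& a, const GroupEntry& b) const {
    return a.second < b.second;
  }
};

int main() {
  std::priority_queue<GroupEntry, std::vector<GroupEntry>, BySize> max_heap;
  max_heap.push({0, 8 << 20});
  max_heap.push({1, 2 << 20});
  max_heap.push({2, 5 << 20});

  size_t current_memory_usage = 15 << 20;  // 15 MB buffered in total
  const size_t memory_limit = 6 << 20;     // 6 MB limit

  while (current_memory_usage >= memory_limit && !max_heap.empty()) {
    auto max_group = max_heap.top();
    max_heap.pop();  // remove before any fallible work, as in the new order
    current_memory_usage -= max_group.second;
    std::printf("flushing group %d, freeing %zu MB\n", max_group.first,
                max_group.second >> 20);
    // writer->Flush() would go here; an early return through RETURN_NOT_OK
    // cannot leave the already-accounted-for group sitting in the heap.
  }
  std::printf("memory usage now %zu MB\n", current_memory_usage >> 20);
}
```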
diff --git a/cpp/test/packed/packed_integration_test.cpp b/cpp/test/packed/packed_integration_test.cpp
index 2188fa32..4bda4ff7 100644
--- a/cpp/test/packed/packed_integration_test.cpp
+++ b/cpp/test/packed/packed_integration_test.cpp
@@ -27,8 +27,6 @@ TEST_F(PackedIntegrationTest, TestOneFile) {
   }
   EXPECT_TRUE(writer.Close().ok());
 
-  std::vector<std::string> paths = {file_path_ + "/0"};
-
   std::set<int> needed_columns = {0, 1, 2};
   PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_);
 
@@ -47,8 +45,6 @@ TEST_F(PackedIntegrationTest, TestSplitColumnGroup) {
   }
   EXPECT_TRUE(writer.Close().ok());
 
-  std::vector<std::string> paths = {file_path_ + "/0", file_path_ + "/1"};
-
   std::set<int> needed_columns = {0, 1, 2};
   PackedRecordBatchReader pr(*fs_, file_path_, schema_, needed_columns, reader_memory_);
diff --git a/go/Makefile b/go/Makefile
index 059abb46..82c5d305 100644
--- a/go/Makefile
+++ b/go/Makefile
@@ -22,7 +22,7 @@ test:
 	LD_LIBRARY_PATH=$(MILVUS_STORAGE_LD_DIR):$$LD_LIBRARY_PATH \
 	CGO_CFLAGS="$(CPPFLAGS)" \
 	CGO_LDFLAGS="$(LDFLAGS) -lmilvus-storage" \
-	go test -count=1 -timeout 30s ./...
+	go test -count=1 -timeout 30s ./... -gcflags "all=-N -l" -o gdb/
 
 proto:
 	mkdir -p proto/manifest_proto
diff --git a/go/packed/packed_reader.go b/go/packed/packed_reader.go
index b386e734..04cfc4f4 100644
--- a/go/packed/packed_reader.go
+++ b/go/packed/packed_reader.go
@@ -31,7 +31,7 @@ import (
 	"github.com/apache/arrow/go/v12/arrow/cdata"
 )
 
-func newPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) {
+func NewPackedReader(path string, schema *arrow.Schema, bufferSize int) (*PackedReader, error) {
 	var cas cdata.CArrowSchema
 	cdata.ExportArrowSchema(schema, &cas)
 	cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
@@ -49,7 +49,7 @@ func newPackedReader(path string, schema *arrow.Schema, bufferSize int) (*Packed
 	return &PackedReader{cPackedReader: cPackedReader, schema: schema}, nil
 }
 
-func (pr *PackedReader) readNext() (arrow.Record, error) {
+func (pr *PackedReader) ReadNext() (arrow.Record, error) {
 	var cArr C.CArrowArray
 	var cSchema C.CArrowSchema
 	status := C.ReadNext(pr.cPackedReader, &cArr, &cSchema)
@@ -73,7 +73,7 @@ func (pr *PackedReader) readNext() (arrow.Record, error) {
 	return recordBatch, nil
 }
 
-func (pr *PackedReader) close() error {
+func (pr *PackedReader) Close() error {
 	status := C.CloseReader(pr.cPackedReader)
 	if status != 0 {
 		return errors.New("PackedReader: failed to close file")
diff --git a/go/packed/packed_test.go b/go/packed/packed_test.go
index e05fb219..592de259 100644
--- a/go/packed/packed_test.go
+++ b/go/packed/packed_test.go
@@ -21,9 +21,10 @@ import (
 	"github.com/apache/arrow/go/v12/arrow/array"
 	"github.com/apache/arrow/go/v12/arrow/memory"
 	"github.com/stretchr/testify/assert"
+	"golang.org/x/exp/rand"
 )
 
-func TestPacked(t *testing.T) {
+func TestPackedOneFile(t *testing.T) {
 	batches := 100
 	schema := arrow.NewSchema([]arrow.Field{
 		{Name: "a", Type: arrow.PrimitiveTypes.Int32},
@@ -53,19 +54,93 @@ func TestPacked(t *testing.T) {
 	defer rec.Release()
 	path := "/tmp"
 	bufferSize := 10 * 1024 * 1024 // 10MB
-	pw, err := newPackedWriter(path, schema, bufferSize)
+	pw, err := NewPackedWriter(path, schema, bufferSize)
 	assert.NoError(t, err)
 	for i := 0; i < batches; i++ {
-		err = pw.writeRecordBatch(rec)
+		err = pw.WriteRecordBatch(rec)
 		assert.NoError(t, err)
 	}
-	err = pw.close()
+	err = pw.Close()
 	assert.NoError(t, err)
 
-	reader, err := newPackedReader(path, schema, bufferSize)
+	reader, err := NewPackedReader(path, schema, bufferSize)
 	assert.NoError(t, err)
-	rr, err := reader.readNext()
+	rr, err := reader.ReadNext()
 	assert.NoError(t, err)
 	defer rr.Release()
 	assert.Equal(t, int64(3*batches), rr.NumRows())
 }
+
+func TestPackedMultiFiles(t *testing.T) {
+	batches := 1000
+	schema := arrow.NewSchema([]arrow.Field{
+		{Name: "a", Type: arrow.PrimitiveTypes.Int32},
+		{Name: "b", Type: arrow.PrimitiveTypes.Int64},
+		{Name: "c", Type: arrow.BinaryTypes.String},
+	}, nil)
+
+	b := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+	strLen := 1000
+	arrLen := 30
+	defer b.Release()
+	for idx := range schema.Fields() {
+		switch idx {
+		case 0:
+			values := make([]int32, arrLen)
+			for i := 0; i < arrLen; i++ {
+				values[i] = int32(i + 1)
+			}
+			b.Field(idx).(*array.Int32Builder).AppendValues(values, nil)
+		case 1:
+			values := make([]int64, arrLen)
+			for i := 0; i < arrLen; i++ {
+				values[i] = int64(i + 1)
+			}
+			b.Field(idx).(*array.Int64Builder).AppendValues(values, nil)
+		case 2:
+			values := make([]string, arrLen)
+			for i := 0; i < arrLen; i++ {
+				values[i] = randomString(strLen)
+			}
+			b.Field(idx).(*array.StringBuilder).AppendValues(values, nil)
+		}
+	}
+	rec := b.NewRecord()
+	defer rec.Release()
+	path := "/tmp"
+	bufferSize := 10 * 1024 * 1024 // 10MB
+	pw, err := NewPackedWriter(path, schema, bufferSize)
+	assert.NoError(t, err)
+	for i := 0; i < batches; i++ {
+		err = pw.WriteRecordBatch(rec)
+		assert.NoError(t, err)
+	}
+	err = pw.Close()
+	assert.NoError(t, err)
+
+	reader, err := NewPackedReader(path, schema, bufferSize)
+	assert.NoError(t, err)
+	var rows int64 = 0
+	var rr arrow.Record
+	for {
+		rr, err = reader.ReadNext()
+		assert.NoError(t, err)
+		if rr == nil {
+			// end of file
+			break
+		}
+
+		rows += rr.NumRows()
+	}
+
+	assert.Equal(t, int64(arrLen*batches), rows)
+}
+
+func randomString(length int) string {
+	const charset = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+	result := make([]byte, length)
+	for i := range result {
+		result[i] = charset[rand.Intn(len(charset))]
+	}
+	return string(result)
+}
diff --git a/go/packed/packed_writer.go b/go/packed/packed_writer.go
index c7ab8ad6..588922db 100644
--- a/go/packed/packed_writer.go
+++ b/go/packed/packed_writer.go
@@ -31,7 +31,7 @@ import (
 	"github.com/apache/arrow/go/v12/arrow/cdata"
 )
 
-func newPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) {
+func NewPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*PackedWriter, error) {
 	var cas cdata.CArrowSchema
 	cdata.ExportArrowSchema(schema, &cas)
 	cSchema := (*C.struct_ArrowSchema)(unsafe.Pointer(&cas))
@@ -49,7 +49,7 @@ func newPackedWriter(path string, schema *arrow.Schema, bufferSize int) (*Packed
 	return &PackedWriter{cPackedWriter: cPackedWriter}, nil
 }
 
-func (pw *PackedWriter) writeRecordBatch(recordBatch arrow.Record) error {
+func (pw *PackedWriter) WriteRecordBatch(recordBatch arrow.Record) error {
 	var caa cdata.CArrowArray
 	var cas cdata.CArrowSchema
 
@@ -66,7 +66,7 @@ func (pw *PackedWriter) writeRecordBatch(recordBatch arrow.Record) error {
 	return nil
 }
 
-func (pw *PackedWriter) close() error {
+func (pw *PackedWriter) Close() error {
 	status := C.CloseWriter(pw.cPackedWriter)
 	if status != 0 {
 		return errors.New("PackedWriter: failed to close file")