diff --git a/Makefile b/Makefile index ff7fe938c1207..7282eb6636062 100644 --- a/Makefile +++ b/Makefile @@ -476,6 +476,7 @@ generate-mockery-datacoord: getdeps $(INSTALL_PATH)/mockery --name=SubCluster --dir=internal/datacoord --filename=mock_subcluster.go --output=internal/datacoord --structname=MockSubCluster --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=Broker --dir=internal/datacoord/broker --filename=mock_coordinator_broker.go --output=internal/datacoord/broker --structname=MockBroker --with-expecter --inpackage $(INSTALL_PATH)/mockery --name=WorkerManager --dir=internal/datacoord --filename=mock_worker_manager.go --output=internal/datacoord --structname=MockWorkerManager --with-expecter --inpackage + $(INSTALL_PATH)/mockery --name=Manager --dir=internal/datacoord --filename=mock_segment_manager.go --output=internal/datacoord --structname=MockManager --with-expecter --inpackage generate-mockery-datanode: getdeps $(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage diff --git a/internal/datacoord/mock_segment_manager.go b/internal/datacoord/mock_segment_manager.go index c5ac04149127e..eef98768f2d94 100644 --- a/internal/datacoord/mock_segment_manager.go +++ b/internal/datacoord/mock_segment_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package datacoord @@ -81,6 +81,64 @@ func (_c *MockManager_AllocImportSegment_Call) RunAndReturn(run func(context.Con return _c } +// AllocNewGrowingSegment provides a mock function with given fields: ctx, collectionID, partitionID, segmentID, channelName +func (_m *MockManager) AllocNewGrowingSegment(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, channelName string) (*SegmentInfo, error) { + ret := _m.Called(ctx, collectionID, partitionID, segmentID, channelName) + + var r0 *SegmentInfo + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, string) (*SegmentInfo, error)); ok { + return rf(ctx, collectionID, partitionID, segmentID, channelName) + } + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, int64, string) *SegmentInfo); ok { + r0 = rf(ctx, collectionID, partitionID, segmentID, channelName) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*SegmentInfo) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64, int64, int64, string) error); ok { + r1 = rf(ctx, collectionID, partitionID, segmentID, channelName) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockManager_AllocNewGrowingSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocNewGrowingSegment' +type MockManager_AllocNewGrowingSegment_Call struct { + *mock.Call +} + +// AllocNewGrowingSegment is a helper method to define mock.On call +// - ctx context.Context +// - collectionID int64 +// - partitionID int64 +// - segmentID int64 +// - channelName string +func (_e *MockManager_Expecter) AllocNewGrowingSegment(ctx interface{}, collectionID interface{}, partitionID interface{}, segmentID interface{}, channelName interface{}) *MockManager_AllocNewGrowingSegment_Call { + return &MockManager_AllocNewGrowingSegment_Call{Call: _e.mock.On("AllocNewGrowingSegment", ctx, collectionID, partitionID, segmentID, channelName)} +} + +func (_c *MockManager_AllocNewGrowingSegment_Call) Run(run func(ctx context.Context, collectionID int64, partitionID int64, segmentID int64, channelName string)) *MockManager_AllocNewGrowingSegment_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(int64), args[4].(string)) + }) + return _c +} + +func (_c *MockManager_AllocNewGrowingSegment_Call) Return(_a0 *SegmentInfo, _a1 error) *MockManager_AllocNewGrowingSegment_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockManager_AllocNewGrowingSegment_Call) RunAndReturn(run func(context.Context, int64, int64, int64, string) (*SegmentInfo, error)) *MockManager_AllocNewGrowingSegment_Call { + _c.Call.Return(run) + return _c +} + // AllocSegment provides a mock function with given fields: ctx, collectionID, partitionID, channelName, requestRows func (_m *MockManager) AllocSegment(ctx context.Context, collectionID int64, partitionID int64, channelName string, requestRows int64) ([]*Allocation, error) { ret := _m.Called(ctx, collectionID, partitionID, channelName, requestRows) diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 49aaedadcd552..ad186de4b55fb 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -74,9 +74,13 @@ func putAllocation(a *Allocation) { type Manager interface { // CreateSegment create new segment when segment not exist - // AllocSegment allocates rows and record the allocation. + // Deprecated: AllocSegment allocates rows and record the allocation, will be deprecated after enabling streamingnode. AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) AllocImportSegment(ctx context.Context, taskID int64, collectionID UniqueID, partitionID UniqueID, channelName string, level datapb.SegmentLevel) (*SegmentInfo, error) + + // AllocNewGrowingSegment allocates segment for streaming node. + AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error) + // DropSegment drops the segment from manager. DropSegment(ctx context.Context, segmentID UniqueID) // FlushImportSegments set importing segment state to Flushed. @@ -320,7 +324,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID return nil, err } for _, allocation := range newSegmentAllocations { - segment, err := s.openNewSegment(ctx, collectionID, partitionID, channelName, commonpb.SegmentState_Growing, datapb.SegmentLevel_L1) + segment, err := s.openNewSegment(ctx, collectionID, partitionID, channelName) if err != nil { log.Error("Failed to open new segment for segment allocation") return nil, err @@ -417,9 +421,12 @@ func (s *SegmentManager) AllocImportSegment(ctx context.Context, taskID int64, c return segment, nil } -func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, - channelName string, segmentState commonpb.SegmentState, level datapb.SegmentLevel, -) (*SegmentInfo, error) { +// AllocNewGrowingSegment allocates segment for streaming node. +func (s *SegmentManager) AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error) { + return s.openNewSegmentWithGivenSegmentID(ctx, collectionID, partitionID, segmentID, channelName) +} + +func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*SegmentInfo, error) { log := log.Ctx(ctx) ctx, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "open-Segment") defer sp.End() @@ -428,6 +435,10 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique log.Error("failed to open new segment while allocID", zap.Error(err)) return nil, err } + return s.openNewSegmentWithGivenSegmentID(ctx, collectionID, partitionID, id, channelName) +} + +func (s *SegmentManager) openNewSegmentWithGivenSegmentID(ctx context.Context, collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) (*SegmentInfo, error) { maxNumOfRows, err := s.estimateMaxNumOfRows(collectionID) if err != nil { log.Error("failed to open new segment while estimateMaxNumOfRows", zap.Error(err)) @@ -435,14 +446,14 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique } segmentInfo := &datapb.SegmentInfo{ - ID: id, + ID: segmentID, CollectionID: collectionID, PartitionID: partitionID, InsertChannel: channelName, NumOfRows: 0, - State: segmentState, + State: commonpb.SegmentState_Growing, MaxRowNum: int64(maxNumOfRows), - Level: level, + Level: datapb.SegmentLevel_L1, LastExpireTime: 0, } segment := NewSegmentInfo(segmentInfo) @@ -450,7 +461,7 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique log.Error("failed to add segment to DataCoord", zap.Error(err)) return nil, err } - s.segments = append(s.segments, id) + s.segments = append(s.segments, segmentID) log.Info("datacoord: estimateTotalRows: ", zap.Int64("CollectionID", segmentInfo.CollectionID), zap.Int64("SegmentID", segmentInfo.ID), diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 17f4497b5e8ad..64848e960e1e9 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -831,6 +831,10 @@ func (s *spySegmentManager) AllocSegment(ctx context.Context, collectionID Uniqu return nil, nil } +func (s *spySegmentManager) AllocNewGrowingSegment(ctx context.Context, collectionID, partitionID, segmentID UniqueID, channelName string) (*SegmentInfo, error) { + return nil, nil +} + func (s *spySegmentManager) allocSegmentForImport(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64, taskID int64) (*Allocation, error) { return nil, nil } diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 6407e97f05801..513a9fdeeace2 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -245,6 +245,28 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI }, nil } +// AllocSegment alloc a new growing segment, add it into segment meta. +func (s *Server) AllocSegment(ctx context.Context, req *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error) { + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.AllocSegmentResponse{Status: merr.Status(err)}, nil + } + // !!! SegmentId must be allocated from rootCoord id allocation. + if req.GetCollectionId() == 0 || req.GetPartitionId() == 0 || req.GetVchannel() == "" || req.GetSegmentId() == 0 { + return &datapb.AllocSegmentResponse{Status: merr.Status(merr.ErrParameterInvalid)}, nil + } + + // Alloc new growing segment and return the segment info. + segmentInfo, err := s.segmentManager.AllocNewGrowingSegment(ctx, req.GetCollectionId(), req.GetPartitionId(), req.GetSegmentId(), req.GetVchannel()) + if err != nil { + return &datapb.AllocSegmentResponse{Status: merr.Status(err)}, nil + } + clonedSegmentInfo := segmentInfo.Clone() + return &datapb.AllocSegmentResponse{ + SegmentInfo: clonedSegmentInfo.SegmentInfo, + Status: merr.Success(), + }, nil +} + // GetSegmentStates returns segments state func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { if err := merr.CheckHealthy(s.GetStateCode()); err != nil { diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go index a1af17e61c6a5..b0a10395224ab 100644 --- a/internal/distributed/datacoord/client/client.go +++ b/internal/distributed/datacoord/client/client.go @@ -168,6 +168,12 @@ func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI }) } +func (c *Client) AllocSegment(ctx context.Context, in *datapb.AllocSegmentRequest, opts ...grpc.CallOption) (*datapb.AllocSegmentResponse, error) { + return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.AllocSegmentResponse, error) { + return client.AllocSegment(ctx, in) + }) +} + // GetSegmentStates requests segment state information // // ctx is the context to control request deadline and cancellation diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index b59453eb0db6f..baa19c62ab1d2 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -303,6 +303,11 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI return s.dataCoord.AssignSegmentID(ctx, req) } +// AllocSegment alloc a new growing segment, add it into segment meta. +func (s *Server) AllocSegment(ctx context.Context, req *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error) { + return s.dataCoord.AllocSegment(ctx, req) +} + // GetSegmentStates gets states of segments func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { return s.dataCoord.GetSegmentStates(ctx, req) diff --git a/internal/mocks/mock_datacoord.go b/internal/mocks/mock_datacoord.go index 5de170cfabcd5..24e7b8e248deb 100644 --- a/internal/mocks/mock_datacoord.go +++ b/internal/mocks/mock_datacoord.go @@ -38,6 +38,61 @@ func (_m *MockDataCoord) EXPECT() *MockDataCoord_Expecter { return &MockDataCoord_Expecter{mock: &_m.Mock} } +// AllocSegment provides a mock function with given fields: _a0, _a1 +func (_m *MockDataCoord) AllocSegment(_a0 context.Context, _a1 *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *datapb.AllocSegmentResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *datapb.AllocSegmentRequest) *datapb.AllocSegmentResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datapb.AllocSegmentResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *datapb.AllocSegmentRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataCoord_AllocSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocSegment' +type MockDataCoord_AllocSegment_Call struct { + *mock.Call +} + +// AllocSegment is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *datapb.AllocSegmentRequest +func (_e *MockDataCoord_Expecter) AllocSegment(_a0 interface{}, _a1 interface{}) *MockDataCoord_AllocSegment_Call { + return &MockDataCoord_AllocSegment_Call{Call: _e.mock.On("AllocSegment", _a0, _a1)} +} + +func (_c *MockDataCoord_AllocSegment_Call) Run(run func(_a0 context.Context, _a1 *datapb.AllocSegmentRequest)) *MockDataCoord_AllocSegment_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*datapb.AllocSegmentRequest)) + }) + return _c +} + +func (_c *MockDataCoord_AllocSegment_Call) Return(_a0 *datapb.AllocSegmentResponse, _a1 error) *MockDataCoord_AllocSegment_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataCoord_AllocSegment_Call) RunAndReturn(run func(context.Context, *datapb.AllocSegmentRequest) (*datapb.AllocSegmentResponse, error)) *MockDataCoord_AllocSegment_Call { + _c.Call.Return(run) + return _c +} + // AlterIndex provides a mock function with given fields: _a0, _a1 func (_m *MockDataCoord) AlterIndex(_a0 context.Context, _a1 *indexpb.AlterIndexRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_datacoord_client.go b/internal/mocks/mock_datacoord_client.go index f12b76f6ca825..8f53648f30520 100644 --- a/internal/mocks/mock_datacoord_client.go +++ b/internal/mocks/mock_datacoord_client.go @@ -33,6 +33,76 @@ func (_m *MockDataCoordClient) EXPECT() *MockDataCoordClient_Expecter { return &MockDataCoordClient_Expecter{mock: &_m.Mock} } +// AllocSegment provides a mock function with given fields: ctx, in, opts +func (_m *MockDataCoordClient) AllocSegment(ctx context.Context, in *datapb.AllocSegmentRequest, opts ...grpc.CallOption) (*datapb.AllocSegmentResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *datapb.AllocSegmentResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *datapb.AllocSegmentRequest, ...grpc.CallOption) (*datapb.AllocSegmentResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *datapb.AllocSegmentRequest, ...grpc.CallOption) *datapb.AllocSegmentResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datapb.AllocSegmentResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *datapb.AllocSegmentRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataCoordClient_AllocSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AllocSegment' +type MockDataCoordClient_AllocSegment_Call struct { + *mock.Call +} + +// AllocSegment is a helper method to define mock.On call +// - ctx context.Context +// - in *datapb.AllocSegmentRequest +// - opts ...grpc.CallOption +func (_e *MockDataCoordClient_Expecter) AllocSegment(ctx interface{}, in interface{}, opts ...interface{}) *MockDataCoordClient_AllocSegment_Call { + return &MockDataCoordClient_AllocSegment_Call{Call: _e.mock.On("AllocSegment", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockDataCoordClient_AllocSegment_Call) Run(run func(ctx context.Context, in *datapb.AllocSegmentRequest, opts ...grpc.CallOption)) *MockDataCoordClient_AllocSegment_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*datapb.AllocSegmentRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockDataCoordClient_AllocSegment_Call) Return(_a0 *datapb.AllocSegmentResponse, _a1 error) *MockDataCoordClient_AllocSegment_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataCoordClient_AllocSegment_Call) RunAndReturn(run func(context.Context, *datapb.AllocSegmentRequest, ...grpc.CallOption) (*datapb.AllocSegmentResponse, error)) *MockDataCoordClient_AllocSegment_Call { + _c.Call.Return(run) + return _c +} + // AlterIndex provides a mock function with given fields: ctx, in, opts func (_m *MockDataCoordClient) AlterIndex(ctx context.Context, in *indexpb.AlterIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { _va := make([]interface{}, len(opts)) diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index e2b5afb4eaea1..5a5718ae6c74c 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -35,7 +35,12 @@ service DataCoord { rpc Flush(FlushRequest) returns (FlushResponse) {} - rpc AssignSegmentID(AssignSegmentIDRequest) returns (AssignSegmentIDResponse) {} + // AllocSegment alloc a new growing segment, add it into segment meta. + rpc AllocSegment(AllocSegmentRequest) returns (AllocSegmentResponse) {} + + rpc AssignSegmentID(AssignSegmentIDRequest) returns (AssignSegmentIDResponse) { + option deprecated = true; + } rpc GetSegmentInfo(GetSegmentInfoRequest) returns (GetSegmentInfoResponse) {} rpc GetSegmentStates(GetSegmentStatesRequest) returns (GetSegmentStatesResponse) {} @@ -168,6 +173,18 @@ message SegmentIDRequest { SegmentLevel level = 7; } +message AllocSegmentRequest { + int64 collection_id = 1; + int64 partition_id = 2; + int64 segment_id = 3; // segment id must be allocate from rootcoord idalloc service. + string vchannel = 4; +} + +message AllocSegmentResponse { + SegmentInfo segment_info = 1; + common.Status status = 2; +} + message AssignSegmentIDRequest { int64 nodeID = 1; string peer_role = 2;