From 4ab6d6aed0412f6696389f2f6e4cfb522bca4042 Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Mon, 20 Feb 2023 19:04:49 +0800 Subject: [PATCH] Add audit log and unify v2.1-2.2 proto logic Signed-off-by: Congqi Xia --- models/audit.go | 40 ++++ models/channel.go | 7 + models/channel_watch.go | 64 +++++++ models/channel_watch_state.go | 40 ++++ models/collection.go | 129 +++++++++++++ models/collection_loaded.go | 110 +++++++++++ models/collection_schema.go | 42 ++++ models/collection_state.go | 28 +++ models/common.go | 12 ++ models/consistency_level.go | 31 +++ models/data_type.go | 52 +++++ models/segment.go | 232 +++++++++++++++++++++++ models/segment_state.go | 40 ++++ models/version.go | 6 + states/current_version.go | 55 ++++++ states/distribution.go | 73 +++++++ states/download_pk.go | 2 +- states/download_segment.go | 2 +- states/etcd/audit/audit.go | 85 +++++++++ states/etcd/audit/audit_write.go | 44 +++++ states/etcd/commands.go | 8 +- states/etcd/common/channel.go | 46 ++++- states/etcd/common/collection.go | 162 ++++++++++++---- states/etcd/common/collection_history.go | 95 ++++++---- states/etcd/common/collection_loaded.go | 39 ++++ states/etcd/common/index.go | 4 +- states/etcd/common/legacy.go | 4 +- states/etcd/common/list.go | 4 +- states/etcd/common/replica.go | 6 +- states/etcd/common/segment.go | 99 +++++++++- states/etcd/common/session.go | 4 +- states/etcd/remove/channel.go | 2 +- states/etcd/remove/segment.go | 6 +- states/etcd/repair/channel.go | 4 +- states/etcd/repair/checkpoint.go | 10 +- states/etcd/repair/segment.go | 4 +- states/etcd/repair/segment_empty.go | 2 +- states/etcd/show/channel_watched.go | 40 ++-- states/etcd/show/checkpoint.go | 8 +- states/etcd/show/collection.go | 96 ++++------ states/etcd/show/collection_history.go | 38 ++-- states/etcd/show/collection_loaded.go | 38 ++-- states/etcd/show/index.go | 6 +- states/etcd/show/legacy_qc_channel.go | 4 +- states/etcd/show/legacy_qc_cluster.go | 4 +- states/etcd/show/legacy_qc_task.go | 12 +- states/etcd/show/replica.go | 2 +- states/etcd/show/segment.go | 68 ++++--- states/etcd/show/segment_index.go | 4 +- states/etcd/show/segment_loaded.go | 2 +- states/etcd/show/session.go | 2 +- states/etcd/version/version.go | 16 ++ states/etcd_backup.go | 10 +- states/etcd_connect.go | 8 +- states/etcd_restore.go | 4 +- states/force_release.go | 17 +- states/garbage_collect.go | 4 +- states/inspect_primary_key.go | 2 +- states/instance.go | 40 +++- states/kill.go | 4 +- states/metrics.go | 2 +- states/start.go | 4 + states/update_log_level.go | 4 +- states/util.go | 2 +- states/visit.go | 6 +- 65 files changed, 1733 insertions(+), 307 deletions(-) create mode 100644 models/audit.go create mode 100644 models/channel.go create mode 100644 models/channel_watch.go create mode 100644 models/channel_watch_state.go create mode 100644 models/collection.go create mode 100644 models/collection_loaded.go create mode 100644 models/collection_schema.go create mode 100644 models/collection_state.go create mode 100644 models/common.go create mode 100644 models/consistency_level.go create mode 100644 models/data_type.go create mode 100644 models/segment.go create mode 100644 models/segment_state.go create mode 100644 models/version.go create mode 100644 states/current_version.go create mode 100644 states/distribution.go create mode 100644 states/etcd/audit/audit.go create mode 100644 states/etcd/audit/audit_write.go create mode 100644 states/etcd/common/collection_loaded.go create mode 100644 states/etcd/version/version.go diff --git a/models/audit.go b/models/audit.go new file mode 100644 index 00000000..a9f15fbd --- /dev/null +++ b/models/audit.go @@ -0,0 +1,40 @@ +package models + +import "github.com/golang/protobuf/proto" + +// AuditHeader stores birdwatcher audit log header info. +type AuditHeader struct { + // Version number for audit format + Version int32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"` + // OpType enum for operation type + OpType int32 `protobuf:"varint,2,opt,name=op_type,proto3" json:"op_type,omitempty"` + // EntriesNum following entries number + EntriesNum int32 `protobuf:"varint,3,opt,name=entries_num,proto3" json:"entries_num,omitempty"` +} + +// Reset implements protoiface.MessageV1 +func (v *AuditHeader) Reset() { + *v = AuditHeader{} +} + +// String implements protoiface.MessageV1 +func (v *AuditHeader) String() string { + return proto.CompactTextString(v) +} + +// String implements protoiface.MessageV1 +func (v *AuditHeader) ProtoMessage() {} + +// AuditOpType operation enum type for audit log. +type AuditOpType int32 + +const ( + // OpDel operation type for delete. + OpDel AuditOpType = 1 + // OpPut operation type for put. + OpPut AuditOpType = 2 + // OpPutBefore sub header for put before value. + OpPutBefore AuditOpType = 3 + // OpPutAfter sub header for put after value. + OpPutAfter AuditOpType = 4 +) diff --git a/models/channel.go b/models/channel.go new file mode 100644 index 00000000..6d209ce1 --- /dev/null +++ b/models/channel.go @@ -0,0 +1,7 @@ +package models + +type Channel struct { + PhysicalName string + VirtualName string + StartPosition *MsgPosition +} diff --git a/models/channel_watch.go b/models/channel_watch.go new file mode 100644 index 00000000..10fb2e52 --- /dev/null +++ b/models/channel_watch.go @@ -0,0 +1,64 @@ +package models + +type ChannelWatch struct { + Vchan VChannelInfo + StartTs int64 + State ChannelWatchState + TimeoutTs int64 + + // key + key string +} + +func (c *ChannelWatch) Key() string { + return c.key +} + +type VChannelInfo struct { + CollectionID int64 + ChannelName string + SeekPosition *MsgPosition + UnflushedSegmentIds []int64 + FlushedSegmentIds []int64 + DroppedSegmentIds []int64 +} + +type vchannelInfoBase interface { + GetCollectionID() int64 + GetChannelName() string + GetUnflushedSegmentIds() []int64 + GetFlushedSegmentIds() []int64 + GetDroppedSegmentIds() []int64 +} + +func GetChannelWatchInfo[ChannelWatchBase interface { + GetVchan() vchan + GetStartTs() int64 + GetState() watchState + GetTimeoutTs() int64 +}, watchState ~int32, vchan interface { + vchannelInfoBase + GetSeekPosition() pos +}, pos msgPosBase](info ChannelWatchBase, key string) *ChannelWatch { + return &ChannelWatch{ + Vchan: getVChannelInfo[vchan, pos](info.GetVchan()), + StartTs: info.GetStartTs(), + State: ChannelWatchState(info.GetState()), + TimeoutTs: info.GetTimeoutTs(), + key: key, + } +} + +func getVChannelInfo[info interface { + vchannelInfoBase + GetSeekPosition() pos +}, pos msgPosBase](vchan info) VChannelInfo { + return VChannelInfo{ + CollectionID: vchan.GetCollectionID(), + ChannelName: vchan.GetChannelName(), + UnflushedSegmentIds: vchan.GetUnflushedSegmentIds(), + FlushedSegmentIds: vchan.GetFlushedSegmentIds(), + DroppedSegmentIds: vchan.GetDroppedSegmentIds(), + SeekPosition: newMsgPosition(vchan.GetSeekPosition()), + } +} diff --git a/models/channel_watch_state.go b/models/channel_watch_state.go new file mode 100644 index 00000000..f94994d9 --- /dev/null +++ b/models/channel_watch_state.go @@ -0,0 +1,40 @@ +package models + +type ChannelWatchState int32 + +const ( + ChannelWatchStateUncomplete ChannelWatchState = 0 + ChannelWatchStateComplete ChannelWatchState = 1 + ChannelWatchStateToWatch ChannelWatchState = 2 + ChannelWatchStateWatchSuccess ChannelWatchState = 3 + ChannelWatchStateWatchFailure ChannelWatchState = 4 + ChannelWatchStateToRelease ChannelWatchState = 5 + ChannelWatchStateReleaseSuccess ChannelWatchState = 6 + ChannelWatchStateReleaseFailure ChannelWatchState = 7 +) + +var ChannelWatchStatename = map[int32]string{ + 0: "Uncomplete", + 1: "Complete", + 2: "ToWatch", + 3: "WatchSuccess", + 4: "WatchFailure", + 5: "ToRelease", + 6: "ReleaseSuccess", + 7: "ReleaseFailure", +} + +var ChannelWatchStatevalue = map[string]int32{ + "Uncomplete": 0, + "Complete": 1, + "ToWatch": 2, + "WatchSuccess": 3, + "WatchFailure": 4, + "ToRelease": 5, + "ReleaseSuccess": 6, + "ReleaseFailure": 7, +} + +func (x ChannelWatchState) String() string { + return EnumName(ChannelWatchStatename, int32(x)) +} diff --git a/models/collection.go b/models/collection.go new file mode 100644 index 00000000..23359e50 --- /dev/null +++ b/models/collection.go @@ -0,0 +1,129 @@ +package models + +import ( + "sync" + + "github.com/milvus-io/birdwatcher/proto/v2.0/commonpb" + "github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb" + "github.com/milvus-io/birdwatcher/proto/v2.0/schemapb" + commonpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/commonpb" + etcdpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/etcdpb" + schemapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/schemapb" + "github.com/samber/lo" +) + +// Collection model for collection information. +type Collection struct { + ID int64 + // TODO partitions + Schema CollectionSchema + CreateTime uint64 + Channels []Channel + ShardsNum int32 + ConsistencyLevel ConsistencyLevel + State CollectionState + Properties map[string]string + + // etcd collection key + key string + + // lazy load func + loadOnce sync.Once + lazyLoad func(*Collection) +} + +// CollectionHistory collection models with extra history data. +type CollectionHistory struct { + Collection + Ts uint64 + Dropped bool +} + +// newCollectionFromBase fetchs common base information form proto objects +func newCollectionFromBase[collectionBase interface { + GetID() int64 + GetCreateTime() uint64 + GetShardsNum() int32 + GetPhysicalChannelNames() []string + GetVirtualChannelNames() []string + GetStartPositions() []KD +}, KD interface { + GetKey() string + GetData() []byte +}](info collectionBase) *Collection { + c := &Collection{} + + c.ID = info.GetID() + c.CreateTime = info.GetCreateTime() + c.ShardsNum = info.GetShardsNum() + c.Channels = getChannels(info.GetPhysicalChannelNames(), info.GetVirtualChannelNames(), info.GetStartPositions()) + + return c +} + +// NewCollectionFrom2_1 parses etcdpb.CollectionInfo(proto v2.0) to models.Collection. +func NewCollectionFromV2_1(info *etcdpb.CollectionInfo, key string) *Collection { + c := newCollectionFromBase[*etcdpb.CollectionInfo, *commonpb.KeyDataPair](info) + c.key = key + schema := info.GetSchema() + c.Schema = newSchemaFromBase(schema) + c.Schema.Fields = lo.Map(schema.GetFields(), func(fieldSchema *schemapb.FieldSchema, _ int) FieldSchema { + fs := NewFieldSchemaFromBase[*schemapb.FieldSchema, schemapb.DataType](fieldSchema) + fs.Properties = GetMapFromKVPairs(fieldSchema.GetTypeParams()) + return fs + }) + // hard code created for version <= v2.1.4 + c.State = CollectionStateCollectionCreated + c.ConsistencyLevel = ConsistencyLevel(info.GetConsistencyLevel()) + + return c +} + +// NewCollectionFromV2_2 parses etcdpb.CollectionInfo(proto v2.2) to models.Collections. +func NewCollectionFromV2_2(info *etcdpbv2.CollectionInfo, key string, fields []*schemapbv2.FieldSchema) *Collection { + c := newCollectionFromBase[*etcdpbv2.CollectionInfo, *commonpbv2.KeyDataPair](info) + c.key = key + c.State = CollectionState(info.GetState()) + schema := info.GetSchema() + schema.Fields = fields + c.Schema = newSchemaFromBase(schema) + + c.Schema.Fields = lo.Map(fields, func(fieldSchema *schemapbv2.FieldSchema, _ int) FieldSchema { + fs := NewFieldSchemaFromBase[*schemapbv2.FieldSchema, schemapbv2.DataType](fieldSchema) + fs.Properties = GetMapFromKVPairs(fieldSchema.GetTypeParams()) + return fs + }) + + c.ConsistencyLevel = ConsistencyLevel(info.GetConsistencyLevel()) + info.GetStartPositions() + + return c +} + +func getChannels[cp interface { + GetKey() string + GetData() []byte +}](pcs, vcs []string, cps []cp) []Channel { + return lo.Map(cps, func(c cp, idx int) Channel { + return Channel{ + PhysicalName: pcs[idx], + VirtualName: vcs[idx], + StartPosition: &MsgPosition{ + ChannelName: c.GetKey(), + MsgID: c.GetData(), + }, + } + }) +} + +// GetMapFromKVPairs parses kv pairs to map[string]string. +func GetMapFromKVPairs[kvPair interface { + GetKey() string + GetValue() string +}](pairs []kvPair) map[string]string { + result := make(map[string]string) + for _, kv := range pairs { + result[kv.GetKey()] = kv.GetValue() + } + return result +} diff --git a/models/collection_loaded.go b/models/collection_loaded.go new file mode 100644 index 00000000..35924fc4 --- /dev/null +++ b/models/collection_loaded.go @@ -0,0 +1,110 @@ +package models + +import ( + "github.com/milvus-io/birdwatcher/proto/v2.0/querypb" + querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" +) + +// CollectionLoaded models collection loaded information. +type CollectionLoaded struct { + CollectionID int64 + /* v1 deprecated + PartitionIDs []int64 + PartitionStates []PartitionState + */ + LoadType LoadType + Schema CollectionSchema + ReleasedPartitionIDs []int64 + InMemoryPercentage int64 + ReplicaIDs []int64 + ReplicaNumber int32 + // since 2.2 + Status LoadStatus + FieldIndexID map[int64]int64 + + // orignial etcd key + key string + Version string +} + +func newCollectionLoadedBase[base interface { + GetCollectionID() int64 + GetReplicaNumber() int32 +}](info base) *CollectionLoaded { + return &CollectionLoaded{ + CollectionID: info.GetCollectionID(), + ReplicaNumber: info.GetReplicaNumber(), + } +} + +func NewCollectionLoadedV2_1(info *querypb.CollectionInfo, key string) *CollectionLoaded { + c := newCollectionLoadedBase(info) + c.ReleasedPartitionIDs = info.GetPartitionIDs() + + c.LoadType = LoadType(info.GetLoadType()) + c.Schema = newSchemaFromBase(info.GetSchema()) + c.InMemoryPercentage = info.GetInMemoryPercentage() + c.ReplicaIDs = info.GetReplicaIds() + c.Version = LTEVersion2_1 + c.key = key + + return c +} + +func NewCollectionLoadedV2_2(info *querypbv2.CollectionLoadInfo, key string) *CollectionLoaded { + c := newCollectionLoadedBase(info) + c.ReleasedPartitionIDs = info.GetReleasedPartitions() + c.Status = LoadStatus(info.GetStatus()) + c.FieldIndexID = info.GetFieldIndexID() + c.Version = GTEVersion2_2 + c.key = key + return c +} + +type LoadType int32 + +const ( + LoadTypeUnKnownType LoadType = 0 + LoadTypeLoadPartition LoadType = 1 + LoadTypeLoadCollection LoadType = 2 +) + +var LoadTypename = map[int32]string{ + 0: "UnKnownType", + 1: "LoadPartition", + 2: "LoadCollection", +} + +var LoadTypevalue = map[string]int32{ + "UnKnownType": 0, + "LoadPartition": 1, + "LoadCollection": 2, +} + +func (x LoadType) String() string { + return EnumName(LoadTypename, int32(x)) +} + +type LoadStatus int32 + +const ( + LoadStatusInvalid LoadStatus = 0 + LoadStatusLoading LoadStatus = 1 + LoadStatusLoaded LoadStatus = 2 +) + +var LoadStatusname = map[int32]string{ + 0: "Invalid", + 1: "Loading", + 2: "Loaded", +} + +var LoadStatusvalue = map[string]int32{ + "Invalid": 0, + "Loading": 1, + "Loaded": 2, +} + +func (x LoadStatus) String() string { + return EnumName(LoadStatusname, int32(x)) +} diff --git a/models/collection_schema.go b/models/collection_schema.go new file mode 100644 index 00000000..ed72648b --- /dev/null +++ b/models/collection_schema.go @@ -0,0 +1,42 @@ +package models + +type CollectionSchema struct { + Name string + Fields []FieldSchema +} + +type FieldSchema struct { + FieldID int64 + Name string + IsPrimaryKey bool + AutoID bool + DataType DataType + Description string + Properties map[string]string +} + +func newSchemaFromBase[schemaBase interface { + GetName() string +}](schema schemaBase) CollectionSchema { + return CollectionSchema{ + Name: schema.GetName(), + } +} + +func NewFieldSchemaFromBase[fieldSchemaBase interface { + GetFieldID() int64 + GetName() string + GetAutoID() bool + GetIsPrimaryKey() bool + GetDescription() string + GetDataType() dt +}, dt ~int32](schema fieldSchemaBase) FieldSchema { + return FieldSchema{ + FieldID: schema.GetFieldID(), + Name: schema.GetName(), + DataType: DataType(schema.GetDataType()), + IsPrimaryKey: schema.GetIsPrimaryKey(), + AutoID: schema.GetAutoID(), + Description: schema.GetDescription(), + } +} diff --git a/models/collection_state.go b/models/collection_state.go new file mode 100644 index 00000000..206cac9d --- /dev/null +++ b/models/collection_state.go @@ -0,0 +1,28 @@ +package models + +type CollectionState int32 + +const ( + CollectionStateCollectionCreated CollectionState = 0 + CollectionStateCollectionCreating CollectionState = 1 + CollectionStateCollectionDropping CollectionState = 2 + CollectionStateCollectionDropped CollectionState = 3 +) + +var CollectionStatename = map[int32]string{ + 0: "CollectionCreated", + 1: "CollectionCreating", + 2: "CollectionDropping", + 3: "CollectionDropped", +} + +var CollectionStatevalue = map[string]int32{ + "CollectionCreated": 0, + "CollectionCreating": 1, + "CollectionDropping": 2, + "CollectionDropped": 3, +} + +func (x CollectionState) String() string { + return EnumName(CollectionStatename, int32(x)) +} diff --git a/models/common.go b/models/common.go new file mode 100644 index 00000000..e5f5be6e --- /dev/null +++ b/models/common.go @@ -0,0 +1,12 @@ +package models + +import "strconv" + +// EnumName returns proto name base on value-name mapping. +func EnumName(m map[int32]string, v int32) string { + s, ok := m[v] + if ok { + return s + } + return strconv.Itoa(int(v)) +} diff --git a/models/consistency_level.go b/models/consistency_level.go new file mode 100644 index 00000000..807ce89d --- /dev/null +++ b/models/consistency_level.go @@ -0,0 +1,31 @@ +package models + +type ConsistencyLevel int32 + +const ( + ConsistencyLevelStrong ConsistencyLevel = 0 + ConsistencyLevelSession ConsistencyLevel = 1 + ConsistencyLevelBounded ConsistencyLevel = 2 + ConsistencyLevelEventually ConsistencyLevel = 3 + ConsistencyLevelCustomized ConsistencyLevel = 4 +) + +var ConsistencyLevelname = map[int32]string{ + 0: "Strong", + 1: "Session", + 2: "Bounded", + 3: "Eventually", + 4: "Customized", +} + +var ConsistencyLevelvalue = map[string]int32{ + "Strong": 0, + "Session": 1, + "Bounded": 2, + "Eventually": 3, + "Customized": 4, +} + +func (x ConsistencyLevel) String() string { + return EnumName(ConsistencyLevelname, int32(x)) +} diff --git a/models/data_type.go b/models/data_type.go new file mode 100644 index 00000000..b472c900 --- /dev/null +++ b/models/data_type.go @@ -0,0 +1,52 @@ +package models + +type DataType int32 + +const ( + DataTypeNone DataType = 0 + DataTypeBool DataType = 1 + DataTypeInt8 DataType = 2 + DataTypeInt16 DataType = 3 + DataTypeInt32 DataType = 4 + DataTypeInt64 DataType = 5 + DataTypeFloat DataType = 10 + DataTypeDouble DataType = 11 + DataTypeString DataType = 20 + DataTypeVarChar DataType = 21 + DataTypeBinaryVector DataType = 100 + DataTypeFloatVector DataType = 101 +) + +var DataTypename = map[int32]string{ + 0: "None", + 1: "Bool", + 2: "Int8", + 3: "Int16", + 4: "Int32", + 5: "Int64", + 10: "Float", + 11: "Double", + 20: "String", + 21: "VarChar", + 100: "BinaryVector", + 101: "FloatVector", +} + +var DataTypevalue = map[string]int32{ + "None": 0, + "Bool": 1, + "Int8": 2, + "Int16": 3, + "Int32": 4, + "Int64": 5, + "Float": 10, + "Double": 11, + "String": 20, + "VarChar": 21, + "BinaryVector": 100, + "FloatVector": 101, +} + +func (x DataType) String() string { + return EnumName(DataTypename, int32(x)) +} diff --git a/models/segment.go b/models/segment.go new file mode 100644 index 00000000..2f4275a7 --- /dev/null +++ b/models/segment.go @@ -0,0 +1,232 @@ +package models + +import ( + "fmt" + "sync" + + "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" + datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb" + "github.com/samber/lo" +) + +// Segment is the common model for segment information. +type Segment struct { + ID int64 + CollectionID int64 + PartitionID int64 + InsertChannel string + NumOfRows int64 + State SegmentState + MaxRowNum int64 + LastExpireTime uint64 + CreatedByCompaction bool + CompactionFrom []int64 + DroppedAt uint64 + // position + StartPosition *MsgPosition + DmlPosition *MsgPosition + // field binlogs + binlogs []*FieldBinlog + statslogs []*FieldBinlog + deltalogs []*FieldBinlog + // Semantic version + Version string + + // etcd segment key + key string + + // lazy load func + loadOnce sync.Once + lazyLoad func(*Segment) +} + +func newSegmentFromBase[segmentBase interface { + GetID() int64 + GetCollectionID() int64 + GetPartitionID() int64 + GetInsertChannel() string + GetNumOfRows() int64 + GetMaxRowNum() int64 + GetLastExpireTime() uint64 + GetCreatedByCompaction() bool + GetCompactionFrom() []int64 + GetDroppedAt() uint64 +}](info segmentBase) *Segment { + s := &Segment{} + + s.ID = info.GetID() + s.CollectionID = info.GetCollectionID() + s.PartitionID = info.GetPartitionID() + s.InsertChannel = info.GetInsertChannel() + s.NumOfRows = info.GetNumOfRows() + s.MaxRowNum = info.GetMaxRowNum() + s.LastExpireTime = info.GetLastExpireTime() + s.CreatedByCompaction = info.GetCreatedByCompaction() + s.CompactionFrom = info.GetCompactionFrom() + s.DroppedAt = info.GetDroppedAt() + + return s +} + +func NewSegmentFromV2_1(info *datapb.SegmentInfo, key string) *Segment { + s := newSegmentFromBase(info) + s.key = key + s.State = SegmentState(info.GetState()) + s.StartPosition = newMsgPosition(info.GetStartPosition()) + s.DmlPosition = newMsgPosition(info.GetDmlPosition()) + + mFunc := func(fbl *datapb.FieldBinlog, _ int) *FieldBinlog { + r := &FieldBinlog{ + FieldID: fbl.GetFieldID(), + Binlogs: lo.Map(fbl.GetBinlogs(), func(binlog *datapb.Binlog, _ int) *Binlog { + return newBinlog(binlog) + }), + } + return r + } + s.binlogs = lo.Map(info.GetBinlogs(), mFunc) + s.statslogs = lo.Map(info.GetStatslogs(), mFunc) + s.deltalogs = lo.Map(info.GetDeltalogs(), mFunc) + + s.Version = "<=2.1.4" + return s +} + +func NewSegmentFromV2_2(info *datapbv2.SegmentInfo, key string, + lazy func() ([]datapbv2.FieldBinlog, []datapbv2.FieldBinlog, []datapbv2.FieldBinlog, error)) *Segment { + s := newSegmentFromBase(info) + s.key = key + s.State = SegmentState(info.GetState()) + s.StartPosition = newMsgPosition(info.GetStartPosition()) + s.DmlPosition = newMsgPosition(info.GetDmlPosition()) + + s.lazyLoad = func(s *Segment) { + mFunc := func(fbl datapbv2.FieldBinlog, _ int) *FieldBinlog { + r := &FieldBinlog{ + FieldID: fbl.GetFieldID(), + Binlogs: lo.Map(fbl.GetBinlogs(), func(binlog *datapbv2.Binlog, _ int) *Binlog { + return newBinlog(binlog) + }), + } + return r + } + binlogs, statslogs, deltalogs, err := lazy() + if err != nil { + fmt.Println("lazy load binlog failed", err.Error()) + return + } + s.binlogs = lo.Map(binlogs, mFunc) + s.statslogs = lo.Map(statslogs, mFunc) + s.deltalogs = lo.Map(deltalogs, mFunc) + } + + s.Version = ">=2.2.0" + return s +} + +func (s *Segment) GetBinlogs() []*FieldBinlog { + s.loadOnce.Do(func() { + if s.lazyLoad != nil { + s.lazyLoad(s) + } + }) + return s.binlogs +} + +func (s *Segment) GetStatslogs() []*FieldBinlog { + s.loadOnce.Do(func() { + if s.lazyLoad != nil { + s.lazyLoad(s) + } + }) + return s.statslogs +} + +func (s *Segment) GetDeltalogs() []*FieldBinlog { + s.loadOnce.Do(func() { + if s.lazyLoad != nil { + s.lazyLoad(s) + } + }) + return s.deltalogs +} + +func (s *Segment) GetStartPosition() *MsgPosition { + if s == nil { + return nil + } + return s.StartPosition +} + +func (s *Segment) GetDmlPosition() *MsgPosition { + if s == nil { + return nil + } + return s.DmlPosition +} + +type MsgPosition struct { + ChannelName string + MsgID []byte + MsgGroup string + Timestamp uint64 +} + +type msgPosBase interface { + GetChannelName() string + GetMsgID() []byte + GetMsgGroup() string + GetTimestamp() uint64 +} + +func newMsgPosition[T msgPosBase](pos T) *MsgPosition { + return &MsgPosition{ + ChannelName: pos.GetChannelName(), + MsgID: pos.GetMsgID(), + MsgGroup: pos.GetMsgGroup(), + Timestamp: pos.GetTimestamp(), + } +} + +func (pos *MsgPosition) GetTimestamp() uint64 { + if pos == nil { + return 0 + } + return pos.Timestamp +} + +func (pos *MsgPosition) GetChannelName() string { + if pos == nil { + return "" + } + return pos.ChannelName +} + +type FieldBinlog struct { + FieldID int64 + Binlogs []*Binlog +} + +type Binlog struct { + EntriesNum int64 + TimestampFrom uint64 + TimestampTo uint64 + LogPath string + LogSize int64 +} + +func newBinlog[T interface { + GetEntriesNum() int64 + GetTimestampFrom() uint64 + GetTimestampTo() uint64 + GetLogPath() string + GetLogSize() int64 +}](binlog T) *Binlog { + return &Binlog{ + EntriesNum: binlog.GetEntriesNum(), + TimestampFrom: binlog.GetTimestampFrom(), + TimestampTo: binlog.GetTimestampTo(), + LogPath: binlog.GetLogPath(), + LogSize: binlog.GetLogSize(), + } +} diff --git a/models/segment_state.go b/models/segment_state.go new file mode 100644 index 00000000..7e33a5cf --- /dev/null +++ b/models/segment_state.go @@ -0,0 +1,40 @@ +package models + +type SegmentState int32 + +const ( + SegmentStateSegmentStateNone SegmentState = 0 + SegmentStateNotExist SegmentState = 1 + SegmentStateGrowing SegmentState = 2 + SegmentStateSealed SegmentState = 3 + SegmentStateFlushed SegmentState = 4 + SegmentStateFlushing SegmentState = 5 + SegmentStateDropped SegmentState = 6 + SegmentStateImporting SegmentState = 7 +) + +var SegmentStatename = map[int32]string{ + 0: "SegmentStateNone", + 1: "NotExist", + 2: "Growing", + 3: "Sealed", + 4: "Flushed", + 5: "Flushing", + 6: "Dropped", + 7: "Importing", +} + +var SegmentStatevalue = map[string]int32{ + "SegmentStateNone": 0, + "NotExist": 1, + "Growing": 2, + "Sealed": 3, + "Flushed": 4, + "Flushing": 5, + "Dropped": 6, + "Importing": 7, +} + +func (x SegmentState) String() string { + return EnumName(SegmentStatename, int32(x)) +} diff --git a/models/version.go b/models/version.go new file mode 100644 index 00000000..1f163c0e --- /dev/null +++ b/models/version.go @@ -0,0 +1,6 @@ +package models + +const ( + LTEVersion2_1 = `<= 2.1.4` + GTEVersion2_2 = `>= 2.2.0` +) diff --git a/states/current_version.go b/states/current_version.go new file mode 100644 index 00000000..dd9d9a4f --- /dev/null +++ b/states/current_version.go @@ -0,0 +1,55 @@ +package states + +import ( + "fmt" + + "github.com/milvus-io/birdwatcher/models" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" + "github.com/spf13/cobra" +) + +// CurrentVersionCommand returns command for show current-version. +func CurrentVersionCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "current-version", + Run: func(_ *cobra.Command, args []string) { + fmt.Println("Current Version:", etcdversion.GetVersion()) + }, + } + return cmd +} + +// SetCurrentVersionCommand returns command for set current-version. +func SetCurrentVersionCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "set", + } + + subCmd := &cobra.Command{ + Use: "current-version", + Run: func(_ *cobra.Command, args []string) { + if len(args) != 1 { + fmt.Println("invalid parameter numbers") + return + } + + newVersion := args[0] + switch newVersion { + case models.LTEVersion2_1: + fallthrough + case "LTEVersion2_1": + etcdversion.SetVersion(models.LTEVersion2_1) + case models.GTEVersion2_2: + fallthrough + case "GTEVersion2_2": + etcdversion.SetVersion(models.GTEVersion2_2) + default: + fmt.Println("Invalid version string:", newVersion) + } + }, + } + + cmd.AddCommand(subCmd) + + return cmd +} diff --git a/states/distribution.go b/states/distribution.go new file mode 100644 index 00000000..c534bfc4 --- /dev/null +++ b/states/distribution.go @@ -0,0 +1,73 @@ +package states + +import ( + "context" + "fmt" + "time" + + commonpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/commonpb" + querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" + "github.com/milvus-io/birdwatcher/states/etcd/common" + "github.com/spf13/cobra" + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc" +) + +func GetDistributionCommand(cli clientv3.KV, basePath string) *cobra.Command { + cmd := &cobra.Command{ + Use: "segment-loaded", + Short: "list segments loaded infomration", + RunE: func(cmd *cobra.Command, args []string) error { + sessions, err := common.ListSessions(cli, basePath) + if err != nil { + return err + } + + for _, session := range sessions { + opts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithTimeout(2 * time.Second), + } + + conn, err := grpc.DialContext(context.Background(), session.Address, opts...) + if err != nil { + fmt.Printf("failed to connect %s(%d), err: %s\n", session.ServerName, session.ServerID, err.Error()) + continue + } + if session.ServerName == "querynode" { + fmt.Println("===========") + fmt.Printf("ServerID %d\n", session.ServerID) + clientv2 := querypbv2.NewQueryNodeClient(conn) + resp, err := clientv2.GetDataDistribution(context.Background(), &querypbv2.GetDataDistributionRequest{ + Base: &commonpbv2.MsgBase{ + SourceID: -1, + TargetID: session.ServerID, + }, + }) + if err != nil { + fmt.Println(err.Error()) + return err + } + + // print channel + for _, channel := range resp.GetChannels() { + fmt.Printf("Channel %s, collection: %d, version %d\n", channel.Channel, channel.Collection, channel.Version) + } + + for _, lv := range resp.GetLeaderViews() { + fmt.Printf("Leader view for channel: %s\n", lv.GetChannel()) + growings := lv.GetGrowingSegmentIDs() + fmt.Printf("Growing segments number: %d , ids: %v\n", len(growings), growings) + } + + fmt.Printf("Node Loaded Segments number: %d\n", len(resp.GetSegments())) + + } + } + + return nil + }, + } + return cmd +} diff --git a/states/download_pk.go b/states/download_pk.go index cce52c72..976ecaf0 100644 --- a/states/download_pk.go +++ b/states/download_pk.go @@ -18,7 +18,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -func getDownloadPKCmd(cli *clientv3.Client, basePath string) *cobra.Command { +func getDownloadPKCmd(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "download-pk", Short: "download pk column of a collection", diff --git a/states/download_segment.go b/states/download_segment.go index c03e89ef..2a7c2381 100644 --- a/states/download_segment.go +++ b/states/download_segment.go @@ -20,7 +20,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -func getDownloadSegmentCmd(cli *clientv3.Client, basePath string) *cobra.Command { +func getDownloadSegmentCmd(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "download-segment", Short: "download segment file with provided segment id", diff --git a/states/etcd/audit/audit.go b/states/etcd/audit/audit.go new file mode 100644 index 00000000..66cdc77f --- /dev/null +++ b/states/etcd/audit/audit.go @@ -0,0 +1,85 @@ +package audit + +import ( + "context" + "fmt" + "os" + + "github.com/milvus-io/birdwatcher/models" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type FileAuditKV struct { + cli clientv3.KV + file *os.File +} + +// NewFileAuditKV creates a file auditing log kv. +func NewFileAuditKV(kv clientv3.KV, file *os.File) *FileAuditKV { + return &FileAuditKV{ + cli: kv, + file: file, + } +} + +// Put puts a key-value pair into etcd. +// Note that key,value can be plain bytes array and string is +// an immutable representation of that bytes array. +// To get a string of bytes, do string([]byte{0x10, 0x20}). +func (c *FileAuditKV) Put(ctx context.Context, key string, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { + opts = append(opts, clientv3.WithPrevKV()) + resp, err := c.cli.Put(ctx, key, val, opts...) + c.writeHeader(models.OpPut, 2) + if resp.PrevKv != nil { + c.writeHeader(models.OpPutBefore, 1) + c.writeLogKV(resp.PrevKv) + } + c.writeHeader(models.OpPutAfter, 1) + + return resp, err +} + +// Get retrieves keys. +// By default, Get will return the value for "key", if any. +// When passed WithRange(end), Get will return the keys in the range [key, end). +// When passed WithFromKey(), Get returns keys greater than or equal to key. +// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision; +// if the required revision is compacted, the request will fail with ErrCompacted . +// When passed WithLimit(limit), the number of returned keys is bounded by limit. +// When passed WithSort(), the keys will be sorted. +func (c *FileAuditKV) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + return c.cli.Get(ctx, key, opts...) +} + +// Delete deletes a key, or optionally using WithRange(end), [key, end). +func (c *FileAuditKV) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { + fmt.Println("audio delete", key) + opts = append(opts, clientv3.WithPrevKV()) + resp, err := c.cli.Delete(ctx, key, opts...) + c.writeHeader(models.OpDel, int32(len(resp.PrevKvs))) + for _, kv := range resp.PrevKvs { + c.writeLogKV(kv) + } + return resp, err +} + +// Compact compacts etcd KV history before the given rev. +func (c *FileAuditKV) Compact(ctx context.Context, rev int64, opts ...clientv3.CompactOption) (*clientv3.CompactResponse, error) { + return c.cli.Compact(ctx, rev, opts...) +} + +// Do applies a single Op on KV without a transaction. +// Do is useful when creating arbitrary operations to be issued at a +// later time; the user can range over the operations, calling Do to +// execute them. Get/Put/Delete, on the other hand, are best suited +// for when the operation should be issued at the time of declaration. +func (c *FileAuditKV) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) { + resp, err := c.cli.Do(ctx, op) + // TODO add do audit + return resp, err +} + +// Txn creates a transaction. +func (c *FileAuditKV) Txn(ctx context.Context) clientv3.Txn { + return c.cli.Txn(ctx) +} diff --git a/states/etcd/audit/audit_write.go b/states/etcd/audit/audit_write.go new file mode 100644 index 00000000..15c816f1 --- /dev/null +++ b/states/etcd/audit/audit_write.go @@ -0,0 +1,44 @@ +package audit + +import ( + "encoding/binary" + "fmt" + + "github.com/golang/protobuf/proto" + "github.com/milvus-io/birdwatcher/models" + "go.etcd.io/etcd/api/v3/mvccpb" +) + +func (c *FileAuditKV) writeHeader(op models.AuditOpType, entriesNum int32) { + header := &models.AuditHeader{ + Version: 1, + OpType: int32(op), + EntriesNum: entriesNum, + } + bs, _ := proto.Marshal(header) + c.writeData(bs) +} + +func (c *FileAuditKV) writeLogKV(kv *mvccpb.KeyValue) { + bs, _ := proto.Marshal(kv) + c.writeData(bs) +} + +func (c *FileAuditKV) writeKeyValue(key, value string) { + kv := &mvccpb.KeyValue{ + Key: []byte(key), + Value: []byte(value), + } + c.writeLogKV(kv) +} + +func (c *FileAuditKV) writeData(data []byte) { + lb := make([]byte, 8) + binary.LittleEndian.PutUint64(lb, uint64(len(data))) + n, err := c.file.Write(lb) + fmt.Println(n, err) + if len(data) > 0 { + n, err = c.file.Write(data) + fmt.Println(n, err) + } +} diff --git a/states/etcd/commands.go b/states/etcd/commands.go index 43b2c125..b4872ce2 100644 --- a/states/etcd/commands.go +++ b/states/etcd/commands.go @@ -14,7 +14,7 @@ import ( // ShowCommand returns sub command for instanceState. // show [subCommand] [options...] // sub command [collection|session|segment] -func ShowCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func ShowCommand(cli clientv3.KV, basePath string) *cobra.Command { showCmd := &cobra.Command{ Use: "show", } @@ -57,7 +57,7 @@ func ShowCommand(cli *clientv3.Client, basePath string) *cobra.Command { } // RepairCommand returns etcd repair commands. -func RepairCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func RepairCommand(cli clientv3.KV, basePath string) *cobra.Command { repairCmd := &cobra.Command{ Use: "repair", } @@ -78,7 +78,7 @@ func RepairCommand(cli *clientv3.Client, basePath string) *cobra.Command { // RemoveCommand returns etcd remove commands. // WARNING this command shall be used with EXTRA CARE! -func RemoveCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func RemoveCommand(cli clientv3.KV, basePath string) *cobra.Command { removeCmd := &cobra.Command{ Use: "remove", } @@ -94,7 +94,7 @@ func RemoveCommand(cli *clientv3.Client, basePath string) *cobra.Command { } // RawCommands provides raw "get" command to list kv in etcd -func RawCommands(cli *clientv3.Client) []*cobra.Command { +func RawCommands(cli clientv3.KV) []*cobra.Command { cmd := &cobra.Command{ Use: "get", Run: func(cmd *cobra.Command, args []string) { diff --git a/states/etcd/common/channel.go b/states/etcd/common/channel.go index 5110f264..623ac7be 100644 --- a/states/etcd/common/channel.go +++ b/states/etcd/common/channel.go @@ -2,16 +2,21 @@ package common import ( "context" + "errors" "path" "time" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" + "github.com/milvus-io/birdwatcher/proto/v2.0/internalpb" datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb" + internalpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/internalpb" + "github.com/samber/lo" clientv3 "go.etcd.io/etcd/client/v3" ) // ListChannelWatchV1 list v2.1 channel watch info meta. -func ListChannelWatchV1(cli *clientv3.Client, basePath string, filters ...func(channel *datapb.ChannelWatchInfo) bool) ([]datapb.ChannelWatchInfo, []string, error) { +func ListChannelWatchV1(cli clientv3.KV, basePath string, filters ...func(channel *datapb.ChannelWatchInfo) bool) ([]datapb.ChannelWatchInfo, []string, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() @@ -20,10 +25,47 @@ func ListChannelWatchV1(cli *clientv3.Client, basePath string, filters ...func(c } // ListChannelWatchV2 lists v2.2 channel watch info meta. -func ListChannelWatchV2(cli *clientv3.Client, basePath string, filters ...func(channel *datapbv2.ChannelWatchInfo) bool) ([]datapbv2.ChannelWatchInfo, []string, error) { +func ListChannelWatchV2(cli clientv3.KV, basePath string, filters ...func(channel *datapbv2.ChannelWatchInfo) bool) ([]datapbv2.ChannelWatchInfo, []string, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() prefix := path.Join(basePath, "channelwatch") + "/" return ListProtoObjects(ctx, cli, prefix, filters...) } + +// ListChannelWatch lists channel watch info meta. +func ListChannelWatch(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(*models.ChannelWatch) bool) ([]*models.ChannelWatch, error) { + prefix := path.Join(basePath, "channelwatch") + "/" + var result []*models.ChannelWatch + switch version { + case models.LTEVersion2_1: + infos, paths, err := ListProtoObjects[datapb.ChannelWatchInfo](ctx, cli, prefix) + if err != nil { + return nil, err + } + result = lo.Map(infos, func(info datapb.ChannelWatchInfo, idx int) *models.ChannelWatch { + return models.GetChannelWatchInfo[*datapb.ChannelWatchInfo, datapb.ChannelWatchState, *datapb.VchannelInfo, *internalpb.MsgPosition](&info, paths[idx]) + + }) + case models.GTEVersion2_2: + infos, paths, err := ListProtoObjects[datapbv2.ChannelWatchInfo](ctx, cli, prefix) + if err != nil { + return nil, err + } + result = lo.Map(infos, func(info datapbv2.ChannelWatchInfo, idx int) *models.ChannelWatch { + return models.GetChannelWatchInfo[*datapbv2.ChannelWatchInfo, datapbv2.ChannelWatchState, *datapbv2.VchannelInfo, *internalpbv2.MsgPosition](&info, paths[idx]) + + }) + default: + return nil, errors.New("version not supported") + } + result = lo.Filter(result, func(info *models.ChannelWatch, _ int) bool { + for _, filter := range filters { + if !filter(info) { + return false + } + } + return true + }) + return result, nil +} diff --git a/states/etcd/common/collection.go b/states/etcd/common/collection.go index b2194444..0b2ca26e 100644 --- a/states/etcd/common/collection.go +++ b/states/etcd/common/collection.go @@ -10,14 +10,18 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb" - "github.com/milvus-io/birdwatcher/proto/v2.0/querypb" "github.com/milvus-io/birdwatcher/proto/v2.0/schemapb" - querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" + etcdpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/etcdpb" + schemapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/schemapb" + "github.com/samber/lo" clientv3 "go.etcd.io/etcd/client/v3" ) const ( + // CollectionMetaPrefix is prefix for rootcoord collection meta. + CollectionMetaPrefix = `root-coord/collection` // CollectionLoadPrefix is prefix for querycoord collection loaded in milvus v2.1.x CollectionLoadPrefix = "queryCoord-collectionMeta" // CollectionLoadPrefixV2 is prefix for querycoord collection loaded in milvus v2.2.x @@ -25,28 +29,80 @@ const ( ) var ( + // ErrCollectionDropped sample error for collection dropped. ErrCollectionDropped = errors.New("collection dropped") + // ErrCollectionNotFound sample error for collection not found. + ErrCollectionNotFound = errors.New("collection not found") // CollectionTombstone is the special mark for collection dropped. CollectionTombstone = []byte{0xE2, 0x9B, 0xBC} ) // ListCollections returns collection information. // the field info might not include. -func ListCollections(cli *clientv3.Client, basePath string, filter func(*etcdpb.CollectionInfo) bool) ([]etcdpb.CollectionInfo, error) { +func ListCollections(cli clientv3.KV, basePath string, filter func(*etcdpb.CollectionInfo) bool) ([]etcdpb.CollectionInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() - colls, _, err := ListProtoObjectsAdv(ctx, cli, path.Join(basePath, "root-coord/collection"), func(_ string, value []byte) bool { + colls, _, err := ListProtoObjectsAdv(ctx, cli, path.Join(basePath, CollectionMetaPrefix), func(_ string, value []byte) bool { return !bytes.Equal(value, CollectionTombstone) }, filter) return colls, err } +// ListCollectionsVersion returns collection information as provided version. +func ListCollectionsVersion(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(*models.Collection) bool) ([]*models.Collection, error) { + prefix := path.Join(basePath, CollectionMetaPrefix) + + switch version { + case models.LTEVersion2_1: + collections, keys, err := ListProtoObjectsAdv[etcdpb.CollectionInfo](ctx, cli, prefix, func(_ string, value []byte) bool { + // TODO maybe add dropped collection info in result? + return !bytes.Equal(value, CollectionTombstone) + }) + if err != nil { + return nil, err + } + return lo.FilterMap(collections, func(collection etcdpb.CollectionInfo, idx int) (*models.Collection, bool) { + c := models.NewCollectionFromV2_1(&collection, keys[idx]) + for _, filter := range filters { + if !filter(c) { + return nil, false + } + } + return c, true + }), nil + case models.GTEVersion2_2: + collections, keys, err := ListProtoObjectsAdv[etcdpbv2.CollectionInfo](ctx, cli, prefix, func(_ string, value []byte) bool { + return !bytes.Equal(value, CollectionTombstone) + }) + if err != nil { + return nil, err + } + + return lo.FilterMap(collections, func(collection etcdpbv2.CollectionInfo, idx int) (*models.Collection, bool) { + fields, err := getCollectionFields(ctx, cli, basePath, collection.ID) + if err != nil { + fmt.Println(err.Error()) + return nil, false + } + c := models.NewCollectionFromV2_2(&collection, keys[idx], fields) + for _, filter := range filters { + if !filter(c) { + return nil, false + } + } + return c, true + }), nil + default: + return nil, fmt.Errorf("undefined version: %s", version) + } +} + // GetCollectionByID returns collection info from etcd with provided id. -func GetCollectionByID(cli *clientv3.Client, basePath string, collID int64) (*etcdpb.CollectionInfo, error) { +func GetCollectionByID(cli clientv3.KV, basePath string, collID int64) (*etcdpb.CollectionInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() - resp, err := cli.Get(ctx, path.Join(basePath, "root-coord/collection", strconv.FormatInt(collID, 10))) + resp, err := cli.Get(ctx, path.Join(basePath, CollectionMetaPrefix, strconv.FormatInt(collID, 10))) if err != nil { return nil, err @@ -75,7 +131,59 @@ func GetCollectionByID(cli *clientv3.Client, basePath string, collID int64) (*et return coll, nil } -func FillFieldSchemaIfEmpty(cli *clientv3.Client, basePath string, collection *etcdpb.CollectionInfo) error { +// GetCollectionByIDVersion retruns collection info from etcd with provided version & id. +func GetCollectionByIDVersion(ctx context.Context, cli clientv3.KV, basePath string, version string, collID int64) (*models.Collection, error) { + prefix := path.Join(basePath, CollectionMetaPrefix, strconv.FormatInt(collID, 10)) + resp, err := cli.Get(ctx, prefix) + if err != nil { + return nil, err + } + + if len(resp.Kvs) != 1 { + return nil, fmt.Errorf("collection %d not found in etcd", collID) + } + + if bytes.Equal(resp.Kvs[0].Value, CollectionTombstone) { + return nil, fmt.Errorf("%w, collection id: %d", ErrCollectionDropped, collID) + } + + switch version { + case models.LTEVersion2_1: + info := &etcdpb.CollectionInfo{} + err := proto.Unmarshal(resp.Kvs[0].Value, info) + if err != nil { + return nil, err + } + c := models.NewCollectionFromV2_1(info, string(resp.Kvs[0].Key)) + return c, nil + + case models.GTEVersion2_2: + info := &etcdpbv2.CollectionInfo{} + err := proto.Unmarshal(resp.Kvs[0].Value, info) + if err != nil { + return nil, err + } + fields, err := getCollectionFields(ctx, cli, basePath, info.ID) + if err != nil { + return nil, err + } + c := models.NewCollectionFromV2_2(info, string(resp.Kvs[0].Key), fields) + return c, nil + default: + return nil, errors.New("not supported version") + } +} + +func getCollectionFields(ctx context.Context, cli clientv3.KV, basePath string, collID int64) ([]*schemapbv2.FieldSchema, error) { + fields, _, err := ListProtoObjects[schemapbv2.FieldSchema](ctx, cli, path.Join(basePath, fmt.Sprintf("root-coord/fields/%d", collID))) + if err != nil { + fmt.Println(err.Error()) + } + return lo.Map(fields, func(field schemapbv2.FieldSchema, _ int) *schemapbv2.FieldSchema { return &field }), nil + +} + +func FillFieldSchemaIfEmpty(cli clientv3.KV, basePath string, collection *etcdpb.CollectionInfo) error { if len(collection.GetSchema().GetFields()) == 0 { // fields separated from schema after 2.1.1 resp, err := cli.Get(context.TODO(), path.Join(basePath, fmt.Sprintf("root-coord/fields/%d", collection.ID)), clientv3.WithPrefix()) if err != nil { @@ -95,34 +203,22 @@ func FillFieldSchemaIfEmpty(cli *clientv3.Client, basePath string, collection *e return nil } -// ListLoadedCollectionInfoV2_1 returns collection info from querycoord milvus v2.1.x. -func ListLoadedCollectionInfoV2_1(cli *clientv3.Client, basePath string) ([]*querypb.CollectionInfo, error) { - prefix := path.Join(basePath, CollectionLoadPrefix) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix()) - if err != nil { - return nil, err - } - ret := make([]*querypb.CollectionInfo, 0) - for _, kv := range resp.Kvs { - collectionInfo := &querypb.CollectionInfo{} - err = proto.Unmarshal(kv.Value, collectionInfo) +func FillFieldSchemaIfEmptyV2(cli clientv3.KV, basePath string, collection *etcdpbv2.CollectionInfo) error { + if len(collection.GetSchema().GetFields()) == 0 { // fields separated from schema after 2.1.1 + resp, err := cli.Get(context.TODO(), path.Join(basePath, fmt.Sprintf("root-coord/fields/%d", collection.ID)), clientv3.WithPrefix()) if err != nil { - return nil, err + return err + } + for _, kv := range resp.Kvs { + field := &schemapbv2.FieldSchema{} + err := proto.Unmarshal(kv.Value, field) + if err != nil { + fmt.Println("found error field:", string(kv.Key), err.Error()) + continue + } + collection.Schema.Fields = append(collection.Schema.Fields, field) } - ret = append(ret, collectionInfo) } - return ret, nil -} - -// ListLoadedCollectionInfoV2_1 returns collection info from querycoord milvus v2.2.x. -func ListLoadedCollectionInfoV2_2(cli *clientv3.Client, basePath string) ([]querypbv2.CollectionLoadInfo, error) { - prefix := path.Join(basePath, CollectionLoadPrefixV2) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - info, _, err := ListProtoObjects[querypbv2.CollectionLoadInfo](ctx, cli, prefix) - return info, err + return nil } diff --git a/states/etcd/common/collection_history.go b/states/etcd/common/collection_history.go index 83e9505c..bea031bd 100644 --- a/states/etcd/common/collection_history.go +++ b/states/etcd/common/collection_history.go @@ -6,25 +6,75 @@ import ( "path" "strconv" "strings" - "time" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb" + etcdpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/etcdpb" + "github.com/samber/lo" clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/protobuf/runtime/protoiface" ) -type CollectionHistory struct { - Info etcdpb.CollectionInfo - Ts uint64 - Dropped bool +// ListCollectionHistory list collection history from snapshots. +func ListCollectionHistory(ctx context.Context, cli clientv3.KV, basePath string, version string, collectionID int64) ([]*models.CollectionHistory, error) { + prefix := path.Join(basePath, "snapshots/root-coord/collection", strconv.FormatInt(collectionID, 10)) + + var dropped, paths []string + var err error + var result []*models.CollectionHistory + switch version { + case models.LTEVersion2_1: + var colls []etcdpb.CollectionInfo + colls, paths, dropped, err = ListHistoryCollection[etcdpb.CollectionInfo](ctx, cli, prefix) + if err != nil { + return nil, err + } + result = lo.Map(colls, func(coll etcdpb.CollectionInfo, idx int) *models.CollectionHistory { + ch := &models.CollectionHistory{} + ch.Collection = *models.NewCollectionFromV2_1(&coll, paths[idx]) + ch.Ts = parseHistoryTs(paths[idx]) + return ch + }) + case models.GTEVersion2_2: + var colls []etcdpbv2.CollectionInfo + colls, paths, dropped, err = ListHistoryCollection[etcdpbv2.CollectionInfo](ctx, cli, prefix) + if err != nil { + return nil, err + } + result = lo.Map(colls, func(coll etcdpbv2.CollectionInfo, idx int) *models.CollectionHistory { + ch := &models.CollectionHistory{} + //TODO add history field schema + ch.Collection = *models.NewCollectionFromV2_2(&coll, paths[idx], nil) + ch.Ts = parseHistoryTs(paths[idx]) + return ch + }) + } + + for _, entry := range dropped { + collHistory := &models.CollectionHistory{Dropped: true} + collHistory.Ts = parseHistoryTs(entry) + result = append(result, collHistory) + } + + return result, nil } -// ListCollectionHistory list collection history from snapshots. -func ListCollectionHistory(cli *clientv3.Client, basePath string, collectionID int64) ([]CollectionHistory, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() +func parseHistoryTs(entry string) uint64 { + parts := strings.Split(entry, "_ts") + if len(parts) != 2 { + return 0 + } + + result, _ := strconv.ParseUint(parts[1], 10, 64) + return result +} +func ListHistoryCollection[T any, P interface { + *T + protoiface.MessageV1 +}](ctx context.Context, cli clientv3.KV, prefix string) ([]T, []string, []string, error) { var dropped []string - colls, paths, err := ListProtoObjectsAdv[etcdpb.CollectionInfo](ctx, cli, path.Join(basePath, "snapshots/root-coord/collection", strconv.FormatInt(collectionID, 10)), + colls, paths, err := ListProtoObjectsAdv[T, P](ctx, cli, prefix, func(key string, value []byte) bool { isTombstone := bytes.Equal(value, CollectionTombstone) if isTombstone { @@ -33,29 +83,8 @@ func ListCollectionHistory(cli *clientv3.Client, basePath string, collectionID i return !isTombstone }) if err != nil { - return nil, err + return nil, nil, nil, err } - result := make([]CollectionHistory, 0, len(colls)+len(dropped)) - for _, entry := range dropped { - collHistory := CollectionHistory{Dropped: true} - parts := strings.Split(entry, "_ts") - if len(parts) == 2 { - collHistory.Ts, _ = strconv.ParseUint(parts[1], 10, 64) - } - result = append(result, collHistory) - } - - for idx, coll := range colls { - collHistory := CollectionHistory{ - Info: coll, - } - path := paths[idx] - parts := strings.Split(path, "_ts") - if len(parts) == 2 { - collHistory.Ts, _ = strconv.ParseUint(parts[1], 10, 64) - } - result = append(result, collHistory) - } - return result, nil + return colls, paths, dropped, nil } diff --git a/states/etcd/common/collection_loaded.go b/states/etcd/common/collection_loaded.go new file mode 100644 index 00000000..4ba4a168 --- /dev/null +++ b/states/etcd/common/collection_loaded.go @@ -0,0 +1,39 @@ +package common + +import ( + "context" + "errors" + "path" + + "github.com/milvus-io/birdwatcher/models" + "github.com/milvus-io/birdwatcher/proto/v2.0/querypb" + querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" + "github.com/samber/lo" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// ListCollectionLoadedInfo returns collection loaded info with provided version. +func ListCollectionLoadedInfo(ctx context.Context, cli clientv3.KV, basePath string, version string) ([]*models.CollectionLoaded, error) { + switch version { + case models.LTEVersion2_1: + prefix := path.Join(basePath, CollectionLoadPrefix) + infos, paths, err := ListProtoObjects[querypb.CollectionInfo](ctx, cli, prefix) + if err != nil { + return nil, err + } + return lo.Map(infos, func(info querypb.CollectionInfo, idx int) *models.CollectionLoaded { + return models.NewCollectionLoadedV2_1(&info, paths[idx]) + }), nil + case models.GTEVersion2_2: + prefix := path.Join(basePath, CollectionLoadPrefixV2) + infos, paths, err := ListProtoObjects[querypbv2.CollectionLoadInfo](ctx, cli, prefix) + if err != nil { + return nil, err + } + return lo.Map(infos, func(info querypbv2.CollectionLoadInfo, idx int) *models.CollectionLoaded { + return models.NewCollectionLoadedV2_2(&info, paths[idx]) + }), nil + default: + return nil, errors.New("version not supported") + } +} diff --git a/states/etcd/common/index.go b/states/etcd/common/index.go index 53068ec4..0ba6c553 100644 --- a/states/etcd/common/index.go +++ b/states/etcd/common/index.go @@ -11,7 +11,7 @@ import ( ) // ListIndex list all index with all filter satified. -func ListIndex(cli *clientv3.Client, basePath string, filters ...func(index *indexpb.IndexMeta) bool) ([]indexpb.IndexMeta, error) { +func ListIndex(cli clientv3.KV, basePath string, filters ...func(index *indexpb.IndexMeta) bool) ([]indexpb.IndexMeta, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() prefix := path.Join(basePath, "indexes") + "/" @@ -21,7 +21,7 @@ func ListIndex(cli *clientv3.Client, basePath string, filters ...func(index *ind } // ListSegmentIndex list segment index info. -func ListSegmentIndex(cli *clientv3.Client, basePath string, filters ...func(segIdx *etcdpb.SegmentIndexInfo) bool) ([]etcdpb.SegmentIndexInfo, error) { +func ListSegmentIndex(cli clientv3.KV, basePath string, filters ...func(segIdx *etcdpb.SegmentIndexInfo) bool) ([]etcdpb.SegmentIndexInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() diff --git a/states/etcd/common/legacy.go b/states/etcd/common/legacy.go index fe171229..fc657d6f 100644 --- a/states/etcd/common/legacy.go +++ b/states/etcd/common/legacy.go @@ -16,7 +16,7 @@ const ( QCDeltaChannelMetaPrefix = "queryCoord-deltaChannel" ) -func ListQueryCoordDMLChannelInfos(cli *clientv3.Client, basePath string) ([]*querypb.DmChannelWatchInfo, error) { +func ListQueryCoordDMLChannelInfos(cli clientv3.KV, basePath string) ([]*querypb.DmChannelWatchInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() prefix := path.Join(basePath, QCDmChannelMetaPrefix) @@ -38,7 +38,7 @@ func ListQueryCoordDMLChannelInfos(cli *clientv3.Client, basePath string) ([]*qu return ret, nil } -func ListQueryCoordDeltaChannelInfos(cli *clientv3.Client, basePath string) ([]*datapb.VchannelInfo, error) { +func ListQueryCoordDeltaChannelInfos(cli clientv3.KV, basePath string) ([]*datapb.VchannelInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() prefix := path.Join(basePath, QCDeltaChannelMetaPrefix) diff --git a/states/etcd/common/list.go b/states/etcd/common/list.go index c4bad790..00996120 100644 --- a/states/etcd/common/list.go +++ b/states/etcd/common/list.go @@ -13,7 +13,7 @@ import ( func ListProtoObjects[T any, P interface { *T protoiface.MessageV1 -}](ctx context.Context, cli *clientv3.Client, prefix string, filters ...func(t *T) bool) ([]T, []string, error) { +}](ctx context.Context, cli clientv3.KV, prefix string, filters ...func(t *T) bool) ([]T, []string, error) { resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix()) if err != nil { return nil, nil, err @@ -46,7 +46,7 @@ LOOP: func ListProtoObjectsAdv[T any, P interface { *T protoiface.MessageV1 -}](ctx context.Context, cli *clientv3.Client, prefix string, preFilter func(string, []byte) bool, filters ...func(t *T) bool) ([]T, []string, error) { +}](ctx context.Context, cli clientv3.KV, prefix string, preFilter func(string, []byte) bool, filters ...func(t *T) bool) ([]T, []string, error) { resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix()) if err != nil { return nil, nil, err diff --git a/states/etcd/common/replica.go b/states/etcd/common/replica.go index 0fbac31f..c50470b0 100644 --- a/states/etcd/common/replica.go +++ b/states/etcd/common/replica.go @@ -13,7 +13,7 @@ import ( ) // ListReplica list current replica info -func ListReplica(cli *clientv3.Client, basePath string, collectionID int64) ([]*models.Replica, error) { +func ListReplica(cli clientv3.KV, basePath string, collectionID int64) ([]*models.Replica, error) { v1Results, err := listReplicas(cli, basePath, func(replica *milvuspb.ReplicaInfo) bool { return collectionID == 0 || replica.GetCollectionID() == collectionID }) @@ -60,7 +60,7 @@ func ListReplica(cli *clientv3.Client, basePath string, collectionID int64) ([]* return results, nil } -func listReplicas(cli *clientv3.Client, basePath string, filters ...func(*milvuspb.ReplicaInfo) bool) ([]milvuspb.ReplicaInfo, error) { +func listReplicas(cli clientv3.KV, basePath string, filters ...func(*milvuspb.ReplicaInfo) bool) ([]milvuspb.ReplicaInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() prefix := path.Join(basePath, "queryCoord-ReplicaMeta") @@ -74,7 +74,7 @@ func listReplicas(cli *clientv3.Client, basePath string, filters ...func(*milvus return replicas, nil } -func listQCReplicas(cli *clientv3.Client, basePath string, filters ...func(*querypb.Replica) bool) ([]querypb.Replica, error) { +func listQCReplicas(cli clientv3.KV, basePath string, filters ...func(*querypb.Replica) bool) ([]querypb.Replica, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() diff --git a/states/etcd/common/segment.go b/states/etcd/common/segment.go index 6696188c..e542bef1 100644 --- a/states/etcd/common/segment.go +++ b/states/etcd/common/segment.go @@ -8,17 +8,106 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" "github.com/milvus-io/birdwatcher/proto/v2.0/querypb" datapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/datapb" + "github.com/samber/lo" clientv3 "go.etcd.io/etcd/client/v3" ) +const ( + segmentMetaPrefix = "datacoord-meta/s" +) + +// ListSegmentsVersion list segment info as specified version. +func ListSegmentsVersion(ctx context.Context, cli clientv3.KV, basePath string, version string, filters ...func(*models.Segment) bool) ([]*models.Segment, error) { + prefix := path.Join(basePath, segmentMetaPrefix) + "/" + switch version { + case models.LTEVersion2_1: + segments, keys, err := ListProtoObjects[datapb.SegmentInfo](ctx, cli, prefix) + if err != nil { + return nil, err + } + + return lo.FilterMap(segments, func(segment datapb.SegmentInfo, idx int) (*models.Segment, bool) { + s := models.NewSegmentFromV2_1(&segment, keys[idx]) + for _, filter := range filters { + if !filter(s) { + return nil, false + } + } + return s, true + }), nil + case models.GTEVersion2_2: + segments, keys, err := ListProtoObjects[datapbv2.SegmentInfo](ctx, cli, prefix) + if err != nil { + return nil, err + } + + return lo.FilterMap(segments, func(segment datapbv2.SegmentInfo, idx int) (*models.Segment, bool) { + s := models.NewSegmentFromV2_2(&segment, keys[idx], getSegmentLazyFunc(cli, basePath, segment)) + for _, filter := range filters { + if !filter(s) { + return nil, false + } + } + return s, true + }), nil + default: + return nil, fmt.Errorf("undefined version: %s", version) + } +} + +func getSegmentLazyFunc(cli clientv3.KV, basePath string, segment datapbv2.SegmentInfo) func() ([]datapbv2.FieldBinlog, []datapbv2.FieldBinlog, []datapbv2.FieldBinlog, error) { + return func() ([]datapbv2.FieldBinlog, []datapbv2.FieldBinlog, []datapbv2.FieldBinlog, error) { + prefix := path.Join(basePath, "datacoord-meta", fmt.Sprintf("binlog/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID)) + + f := func(pb func(segment datapbv2.SegmentInfo, fieldID int64, logID int64) string) ([]datapbv2.FieldBinlog, error) { + fields, _, err := ListProtoObjects[datapbv2.FieldBinlog](context.Background(), cli, prefix) + if err != nil { + return nil, err + } + for _, field := range fields { + for _, binlog := range field.GetBinlogs() { + binlog.LogPath = pb(segment, field.GetFieldID(), binlog.GetLogID()) + } + } + return fields, err + } + + binlogs, err := f(func(segment datapbv2.SegmentInfo, fieldID int64, logID int64) string { + return fmt.Sprintf("files/insert_log/%d/%d/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID, fieldID, logID) + }) + if err != nil { + return nil, nil, nil, err + } + + prefix = path.Join(basePath, "datacoord-meta", fmt.Sprintf("statslog/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID)) + statslogs, err := f(func(segment datapbv2.SegmentInfo, fieldID int64, logID int64) string { + return fmt.Sprintf("files/statslog/%d/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID, logID) + }) + if err != nil { + return nil, nil, nil, err + } + + prefix = path.Join(basePath, "datacoord-meta", fmt.Sprintf("deltalog/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID)) + deltalogs, err := f(func(segment datapbv2.SegmentInfo, fieldID int64, logID int64) string { + return fmt.Sprintf("files/delta_log/%d/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID, logID) + }) + if err != nil { + return nil, nil, nil, err + } + + return binlogs, statslogs, deltalogs, nil + } +} + // ListSegments list segment info from etcd -func ListSegments(cli *clientv3.Client, basePath string, filter func(*datapb.SegmentInfo) bool) ([]*datapb.SegmentInfo, error) { +func ListSegments(cli clientv3.KV, basePath string, filter func(*datapb.SegmentInfo) bool) ([]*datapb.SegmentInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() - resp, err := cli.Get(ctx, path.Join(basePath, "datacoord-meta/s")+"/", clientv3.WithPrefix()) + resp, err := cli.Get(ctx, path.Join(basePath, segmentMetaPrefix)+"/", clientv3.WithPrefix()) if err != nil { return nil, err } @@ -41,7 +130,7 @@ func ListSegments(cli *clientv3.Client, basePath string, filter func(*datapb.Seg } // FillFieldsIfV2 fill binlog paths fields for v2 segment info. -func FillFieldsIfV2(cli *clientv3.Client, basePath string, segment *datapb.SegmentInfo) error { +func FillFieldsIfV2(cli clientv3.KV, basePath string, segment *datapb.SegmentInfo) error { if len(segment.Binlogs) == 0 { prefix := path.Join(basePath, "datacoord-meta", fmt.Sprintf("binlog/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID)) fields, _, err := ListProtoObjects[datapbv2.FieldBinlog](context.Background(), cli, prefix) @@ -139,7 +228,7 @@ func FillFieldsIfV2(cli *clientv3.Client, basePath string, segment *datapb.Segme } // ListLoadedSegments list v2.1 loaded segment info. -func ListLoadedSegments(cli *clientv3.Client, basePath string, filter func(*querypb.SegmentInfo) bool) ([]querypb.SegmentInfo, error) { +func ListLoadedSegments(cli clientv3.KV, basePath string, filter func(*querypb.SegmentInfo) bool) ([]querypb.SegmentInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() prefix := path.Join(basePath, "queryCoord-segmentMeta") @@ -153,7 +242,7 @@ func ListLoadedSegments(cli *clientv3.Client, basePath string, filter func(*quer } // RemoveSegment delete segment entry from etcd. -func RemoveSegment(cli *clientv3.Client, basePath string, info *datapb.SegmentInfo) error { +func RemoveSegment(cli clientv3.KV, basePath string, info *datapb.SegmentInfo) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() diff --git a/states/etcd/common/session.go b/states/etcd/common/session.go index efeec7b4..44ce7d74 100644 --- a/states/etcd/common/session.go +++ b/states/etcd/common/session.go @@ -15,13 +15,13 @@ const ( ) // ListSessions returns all session. -func ListSessions(cli *clientv3.Client, basePath string) ([]*models.Session, error) { +func ListSessions(cli clientv3.KV, basePath string) ([]*models.Session, error) { prefix := path.Join(basePath, sessionPrefix) return ListSessionsByPrefix(cli, prefix) } // ListSessionsByPrefix returns all session with provided prefix. -func ListSessionsByPrefix(cli *clientv3.Client, prefix string) ([]*models.Session, error) { +func ListSessionsByPrefix(cli clientv3.KV, prefix string) ([]*models.Session, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix()) diff --git a/states/etcd/remove/channel.go b/states/etcd/remove/channel.go index 1391ff13..28d7da80 100644 --- a/states/etcd/remove/channel.go +++ b/states/etcd/remove/channel.go @@ -14,7 +14,7 @@ import ( ) // ChannelCommand returns remove channel command. -func ChannelCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "channel", Short: "Remove channel from datacoord meta with specified condition if orphan", diff --git a/states/etcd/remove/segment.go b/states/etcd/remove/segment.go index bd9f663f..05a5723e 100644 --- a/states/etcd/remove/segment.go +++ b/states/etcd/remove/segment.go @@ -8,13 +8,12 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" "github.com/milvus-io/birdwatcher/states/etcd/common" - "github.com/milvus-io/birdwatcher/states/etcd/show" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" ) // SegmentCommand returns remove segment command. -func SegmentCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func SegmentCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "segment", Short: "Remove segment from meta with specified segment id", @@ -45,7 +44,8 @@ func SegmentCommand(cli *clientv3.Client, basePath string) *cobra.Command { // dry run, display segment first if !run { - show.PrintSegmentInfo(segments[0], false) + //show.PrintSegmentInfo(segments[0], false) + fmt.Printf("segment info %v", segments[0]) return } diff --git a/states/etcd/repair/channel.go b/states/etcd/repair/channel.go index dd437f07..f1835b22 100644 --- a/states/etcd/repair/channel.go +++ b/states/etcd/repair/channel.go @@ -14,7 +14,7 @@ import ( ) // ChannelCommand returns repair channel command. -func ChannelCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "channel", Aliases: []string{"channels"}, @@ -79,7 +79,7 @@ func ChannelCommand(cli *clientv3.Client, basePath string) *cobra.Command { return cmd } -func doDatacoordWatch(cli *clientv3.Client, basePath string, collectionID int64, vchannels []string) { +func doDatacoordWatch(cli clientv3.KV, basePath string, collectionID int64, vchannels []string) { sessions, err := common.ListSessions(cli, basePath) if err != nil { fmt.Println("failed to list session") diff --git a/states/etcd/repair/checkpoint.go b/states/etcd/repair/checkpoint.go index 87297112..e7713ba6 100644 --- a/states/etcd/repair/checkpoint.go +++ b/states/etcd/repair/checkpoint.go @@ -22,7 +22,7 @@ import ( // CheckpointCommand usage: // repair checkpoint --collection 437744071571606912 --vchannel by-dev-rootcoord-dml_3_437744071571606912v1 --mq_type kafka --address localhost:9092 --set_to latest-msgid // repair checkpoint --collection 437744071571606912 --vchannel by-dev-rootcoord-dml_3_437744071571606912v1 --mq_type pulsar --address pulsar://localhost:6650 --set_to latest-msgid -func CheckpointCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func CheckpointCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "checkpoint", Short: "reset checkpoint of vchannels to latest checkpoint(or latest msgID) of physical channel", @@ -85,7 +85,7 @@ func CheckpointCommand(cli *clientv3.Client, basePath string) *cobra.Command { return cmd } -func setCheckPointWithLatestMsgID(cli *clientv3.Client, basePath string, coll *etcdpb.CollectionInfo, mqType, address, vchannel string) { +func setCheckPointWithLatestMsgID(cli clientv3.KV, basePath string, coll *etcdpb.CollectionInfo, mqType, address, vchannel string) { for _, ch := range coll.GetVirtualChannelNames() { if ch == vchannel { pChannel := ToPhysicalChannel(ch) @@ -108,7 +108,7 @@ func setCheckPointWithLatestMsgID(cli *clientv3.Client, basePath string, coll *e fmt.Printf("vchannel:%s doesn't exists in collection: %d\n", vchannel, coll.ID) } -func setCheckPointWithLatestCheckPoint(cli *clientv3.Client, basePath string, coll *etcdpb.CollectionInfo, vchannel string) { +func setCheckPointWithLatestCheckPoint(cli clientv3.KV, basePath string, coll *etcdpb.CollectionInfo, vchannel string) { pChannelName2LatestCP, err := getLatestCheckpointFromPChannel(cli, basePath) if err != nil { fmt.Println("failed to get latest cp of all pchannel", err.Error()) @@ -144,7 +144,7 @@ func setCheckPointWithLatestCheckPoint(cli *clientv3.Client, basePath string, co fmt.Printf("vchannel:%s doesn't exists in collection: %d\n", vchannel, coll.ID) } -func saveChannelCheckpoint(cli *clientv3.Client, basePath string, channelName string, pos *internalpb.MsgPosition) error { +func saveChannelCheckpoint(cli clientv3.KV, basePath string, channelName string, pos *internalpb.MsgPosition) error { key := path.Join(basePath, "datacoord-meta", "channel-cp", channelName) bs, err := proto.Marshal(pos) if err != nil { @@ -154,7 +154,7 @@ func saveChannelCheckpoint(cli *clientv3.Client, basePath string, channelName st return err } -func getLatestCheckpointFromPChannel(cli *clientv3.Client, basePath string) (map[string]*internalpb.MsgPosition, error) { +func getLatestCheckpointFromPChannel(cli clientv3.KV, basePath string) (map[string]*internalpb.MsgPosition, error) { segments, err := common.ListSegments(cli, basePath, func(info *datapb.SegmentInfo) bool { return true }) diff --git a/states/etcd/repair/segment.go b/states/etcd/repair/segment.go index 0b1fe6e9..6d26db81 100644 --- a/states/etcd/repair/segment.go +++ b/states/etcd/repair/segment.go @@ -17,7 +17,7 @@ import ( ) // SegmentCommand return repair segment command. -func SegmentCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func SegmentCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "segment", Aliases: []string{"segments"}, @@ -284,7 +284,7 @@ func integrityCheck(segment *datapb.SegmentInfo) bool { return true } -func writeRepairedSegment(cli *clientv3.Client, basePath string, segment *datapb.SegmentInfo) error { +func writeRepairedSegment(cli clientv3.KV, basePath string, segment *datapb.SegmentInfo) error { p := path.Join(basePath, fmt.Sprintf("datacoord-meta/s/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID)) bs, err := proto.Marshal(segment) diff --git a/states/etcd/repair/segment_empty.go b/states/etcd/repair/segment_empty.go index a5af581f..25d346e9 100644 --- a/states/etcd/repair/segment_empty.go +++ b/states/etcd/repair/segment_empty.go @@ -11,7 +11,7 @@ import ( ) // EmptySegmentCommand returns repair empty-segment command. -func EmptySegmentCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func EmptySegmentCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "empty-segment", Short: "Remove empty segment from meta", diff --git a/states/etcd/show/channel_watched.go b/states/etcd/show/channel_watched.go index 6829ac64..14aa13a5 100644 --- a/states/etcd/show/channel_watched.go +++ b/states/etcd/show/channel_watched.go @@ -1,18 +1,20 @@ package show import ( + "context" "fmt" "time" - "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/milvus-io/birdwatcher/utils" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" ) // ChannelWatchedCommand return show channel-watched commands. -func ChannelWatchedCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func ChannelWatchedCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "channel-watch", Short: "display channel watching info from data coord meta store", @@ -24,16 +26,18 @@ func ChannelWatchedCommand(cli *clientv3.Client, basePath string) *cobra.Command return } - infos, keys, err := common.ListChannelWatchV1(cli, basePath, func(channel *datapb.ChannelWatchInfo) bool { - return collID == 0 || channel.GetVchan().GetCollectionID() == collID + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + infos, err := common.ListChannelWatch(ctx, cli, basePath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool { + return collID == 0 || channel.Vchan.CollectionID == collID }) if err != nil { fmt.Println("failed to list channel watch info", err.Error()) return } - for i, info := range infos { - printChannelWatchInfo(info, keys[i]) + for _, info := range infos { + printChannelWatchInfo(info) } fmt.Printf("--- Total Channels: %d\n", len(infos)) @@ -43,21 +47,23 @@ func ChannelWatchedCommand(cli *clientv3.Client, basePath string) *cobra.Command return cmd } -func printChannelWatchInfo(info datapb.ChannelWatchInfo, key string) { +func printChannelWatchInfo(info *models.ChannelWatch) { fmt.Println("=============================") - fmt.Printf("key: %s\n", key) - fmt.Printf("Channel Name:%s \t WatchState: %s\n", info.GetVchan().GetChannelName(), info.GetState().String()) + fmt.Printf("key: %s\n", info.Key()) + fmt.Printf("Channel Name:%s \t WatchState: %s\n", info.Vchan.ChannelName, info.State.String()) //t, _ := ParseTS(uint64(info.GetStartTs())) //to, _ := ParseTS(uint64(info.GetTimeoutTs())) - t := time.Unix(info.GetStartTs(), 0) - to := time.Unix(0, info.GetTimeoutTs()) + t := time.Unix(info.StartTs, 0) + to := time.Unix(0, info.TimeoutTs) fmt.Printf("Channel Watch start from: %s, timeout at: %s\n", t.Format(tsPrintFormat), to.Format(tsPrintFormat)) - pos := info.GetVchan().GetSeekPosition() - startTime, _ := utils.ParseTS(pos.GetTimestamp()) - fmt.Printf("Start Position ID: %v, time: %s\n", pos.GetMsgID(), startTime.Format(tsPrintFormat)) + pos := info.Vchan.SeekPosition + if pos != nil { + startTime, _ := utils.ParseTS(pos.Timestamp) + fmt.Printf("Start Position ID: %v, time: %s\n", pos.MsgID, startTime.Format(tsPrintFormat)) + } - fmt.Printf("Unflushed segments: %v\n", info.Vchan.GetUnflushedSegmentIds()) - fmt.Printf("Flushed segments: %v\n", info.Vchan.GetFlushedSegmentIds()) - fmt.Printf("Dropped segments: %v\n", info.Vchan.GetDroppedSegmentIds()) + fmt.Printf("Unflushed segments: %v\n", info.Vchan.UnflushedSegmentIds) + fmt.Printf("Flushed segments: %v\n", info.Vchan.FlushedSegmentIds) + fmt.Printf("Dropped segments: %v\n", info.Vchan.DroppedSegmentIds) } diff --git a/states/etcd/show/checkpoint.go b/states/etcd/show/checkpoint.go index 36c6e35c..d5ddbc2e 100644 --- a/states/etcd/show/checkpoint.go +++ b/states/etcd/show/checkpoint.go @@ -15,7 +15,7 @@ import ( ) // CheckpointCommand returns show checkpoint command. -func CheckpointCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func CheckpointCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "checkpoint", Short: "list checkpoint collection vchannels", @@ -48,7 +48,7 @@ func CheckpointCommand(cli *clientv3.Client, basePath string) *cobra.Command { fmt.Printf("vchannel %s position nil\n", vchannel) } else { t, _ := utils.ParseTS(cp.GetTimestamp()) - fmt.Printf("vchannel %s seek to %v", vchannel, t) + fmt.Printf("vchannel %s seek to %v, cp channel: %s", vchannel, t, cp.ChannelName) if segmentID > 0 { fmt.Printf(", for segment ID:%d\n", segmentID) } else { @@ -62,7 +62,7 @@ func CheckpointCommand(cli *clientv3.Client, basePath string) *cobra.Command { return cmd } -func getChannelCheckpoint(cli *clientv3.Client, basePath string, channelName string) (*internalpb.MsgPosition, error) { +func getChannelCheckpoint(cli clientv3.KV, basePath string, channelName string) (*internalpb.MsgPosition, error) { prefix := path.Join(basePath, "datacoord-meta", "channel-cp", channelName) results, _, err := common.ListProtoObjects[internalpb.MsgPosition](context.Background(), cli, prefix) if err != nil { @@ -76,7 +76,7 @@ func getChannelCheckpoint(cli *clientv3.Client, basePath string, channelName str return &results[0], nil } -func getCheckpointFromSegments(cli *clientv3.Client, basePath string, collID int64, vchannel string) (*internalpb.MsgPosition, int64, error) { +func getCheckpointFromSegments(cli clientv3.KV, basePath string, collID int64, vchannel string) (*internalpb.MsgPosition, int64, error) { segments, err := common.ListSegments(cli, basePath, func(info *datapb.SegmentInfo) bool { return info.CollectionID == collID && info.InsertChannel == vchannel }) diff --git a/states/etcd/show/collection.go b/states/etcd/show/collection.go index 63d5d886..ab2fd873 100644 --- a/states/etcd/show/collection.go +++ b/states/etcd/show/collection.go @@ -1,65 +1,54 @@ package show import ( - "bytes" "context" "fmt" - "path" "sort" - "strconv" - "time" - "github.com/golang/protobuf/proto" "github.com/spf13/cobra" - "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" - "github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" ) // CollectionCommand returns sub command for showCmd. // show collection [options...] -func CollectionCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func CollectionCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "collections", Short: "list current available collection from RootCoord", Aliases: []string{"collection"}, - RunE: func(cmd *cobra.Command, args []string) error { + Run: func(cmd *cobra.Command, args []string) { collectionID, err := cmd.Flags().GetInt64("id") if err != nil { - return err + fmt.Println(err.Error()) + return } - var kvs []*mvccpb.KeyValue + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var collections []*models.Collection + // perform get by id to accelerate if collectionID > 0 { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - resp, err := cli.Get(ctx, path.Join(basePath, "root-coord/collection", strconv.FormatInt(collectionID, 10))) - if err != nil { - return err + var collection *models.Collection + collection, err = common.GetCollectionByIDVersion(ctx, cli, basePath, etcdversion.GetVersion(), collectionID) + if err == nil { + collections = append(collections, collection) } - kvs = resp.Kvs } else { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) - defer cancel() - resp, err := cli.Get(ctx, path.Join(basePath, "root-coord/collection"), clientv3.WithPrefix()) - if err != nil { - return err - } - kvs = resp.Kvs + collections, err = common.ListCollectionsVersion(ctx, cli, basePath, etcdversion.GetVersion()) } - errors := make(map[string]error) - for _, kv := range kvs { - processCollectionKV(cli, basePath, kv, func(key string, err error) { - errors[key] = err - }) + if err != nil { + fmt.Println(err.Error()) + return } - for key, err := range errors { - fmt.Printf("key:%s meet error when trying to parse as Collection: %v\n", key, err) + for _, collection := range collections { + printCollection(collection) } - return nil }, } @@ -67,34 +56,15 @@ func CollectionCommand(cli *clientv3.Client, basePath string) *cobra.Command { return cmd } -func processCollectionKV(cli *clientv3.Client, basePath string, kv *mvccpb.KeyValue, handleErr func(key string, err error)) { - if bytes.Equal(kv.Value, common.CollectionTombstone) { - return - } - - collection := &etcdpb.CollectionInfo{} - err := proto.Unmarshal(kv.Value, collection) - if err != nil { - handleErr(string(kv.Key), err) - return - } - - err = common.FillFieldSchemaIfEmpty(cli, basePath, collection) - if err != nil { - handleErr(string(kv.Key), err) - return - } - - printCollection(collection) -} - -func printCollection(collection *etcdpb.CollectionInfo) { +func printCollection(collection *models.Collection) { fmt.Println("================================================================================") fmt.Printf("Collection ID: %d\tCollection Name: %s\n", collection.ID, collection.Schema.Name) - fmt.Printf("Partitions:\n") - for idx, partID := range collection.GetPartitionIDs() { - fmt.Printf(" - Partition ID: %d\tPartition Name: %s\n", partID, collection.GetPartitionNames()[idx]) - } + fmt.Printf("Collection State: %s\n", collection.State.String()) + /* + fmt.Printf("Partitions:\n") + for idx, partID := range collection.GetPartitionIDs() { + fmt.Printf(" - Partition ID: %d\tPartition Name: %s\n", partID, collection.GetPartitionNames()[idx]) + }*/ fmt.Printf("Fields:\n") fields := collection.Schema.Fields sort.Slice(fields, func(i, j int) bool { @@ -105,13 +75,13 @@ func printCollection(collection *etcdpb.CollectionInfo) { if field.IsPrimaryKey { fmt.Printf("\t - Primary Key, AutoID: %v\n", field.AutoID) } - for _, tp := range field.TypeParams { - fmt.Printf("\t - Type Param %s: %s\n", tp.Key, tp.Value) + for key, value := range field.Properties { + fmt.Printf("\t - Type Param %s: %s\n", key, value) } } - fmt.Printf("Consistency Level: %s\n", collection.GetConsistencyLevel().String()) - for _, startPos := range collection.StartPositions { - fmt.Printf("Start position for channel %s: %v\n", startPos.Key, startPos.Data) + fmt.Printf("Consistency Level: %s\n", collection.ConsistencyLevel.String()) + for _, channel := range collection.Channels { + fmt.Printf("Start position for channel %s(%s): %v\n", channel.PhysicalName, channel.VirtualName, channel.StartPosition.MsgID) } } diff --git a/states/etcd/show/collection_history.go b/states/etcd/show/collection_history.go index e41f4152..d77a5764 100644 --- a/states/etcd/show/collection_history.go +++ b/states/etcd/show/collection_history.go @@ -1,14 +1,13 @@ package show import ( - "bytes" "context" + "errors" "fmt" - "path" - "strconv" "time" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/milvus-io/birdwatcher/utils" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" @@ -16,7 +15,7 @@ import ( // CollectionHistoryCommand returns sub command for showCmd. // show collection-history [options...] -func CollectionHistoryCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func CollectionHistoryCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "collection-history", Short: "display collection change history", @@ -35,25 +34,22 @@ func CollectionHistoryCommand(cli *clientv3.Client, basePath string) *cobra.Comm ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() - resp, err := cli.Get(ctx, path.Join(basePath, "root-coord/collection/", strconv.FormatInt(collectionID, 10))) + collection, err := common.GetCollectionByIDVersion(ctx, cli, basePath, etcdversion.GetVersion(), collectionID) if err != nil { - fmt.Println("failed to get current collection status", err.Error()) - return - } - if len(resp.Kvs) == 0 { - fmt.Printf("collection id %d does not exist\n", collectionID) - } - - for _, kv := range resp.Kvs { - if bytes.Equal(kv.Value, common.CollectionTombstone) { - fmt.Println("[Current] Collection Already Dropped") - continue + switch { + case errors.Is(err, common.ErrCollectionDropped): + fmt.Printf("[Current] collection id %d already marked with Tombstone\n", collectionID) + case errors.Is(err, common.ErrCollectionNotFound): + fmt.Printf("[Current] collection id %d not found\n", collectionID) + return + default: + fmt.Println("failed to get current collection state:", err.Error()) + return } - fmt.Println("[Current] Collection still healthy") } - + printCollection(collection) // fetch history - items, err := common.ListCollectionHistory(cli, basePath, collectionID) + items, err := common.ListCollectionHistory(ctx, cli, basePath, etcdversion.GetVersion(), collectionID) if err != nil { fmt.Println("failed to list history", err.Error()) return @@ -66,10 +62,8 @@ func CollectionHistoryCommand(cli *clientv3.Client, basePath string) *cobra.Comm fmt.Println("Collection Dropped") continue } - common.FillFieldSchemaIfEmpty(cli, basePath, &item.Info) - printCollection(&item.Info) + printCollection(&item.Collection) } - }, } diff --git a/states/etcd/show/collection_loaded.go b/states/etcd/show/collection_loaded.go index faf3f308..ea0708f0 100644 --- a/states/etcd/show/collection_loaded.go +++ b/states/etcd/show/collection_loaded.go @@ -1,12 +1,14 @@ package show import ( + "context" "fmt" "sort" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/querypb" - querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -26,33 +28,35 @@ func printLoadedCollections(infos []*querypb.CollectionInfo) { } } -func printCollectionLoadInfoV2(loadInfov2 querypbv2.CollectionLoadInfo) { - fmt.Println(loadInfov2.String()) +func printCollectionLoaded(info *models.CollectionLoaded) { + fmt.Printf("Version: [%s]\tCollectionID: %d\n", info.Version, info.CollectionID) + fmt.Printf("ReplicaNumber: %d", info.ReplicaNumber) + switch info.Version { + case models.LTEVersion2_1: + fmt.Printf("\tInMemoryPercent: %d\n", info.InMemoryPercentage) + case models.GTEVersion2_2: + fmt.Printf("\tLoadStatus: %s\n", info.Status.String()) + } } // CollectionLoadedCommand return show collection-loaded command. -func CollectionLoadedCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func CollectionLoadedCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "collection-loaded", Short: "display information of loaded collection from querycoord", Aliases: []string{"collection-load"}, - RunE: func(cmd *cobra.Command, args []string) error { - collectionLoadInfos, err := common.ListLoadedCollectionInfoV2_1(cli, basePath) - + Run: func(cmd *cobra.Command, args []string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + infos, err := common.ListCollectionLoadedInfo(ctx, cli, basePath, etcdversion.GetVersion()) if err != nil { - return err + fmt.Println("failed to list collection load info:", err.Error()) + return } - printLoadedCollections(collectionLoadInfos) - loadInfov2, err := common.ListLoadedCollectionInfoV2_2(cli, basePath) - if err != nil { - return err + for _, info := range infos { + printCollectionLoaded(info) } - for _, info := range loadInfov2 { - printCollectionLoadInfoV2(info) - } - - return nil }, } return cmd diff --git a/states/etcd/show/index.go b/states/etcd/show/index.go index 8c6ac018..19669185 100644 --- a/states/etcd/show/index.go +++ b/states/etcd/show/index.go @@ -15,7 +15,7 @@ import ( ) // IndexCommand returns show index command. -func IndexCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func IndexCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "index", Aliases: []string{"indexes"}, @@ -54,7 +54,7 @@ type IndexInfoV1 struct { collectionID int64 } -func listIndexMeta(cli *clientv3.Client, basePath string) ([]IndexInfoV1, error) { +func listIndexMeta(cli clientv3.KV, basePath string) ([]IndexInfoV1, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() @@ -75,7 +75,7 @@ func listIndexMeta(cli *clientv3.Client, basePath string) ([]IndexInfoV1, error) return result, err } -func listIndexMetaV2(cli *clientv3.Client, basePath string) ([]indexpbv2.FieldIndex, error) { +func listIndexMetaV2(cli clientv3.KV, basePath string) ([]indexpbv2.FieldIndex, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() indexes, _, err := common.ListProtoObjects[indexpbv2.FieldIndex](ctx, cli, path.Join(basePath, "field-index")) diff --git a/states/etcd/show/legacy_qc_channel.go b/states/etcd/show/legacy_qc_channel.go index f74b1959..f964d9d7 100644 --- a/states/etcd/show/legacy_qc_channel.go +++ b/states/etcd/show/legacy_qc_channel.go @@ -47,7 +47,7 @@ func printNodeUnsubChannelInfos(infos []*querypb.UnsubscribeChannelInfo) { } } -func listQueryCoordUnsubChannelInfos(cli *clientv3.Client, basePath string) ([]*querypb.UnsubscribeChannelInfo, error) { +func listQueryCoordUnsubChannelInfos(cli clientv3.KV, basePath string) ([]*querypb.UnsubscribeChannelInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() prefix := path.Join(basePath, unsubscribeChannelInfoPrefix) @@ -86,7 +86,7 @@ func printDeltaChannelInfos(infos []*datapb.VchannelInfo) { } // QueryCoordChannelCommand returns show querycoord-channel command. -func QueryCoordChannelCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func QueryCoordChannelCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "querycoord-channel", Short: "display querynode information from querycoord cluster", diff --git a/states/etcd/show/legacy_qc_cluster.go b/states/etcd/show/legacy_qc_cluster.go index a95da30a..8f5639ed 100644 --- a/states/etcd/show/legacy_qc_cluster.go +++ b/states/etcd/show/legacy_qc_cluster.go @@ -14,13 +14,13 @@ const ( queryNodeInfoPrefix = "queryCoord-queryNodeInfo" ) -func listQueryCoordClusterNodeInfo(cli *clientv3.Client, basePath string) ([]*models.Session, error) { +func listQueryCoordClusterNodeInfo(cli clientv3.KV, basePath string) ([]*models.Session, error) { prefix := path.Join(basePath, queryNodeInfoPrefix) return common.ListSessionsByPrefix(cli, prefix) } // QueryCoordClusterCommand returns show querycoord-cluster command. -func QueryCoordClusterCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func QueryCoordClusterCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "querycoord-cluster", Short: "display querynode information from querycoord cluster", diff --git a/states/etcd/show/legacy_qc_task.go b/states/etcd/show/legacy_qc_task.go index ce2902ea..489e8ac0 100644 --- a/states/etcd/show/legacy_qc_task.go +++ b/states/etcd/show/legacy_qc_task.go @@ -15,7 +15,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -func listQueryCoordTriggerTasks(cli *clientv3.Client, basePath string) (map[UniqueID]queryCoordTask, error) { +func listQueryCoordTriggerTasks(cli clientv3.KV, basePath string) (map[UniqueID]queryCoordTask, error) { prefix := path.Join(basePath, triggerTaskPrefix) triggerTasks, err := listQueryCoordTasksByPrefix(cli, prefix) if err != nil { @@ -27,7 +27,7 @@ func listQueryCoordTriggerTasks(cli *clientv3.Client, basePath string) (map[Uniq return triggerTasks, nil } -func listQueryCoordActivateTasks(cli *clientv3.Client, basePath string) (map[UniqueID]queryCoordTask, error) { +func listQueryCoordActivateTasks(cli clientv3.KV, basePath string) (map[UniqueID]queryCoordTask, error) { prefix := path.Join(basePath, activeTaskPrefix) activateTasks, err := listQueryCoordTasksByPrefix(cli, prefix) if err != nil { @@ -39,7 +39,7 @@ func listQueryCoordActivateTasks(cli *clientv3.Client, basePath string) (map[Uni return activateTasks, nil } -func listQueryCoordTasksByPrefix(cli *clientv3.Client, prefix string) (map[UniqueID]queryCoordTask, error) { +func listQueryCoordTasksByPrefix(cli clientv3.KV, prefix string) (map[UniqueID]queryCoordTask, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() tasks := make(map[int64]queryCoordTask) @@ -64,7 +64,7 @@ func listQueryCoordTasksByPrefix(cli *clientv3.Client, prefix string) (map[Uniqu return tasks, nil } -func listQueryCoordTaskStates(cli *clientv3.Client, basePath string) (map[UniqueID]taskState, error) { +func listQueryCoordTaskStates(cli clientv3.KV, basePath string) (map[UniqueID]taskState, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() prefix := path.Join(basePath, taskInfoPrefix) @@ -101,7 +101,7 @@ func checkAndSetTaskState(tasks map[UniqueID]queryCoordTask, states map[UniqueID return nil } -func listQueryCoordTasks(cli *clientv3.Client, basePath string, filter func(task queryCoordTask) bool) (map[UniqueID]queryCoordTask, map[UniqueID]queryCoordTask, error) { +func listQueryCoordTasks(cli clientv3.KV, basePath string, filter func(task queryCoordTask) bool) (map[UniqueID]queryCoordTask, map[UniqueID]queryCoordTask, error) { triggerTasks, err := listQueryCoordTriggerTasks(cli, basePath) if err != nil { return nil, nil, err @@ -140,7 +140,7 @@ func listQueryCoordTasks(cli *clientv3.Client, basePath string, filter func(task // QueryCoordTasks returns show querycoord-tasks commands. // DEPRECATED from milvus 2.2.0. -func QueryCoordTasks(cli *clientv3.Client, basePath string) *cobra.Command { +func QueryCoordTasks(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "querycoord-task", Short: "display task information from querycoord", diff --git a/states/etcd/show/replica.go b/states/etcd/show/replica.go index 5ae5ca64..82ef745a 100644 --- a/states/etcd/show/replica.go +++ b/states/etcd/show/replica.go @@ -10,7 +10,7 @@ import ( ) // ReplicaCommand returns command for show querycoord replicas. -func ReplicaCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func ReplicaCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "replica", Short: "list current replica information from QueryCoord", diff --git a/states/etcd/show/segment.go b/states/etcd/show/segment.go index 59cff849..259cffb4 100644 --- a/states/etcd/show/segment.go +++ b/states/etcd/show/segment.go @@ -1,13 +1,14 @@ package show import ( + "context" "fmt" "sort" "time" - "github.com/milvus-io/birdwatcher/proto/v2.0/commonpb" - "github.com/milvus-io/birdwatcher/proto/v2.0/datapb" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/milvus-io/birdwatcher/utils" "github.com/spf13/cobra" @@ -15,7 +16,7 @@ import ( ) // SegmentCommand returns show segments command. -func SegmentCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func SegmentCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "segment", Short: "display segment information from data coord meta store", @@ -43,10 +44,10 @@ func SegmentCommand(cli *clientv3.Client, basePath string) *cobra.Command { return err } - segments, err := common.ListSegments(cli, basePath, func(info *datapb.SegmentInfo) bool { - return (collID == 0 || info.CollectionID == collID) && - (segmentID == 0 || info.ID == segmentID) && - (state == "" || info.State.String() == state) + segments, err := common.ListSegmentsVersion(context.Background(), cli, basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool { + return (collID == 0 || segment.CollectionID == collID) && + (segmentID == 0 || segment.ID == segmentID) && + (state == "" || segment.State.String() == state) }) if err != nil { fmt.Println("failed to list segments", err.Error()) @@ -60,35 +61,33 @@ func SegmentCommand(cli *clientv3.Client, basePath string) *cobra.Command { fieldSize := make(map[int64]int64) for _, info := range segments { - if info.State != commonpb.SegmentState_Dropped { + if info.State != models.SegmentStateDropped { totalRC += info.NumOfRows healthy++ } switch info.State { - case commonpb.SegmentState_Growing: + case models.SegmentStateGrowing: growing++ - case commonpb.SegmentState_Sealed: + case models.SegmentStateSealed: sealed++ - case commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed: + case models.SegmentStateFlushing, models.SegmentStateFlushed: flushed++ } switch format { case "table": - common.FillFieldsIfV2(cli, basePath, info) PrintSegmentInfo(info, detail) case "line": fmt.Printf("SegmentID: %d State: %s, Row Count:%d\n", info.ID, info.State.String(), info.NumOfRows) case "statistics": - if info.GetState() != commonpb.SegmentState_Dropped { - common.FillFieldsIfV2(cli, basePath, info) + if info.State != models.SegmentStateDropped { for _, binlog := range info.GetBinlogs() { - for _, log := range binlog.GetBinlogs() { - fieldSize[binlog.FieldID] += log.GetLogSize() + for _, log := range binlog.Binlogs { + fieldSize[binlog.FieldID] += log.LogSize } } for _, statslog := range info.GetStatslogs() { - for _, binlog := range statslog.GetBinlogs() { + for _, binlog := range statslog.Binlogs { statslogSize += binlog.LogSize } } @@ -137,11 +136,11 @@ const ( ) // PrintSegmentInfo prints segments info -func PrintSegmentInfo(info *datapb.SegmentInfo, detailBinlog bool) { +func PrintSegmentInfo(info *models.Segment, detailBinlog bool) { fmt.Println("================================================================================") fmt.Printf("Segment ID: %d\n", info.ID) fmt.Printf("Segment State:%v", info.State) - if info.State == commonpb.SegmentState_Dropped { + if info.State == models.SegmentStateDropped { dropTime := time.Unix(0, int64(info.DroppedAt)) fmt.Printf("\tDropped Time: %s", dropTime.Format(tsPrintFormat)) } @@ -154,7 +153,7 @@ func PrintSegmentInfo(info *datapb.SegmentInfo, detailBinlog bool) { fmt.Printf("Compact from %v \n", info.CompactionFrom) if info.StartPosition != nil { startTime, _ := utils.ParseTS(info.GetStartPosition().GetTimestamp()) - fmt.Printf("Start Position ID: %v, time: %s\n", info.StartPosition.MsgID, startTime.Format(tsPrintFormat)) + fmt.Printf("Start Position ID: %v, time: %s, channel name %s\n", info.GetStartPosition().MsgID, startTime.Format(tsPrintFormat), info.GetStartPosition().GetChannelName()) } else { fmt.Println("Start Position: nil") } @@ -165,16 +164,17 @@ func PrintSegmentInfo(info *datapb.SegmentInfo, detailBinlog bool) { fmt.Println("Dml Position: nil") } fmt.Printf("Binlog Nums %d\tStatsLog Nums: %d\tDeltaLog Nums:%d\n", - countBinlogNum(info.Binlogs), countBinlogNum(info.Statslogs), countBinlogNum(info.Deltalogs)) + countBinlogNum(info.GetBinlogs()), countBinlogNum(info.GetStatslogs()), countBinlogNum(info.GetDeltalogs())) if detailBinlog { var binlogSize int64 fmt.Println("**************************************") fmt.Println("Binlogs:") - sort.Slice(info.Binlogs, func(i, j int) bool { - return info.Binlogs[i].FieldID < info.Binlogs[j].FieldID + sort.Slice(info.GetBinlogs(), func(i, j int) bool { + return info.GetBinlogs()[i].FieldID < info.GetBinlogs()[j].FieldID }) - for _, log := range info.Binlogs { + for _, log := range info.GetBinlogs() { + fmt.Printf("Field %d:\n", log.FieldID) for _, binlog := range log.Binlogs { fmt.Printf("Path: %s\n", binlog.LogPath) tf, _ := utils.ParseTS(binlog.TimestampFrom) @@ -188,16 +188,24 @@ func PrintSegmentInfo(info *datapb.SegmentInfo, detailBinlog bool) { fmt.Println("**************************************") fmt.Println("Statslogs:") - sort.Slice(info.Statslogs, func(i, j int) bool { - return info.Statslogs[i].FieldID < info.Statslogs[j].FieldID + sort.Slice(info.GetStatslogs(), func(i, j int) bool { + return info.GetStatslogs()[i].FieldID < info.GetStatslogs()[j].FieldID }) - for _, log := range info.Statslogs { - fmt.Printf("Field %d: %v\n", log.FieldID, log.Binlogs) + for _, log := range info.GetStatslogs() { + fmt.Printf("Field %d:\n", log.FieldID) + for _, binlog := range log.Binlogs { + fmt.Printf("Path: %s\n", binlog.LogPath) + tf, _ := utils.ParseTS(binlog.TimestampFrom) + tt, _ := utils.ParseTS(binlog.TimestampTo) + fmt.Printf("Log Size: %d \t Entry Num: %d\t TimeRange:%s-%s\n", + binlog.LogSize, binlog.EntriesNum, + tf.Format(tsPrintFormat), tt.Format(tsPrintFormat)) + } } fmt.Println("**************************************") fmt.Println("Delta Logs:") - for _, log := range info.Deltalogs { + for _, log := range info.GetDeltalogs() { for _, l := range log.Binlogs { fmt.Printf("Entries: %d From: %v - To: %v\n", l.EntriesNum, l.TimestampFrom, l.TimestampTo) fmt.Printf("Path: %v\n", l.LogPath) @@ -208,7 +216,7 @@ func PrintSegmentInfo(info *datapb.SegmentInfo, detailBinlog bool) { fmt.Println("================================================================================") } -func countBinlogNum(fbl []*datapb.FieldBinlog) int { +func countBinlogNum(fbl []*models.FieldBinlog) int { result := 0 for _, f := range fbl { result += len(f.Binlogs) diff --git a/states/etcd/show/segment_index.go b/states/etcd/show/segment_index.go index 5f5998a7..3f42e97b 100644 --- a/states/etcd/show/segment_index.go +++ b/states/etcd/show/segment_index.go @@ -17,7 +17,7 @@ import ( ) // SegmentIndexCommand returns show segment-index command. -func SegmentIndexCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func SegmentIndexCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "segment-index", Aliases: []string{"segments-index", "segment-indexes", "segments-indexes"}, @@ -142,7 +142,7 @@ func SegmentIndexCommand(cli *clientv3.Client, basePath string) *cobra.Command { return cmd } -func listSegmentIndexV2(cli *clientv3.Client, basePath string) ([]indexpbv2.SegmentIndex, error) { +func listSegmentIndexV2(cli clientv3.KV, basePath string) ([]indexpbv2.SegmentIndex, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() diff --git a/states/etcd/show/segment_loaded.go b/states/etcd/show/segment_loaded.go index c1e17f94..3824f742 100644 --- a/states/etcd/show/segment_loaded.go +++ b/states/etcd/show/segment_loaded.go @@ -10,7 +10,7 @@ import ( ) // SegmentLoadedCommand returns show segment-loaded command. -func SegmentLoadedCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func SegmentLoadedCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "segment-loaded", Short: "display segment information from querycoord", diff --git a/states/etcd/show/session.go b/states/etcd/show/session.go index 19f691fb..7bcda7da 100644 --- a/states/etcd/show/session.go +++ b/states/etcd/show/session.go @@ -10,7 +10,7 @@ import ( // SessionCommand returns show session command. // usage: show session -func SessionCommand(cli *clientv3.Client, basePath string) *cobra.Command { +func SessionCommand(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "session", Short: "list online milvus components", diff --git a/states/etcd/version/version.go b/states/etcd/version/version.go new file mode 100644 index 00000000..45f72b07 --- /dev/null +++ b/states/etcd/version/version.go @@ -0,0 +1,16 @@ +package version + +var ( + currentVersion string +) + +// SetVersion set current version manually. +func SetVersion(ver string) { + currentVersion = ver +} + +// GetVersion returns current etcd version. +// tmp solution before env supported. +func GetVersion() string { + return currentVersion +} diff --git a/states/etcd_backup.go b/states/etcd_backup.go index d3ad0142..f5d29879 100644 --- a/states/etcd_backup.go +++ b/states/etcd_backup.go @@ -69,7 +69,7 @@ func (c *milvusComponent) Type() string { // getBackupEtcdCmd returns command for backup etcd // usage: backup [component] [options...] -func getBackupEtcdCmd(cli *clientv3.Client, basePath string) *cobra.Command { +func getBackupEtcdCmd(cli clientv3.KV, basePath string) *cobra.Command { component := compAll cmd := &cobra.Command{ @@ -149,7 +149,7 @@ func writeBackupHeader(w io.Writer, version int32) error { return nil } -func backupEtcdV2(cli *clientv3.Client, base, prefix string, w *bufio.Writer, ignoreRevision bool) error { +func backupEtcdV2(cli clientv3.KV, base, prefix string, w *bufio.Writer, ignoreRevision bool) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() resp, err := cli.Get(ctx, path.Join(base, prefix), clientv3.WithCountOnly(), clientv3.WithPrefix()) @@ -239,7 +239,7 @@ func backupEtcdV2(cli *clientv3.Client, base, prefix string, w *bufio.Writer, ig return nil } -func backupMetrics(cli *clientv3.Client, basePath string, w *bufio.Writer) error { +func backupMetrics(cli clientv3.KV, basePath string, w *bufio.Writer) error { sessions, err := common.ListSessions(cli, basePath) if err != nil { return err @@ -280,7 +280,7 @@ func backupMetrics(cli *clientv3.Client, basePath string, w *bufio.Writer) error return nil } -func backupAppMetrics(cli *clientv3.Client, basePath string, w *bufio.Writer) error { +func backupAppMetrics(cli clientv3.KV, basePath string, w *bufio.Writer) error { sessions, err := common.ListSessions(cli, basePath) if err != nil { return err @@ -350,7 +350,7 @@ func backupAppMetrics(cli *clientv3.Client, basePath string, w *bufio.Writer) er } -func backupConfiguration(cli *clientv3.Client, basePath string, w *bufio.Writer) error { +func backupConfiguration(cli clientv3.KV, basePath string, w *bufio.Writer) error { sessions, err := common.ListSessions(cli, basePath) if err != nil { return err diff --git a/states/etcd_connect.go b/states/etcd_connect.go index d804505f..bf6bf09e 100644 --- a/states/etcd_connect.go +++ b/states/etcd_connect.go @@ -15,7 +15,7 @@ const ( metaPath = `meta` ) -func pingEtcd(ctx context.Context, cli *clientv3.Client) error { +func pingEtcd(ctx context.Context, cli clientv3.KV) error { _, err := cli.Get(ctx, "ping") return err } @@ -87,7 +87,7 @@ func getConnectCommand(state State) *cobra.Command { } // findMilvusInstance iterate all possible rootPath -func findMilvusInstance(cli *clientv3.Client) ([]string, error) { +func findMilvusInstance(cli clientv3.KV) ([]string, error) { var apps []string current := "" for { @@ -116,7 +116,7 @@ func findMilvusInstance(cli *clientv3.Client) ([]string, error) { return apps, nil } -func getFindMilvusCmd(cli *clientv3.Client, state *etcdConnectedState) *cobra.Command { +func getFindMilvusCmd(cli clientv3.KV, state *etcdConnectedState) *cobra.Command { cmd := &cobra.Command{ Use: "find-milvus", Short: "search etcd kvs to find milvus instance", @@ -137,7 +137,7 @@ func getFindMilvusCmd(cli *clientv3.Client, state *etcdConnectedState) *cobra.Co return cmd } -func getUseCmd(cli *clientv3.Client, state State) *cobra.Command { +func getUseCmd(cli clientv3.KV, state State) *cobra.Command { cmd := &cobra.Command{ Use: "use [instance name]", Short: "use specified milvus instance", diff --git a/states/etcd_restore.go b/states/etcd_restore.go index ae7f6dfd..2e0fd3f7 100644 --- a/states/etcd_restore.go +++ b/states/etcd_restore.go @@ -18,7 +18,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -func restoreFromV1File(cli *clientv3.Client, rd io.Reader, header *models.BackupHeader) error { +func restoreFromV1File(cli clientv3.KV, rd io.Reader, header *models.BackupHeader) error { var nextBytes uint64 var bs []byte @@ -115,7 +115,7 @@ func restoreV2File(rd *bufio.Reader, state *embedEtcdMockState) error { } } -func restoreEtcdFromBackV2(cli *clientv3.Client, rd io.Reader, ph models.PartHeader) (string, error) { +func restoreEtcdFromBackV2(cli clientv3.KV, rd io.Reader, ph models.PartHeader) (string, error) { meta := make(map[string]string) err := json.Unmarshal(ph.Extra, &meta) if err != nil { diff --git a/states/force_release.go b/states/force_release.go index d8294769..86d0e2f7 100644 --- a/states/force_release.go +++ b/states/force_release.go @@ -6,15 +6,17 @@ import ( "path" "time" + "github.com/milvus-io/birdwatcher/models" "github.com/milvus-io/birdwatcher/proto/v2.0/querypb" "github.com/milvus-io/birdwatcher/states/etcd/common" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" ) // getForceReleaseCmd returns command for force-release // usage: force-release [flags] -func getForceReleaseCmd(cli *clientv3.Client, basePath string) *cobra.Command { +func getForceReleaseCmd(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "force-release", Short: "Force release the collections from QueryCoord", @@ -45,12 +47,19 @@ func getForceReleaseCmd(cli *clientv3.Client, basePath string) *cobra.Command { } // getReleaseDroppedCollectionCmd returns command for release-dropped-collection -func getReleaseDroppedCollectionCmd(cli *clientv3.Client, basePath string) *cobra.Command { +func getReleaseDroppedCollectionCmd(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "release-dropped-collection", Short: "Clean loaded collections meta if it's dropped from QueryCoord", Run: func(cmd *cobra.Command, args []string) { - collectionLoadInfos, err := common.ListLoadedCollectionInfoV2_1(cli, basePath) + if etcdversion.GetVersion() != models.LTEVersion2_1 { + fmt.Println("force-release only for Milvus version <= 2.1.4") + return + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // force release only for version <= v2.1.4 + collectionLoadInfos, err := common.ListCollectionLoadedInfo(ctx, cli, basePath, models.LTEVersion2_1) if err != nil { fmt.Println("failed to list loaded collections", err.Error()) return @@ -86,7 +95,7 @@ func getReleaseDroppedCollectionCmd(cli *clientv3.Client, basePath string) *cobr return cmd } -func releaseQueryCoordLoadMeta(cli *clientv3.Client, basePath string, collectionID int64) error { +func releaseQueryCoordLoadMeta(cli clientv3.KV, basePath string, collectionID int64) error { p := path.Join(basePath, common.CollectionLoadPrefix, fmt.Sprintf("%d", collectionID)) ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() diff --git a/states/garbage_collect.go b/states/garbage_collect.go index f1204f0e..d9f342db 100644 --- a/states/garbage_collect.go +++ b/states/garbage_collect.go @@ -19,7 +19,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -func getGarbageCollectCmd(cli *clientv3.Client, basePath string) *cobra.Command { +func getGarbageCollectCmd(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "garbage-collect", Short: "scan oss of milvus instance for garbage(dry-run)", @@ -64,7 +64,7 @@ const ( deltaLogPrefix = `delta_log` ) -func garbageCollect(cli *clientv3.Client, basePath string, minioClient *minio.Client, minioRootPath string, bucketName string) { +func garbageCollect(cli clientv3.KV, basePath string, minioClient *minio.Client, minioRootPath string, bucketName string) { segments, err := common.ListSegments(cli, basePath, func(*datapb.SegmentInfo) bool { return true }) if err != nil { diff --git a/states/inspect_primary_key.go b/states/inspect_primary_key.go index 2c9bc377..0233f956 100644 --- a/states/inspect_primary_key.go +++ b/states/inspect_primary_key.go @@ -14,7 +14,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -func getInspectPKCmd(cli *clientv3.Client, basePath string) *cobra.Command { +func getInspectPKCmd(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "inspect-pk [segment id]", diff --git a/states/instance.go b/states/instance.go index 7ddbb278..7510191b 100644 --- a/states/instance.go +++ b/states/instance.go @@ -2,9 +2,11 @@ package states import ( "fmt" + "os" "path" "github.com/milvus-io/birdwatcher/states/etcd" + "github.com/milvus-io/birdwatcher/states/etcd/audit" "github.com/spf13/cobra" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -13,11 +15,18 @@ import ( type instanceState struct { cmdState instanceName string - client *clientv3.Client + client clientv3.KV + auditFile *os.File etcdState State } +func (s *instanceState) Close() { + if s.auditFile != nil { + s.auditFile.Close() + } +} + // SetupCommands setups the command. // also called after each command run to reset flag values. func (s *instanceState) SetupCommands() { @@ -28,11 +37,16 @@ func (s *instanceState) SetupCommands() { basePath := path.Join(instanceName, metaPath) + showCmd := etcd.ShowCommand(cli, basePath) + showCmd.AddCommand( + CurrentVersionCommand(), + ) + cmd.AddCommand( // download-segment getDownloadSegmentCmd(cli, basePath), // show [subcommand] options... - etcd.ShowCommand(cli, basePath), + showCmd, // repair [subcommand] options... etcd.RepairCommand(cli, basePath), // remove [subcommand] options... @@ -53,6 +67,12 @@ func (s *instanceState) SetupCommands() { // update-log-level log_level_name component serverId getUpdateLogLevelCmd(cli, basePath), + // segment-loaded + GetDistributionCommand(cli, basePath), + + // set current-version + SetCurrentVersionCommand(), + // remove-segment-by-id //removeSegmentByID(cli, basePath), // garbage-collect @@ -75,7 +95,7 @@ func (s *instanceState) SetupCommands() { } // getDryModeCmd enter dry-mode -func getDryModeCmd(cli *clientv3.Client, state *instanceState, etcdState State) *cobra.Command { +func getDryModeCmd(cli clientv3.KV, state *instanceState, etcdState State) *cobra.Command { cmd := &cobra.Command{ Use: "dry-mode", Short: "enter dry mode to select instance", @@ -86,14 +106,24 @@ func getDryModeCmd(cli *clientv3.Client, state *instanceState, etcdState State) return cmd } -func getInstanceState(cli *clientv3.Client, instanceName string, etcdState State) State { +func getInstanceState(cli clientv3.KV, instanceName string, etcdState State) State { + var kv clientv3.KV + file, err := os.OpenFile("audit.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, os.ModeAppend) + if err != nil { + fmt.Println("failed to open audit.log file!") + kv = cli + } else { + kv = audit.NewFileAuditKV(cli, file) + } + // use audit kv state := &instanceState{ cmdState: cmdState{ label: fmt.Sprintf("Milvus(%s)", instanceName), }, instanceName: instanceName, - client: cli, + client: kv, + auditFile: file, etcdState: etcdState, } diff --git a/states/kill.go b/states/kill.go index ef5c6638..3568491d 100644 --- a/states/kill.go +++ b/states/kill.go @@ -16,7 +16,7 @@ import ( // getEtcdKillCmd returns command for kill component session // usage: kill component -func getEtcdKillCmd(cli *clientv3.Client, basePath string) *cobra.Command { +func getEtcdKillCmd(cli clientv3.KV, basePath string) *cobra.Command { component := compAll cmd := &cobra.Command{ @@ -45,7 +45,7 @@ func getEtcdKillCmd(cli *clientv3.Client, basePath string) *cobra.Command { return cmd } -func etcdKillComponent(cli *clientv3.Client, key string, id int64) error { +func etcdKillComponent(cli clientv3.KV, key string, id int64) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() resp, err := cli.Get(ctx, key) diff --git a/states/metrics.go b/states/metrics.go index 41923c51..99ba1dc7 100644 --- a/states/metrics.go +++ b/states/metrics.go @@ -12,7 +12,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -func getFetchMetricsCmd(cli *clientv3.Client, basePath string) *cobra.Command { +func getFetchMetricsCmd(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "fetch-metrics", Short: "fetch metrics from milvus instances", diff --git a/states/start.go b/states/start.go index 81d599b1..b6cc95c8 100644 --- a/states/start.go +++ b/states/start.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/milvus-io/birdwatcher/configs" + "github.com/milvus-io/birdwatcher/models" + etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version" "github.com/spf13/cobra" ) @@ -30,6 +32,8 @@ func Start() State { fmt.Println("[WARN] load config file failed", err.Error()) } + etcdversion.SetVersion(models.GTEVersion2_2) + root.AddCommand( // connect getConnectCommand(state), diff --git a/states/update_log_level.go b/states/update_log_level.go index 7099a587..32f1a728 100644 --- a/states/update_log_level.go +++ b/states/update_log_level.go @@ -19,7 +19,7 @@ import ( // TODO read port from config const httpAPIListenPort = 9091 -func getShowLogLevelCmd(cli *clientv3.Client, basePath string) *cobra.Command { +func getShowLogLevelCmd(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "show-log-level", Short: "show log level of milvus roles", @@ -62,7 +62,7 @@ func GetLogLevel(httpClient *http.Client, session *models.Session) error { return nil } -func getUpdateLogLevelCmd(cli *clientv3.Client, basePath string) *cobra.Command { +func getUpdateLogLevelCmd(cli clientv3.KV, basePath string) *cobra.Command { cmd := &cobra.Command{ Use: "update-log-level log_level [component] [serverId]", Short: "update log level of milvus role ", diff --git a/states/util.go b/states/util.go index c98d4eb3..2db8a732 100644 --- a/states/util.go +++ b/states/util.go @@ -51,7 +51,7 @@ func ParseTS(ts uint64) (time.Time, uint64) { } // listSessions returns all session -func listSessionsByPrefix(cli *clientv3.Client, prefix string) ([]*models.Session, error) { +func listSessionsByPrefix(cli clientv3.KV, prefix string) ([]*models.Session, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix()) diff --git a/states/visit.go b/states/visit.go index 355526ea..b7aace5a 100644 --- a/states/visit.go +++ b/states/visit.go @@ -34,7 +34,7 @@ func getSessionTypes() []string { } } -func getVisitCmd(state State, cli *clientv3.Client, basePath string) *cobra.Command { +func getVisitCmd(state State, cli clientv3.KV, basePath string) *cobra.Command { callCmd := &cobra.Command{ Use: "visit", Short: "enter state that could visit some service of component", @@ -74,7 +74,7 @@ func setNextState(sessionType string, conn *grpc.ClientConn, statePtr *State, se } } -func getSessionConnect(cli *clientv3.Client, basePath string, id int64, sessionType string) (session *models.Session, conn *grpc.ClientConn, err error) { +func getSessionConnect(cli clientv3.KV, basePath string, id int64, sessionType string) (session *models.Session, conn *grpc.ClientConn, err error) { sessions, err := common.ListSessions(cli, basePath) if err != nil { fmt.Println("failed to list session, err:", err.Error()) @@ -101,7 +101,7 @@ func getSessionConnect(cli *clientv3.Client, basePath string, id int64, sessionT return nil, nil, errors.New("invalid id") } -func getVisitSessionCmds(state State, cli *clientv3.Client, basePath string) []*cobra.Command { +func getVisitSessionCmds(state State, cli clientv3.KV, basePath string) []*cobra.Command { sessionCmds := make([]*cobra.Command, 0, len(getSessionTypes())) sessionTypes := getSessionTypes()