Skip to content

Commit

Permalink
Add audit log and unify v2.1-2.2 proto logic
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Feb 20, 2023
1 parent 61b039c commit ead8f27
Show file tree
Hide file tree
Showing 65 changed files with 1,733 additions and 307 deletions.
40 changes: 40 additions & 0 deletions models/audit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package models

import "github.com/golang/protobuf/proto"

// AuditHeader stores birdwatcher audit log header info.
type AuditHeader struct {
// Version number for audit format
Version int32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
// OpType enum for operation type
OpType int32 `protobuf:"varint,2,opt,name=op_type,proto3" json:"op_type,omitempty"`
// EntriesNum following entries number
EntriesNum int32 `protobuf:"varint,3,opt,name=entries_num,proto3" json:"entries_num,omitempty"`
}

// Reset implements protoiface.MessageV1
func (v *AuditHeader) Reset() {
*v = AuditHeader{}
}

// String implements protoiface.MessageV1
func (v *AuditHeader) String() string {
return proto.CompactTextString(v)
}

// String implements protoiface.MessageV1
func (v *AuditHeader) ProtoMessage() {}

// AuditOpType operation enum type for audit log.
type AuditOpType int32

const (
// OpDel operation type for delete.
OpDel AuditOpType = 1
// OpPut operation type for put.
OpPut AuditOpType = 2
// OpPutBefore sub header for put before value.
OpPutBefore AuditOpType = 3
// OpPutAfter sub header for put after value.
OpPutAfter AuditOpType = 4
)
7 changes: 7 additions & 0 deletions models/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package models

type Channel struct {
PhysicalName string
VirtualName string
StartPosition *MsgPosition
}
64 changes: 64 additions & 0 deletions models/channel_watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package models

type ChannelWatch struct {
Vchan VChannelInfo
StartTs int64
State ChannelWatchState
TimeoutTs int64

// key
key string
}

func (c *ChannelWatch) Key() string {
return c.key
}

type VChannelInfo struct {
CollectionID int64
ChannelName string
SeekPosition *MsgPosition
UnflushedSegmentIds []int64
FlushedSegmentIds []int64
DroppedSegmentIds []int64
}

type vchannelInfoBase interface {
GetCollectionID() int64
GetChannelName() string
GetUnflushedSegmentIds() []int64
GetFlushedSegmentIds() []int64
GetDroppedSegmentIds() []int64
}

func GetChannelWatchInfo[ChannelWatchBase interface {
GetVchan() vchan
GetStartTs() int64
GetState() watchState
GetTimeoutTs() int64
}, watchState ~int32, vchan interface {
vchannelInfoBase
GetSeekPosition() pos
}, pos msgPosBase](info ChannelWatchBase, key string) *ChannelWatch {
return &ChannelWatch{
Vchan: getVChannelInfo[vchan, pos](info.GetVchan()),
StartTs: info.GetStartTs(),
State: ChannelWatchState(info.GetState()),
TimeoutTs: info.GetTimeoutTs(),
key: key,
}
}

func getVChannelInfo[info interface {
vchannelInfoBase
GetSeekPosition() pos
}, pos msgPosBase](vchan info) VChannelInfo {
return VChannelInfo{
CollectionID: vchan.GetCollectionID(),
ChannelName: vchan.GetChannelName(),
UnflushedSegmentIds: vchan.GetUnflushedSegmentIds(),
FlushedSegmentIds: vchan.GetFlushedSegmentIds(),
DroppedSegmentIds: vchan.GetDroppedSegmentIds(),
SeekPosition: newMsgPosition(vchan.GetSeekPosition()),
}
}
40 changes: 40 additions & 0 deletions models/channel_watch_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package models

type ChannelWatchState int32

const (
ChannelWatchState_Uncomplete ChannelWatchState = 0
ChannelWatchState_Complete ChannelWatchState = 1
ChannelWatchState_ToWatch ChannelWatchState = 2
ChannelWatchState_WatchSuccess ChannelWatchState = 3
ChannelWatchState_WatchFailure ChannelWatchState = 4
ChannelWatchState_ToRelease ChannelWatchState = 5
ChannelWatchState_ReleaseSuccess ChannelWatchState = 6
ChannelWatchState_ReleaseFailure ChannelWatchState = 7
)

var ChannelWatchState_name = map[int32]string{
0: "Uncomplete",
1: "Complete",
2: "ToWatch",
3: "WatchSuccess",
4: "WatchFailure",
5: "ToRelease",
6: "ReleaseSuccess",
7: "ReleaseFailure",
}

var ChannelWatchState_value = map[string]int32{
"Uncomplete": 0,
"Complete": 1,
"ToWatch": 2,
"WatchSuccess": 3,
"WatchFailure": 4,
"ToRelease": 5,
"ReleaseSuccess": 6,
"ReleaseFailure": 7,
}

func (x ChannelWatchState) String() string {
return EnumName(ChannelWatchState_name, int32(x))
}
129 changes: 129 additions & 0 deletions models/collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package models

import (
"sync"

"github.com/milvus-io/birdwatcher/proto/v2.0/commonpb"
"github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb"
"github.com/milvus-io/birdwatcher/proto/v2.0/schemapb"
commonpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/commonpb"
etcdpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/etcdpb"
schemapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/schemapb"
"github.com/samber/lo"
)

// Collection model for collection information.
type Collection struct {
ID int64
// TODO partitions
Schema CollectionSchema
CreateTime uint64
Channels []Channel
ShardsNum int32
ConsistencyLevel ConsistencyLevel
State CollectionState
Properties map[string]string

// etcd collection key
key string

// lazy load func
loadOnce sync.Once
lazyLoad func(*Collection)
}

// CollectionHistory collection models with extra history data.
type CollectionHistory struct {
Collection
Ts uint64
Dropped bool
}

// newCollectionFromBase fetchs common base information form proto objects
func newCollectionFromBase[collectionBase interface {
GetID() int64
GetCreateTime() uint64
GetShardsNum() int32
GetPhysicalChannelNames() []string
GetVirtualChannelNames() []string
GetStartPositions() []KD
}, KD interface {
GetKey() string
GetData() []byte
}](info collectionBase) *Collection {
c := &Collection{}

c.ID = info.GetID()
c.CreateTime = info.GetCreateTime()
c.ShardsNum = info.GetShardsNum()
c.Channels = getChannels(info.GetPhysicalChannelNames(), info.GetVirtualChannelNames(), info.GetStartPositions())

return c
}

// NewCollectionFrom2_1 parses etcdpb.CollectionInfo(proto v2.0) to models.Collection.
func NewCollectionFromV2_1(info *etcdpb.CollectionInfo, key string) *Collection {
c := newCollectionFromBase[*etcdpb.CollectionInfo, *commonpb.KeyDataPair](info)
c.key = key
schema := info.GetSchema()
c.Schema = newSchemaFromBase(schema)
c.Schema.Fields = lo.Map(schema.GetFields(), func(fieldSchema *schemapb.FieldSchema, _ int) FieldSchema {
fs := NewFieldSchemaFromBase[*schemapb.FieldSchema, schemapb.DataType](fieldSchema)
fs.Properties = GetMapFromKVPairs(fieldSchema.GetTypeParams())
return fs
})
// hard code created for version <= v2.1.4
c.State = CollectionState_CollectionCreated
c.ConsistencyLevel = ConsistencyLevel(info.GetConsistencyLevel())

return c
}

// NewCollectionFromV2_2 parses etcdpb.CollectionInfo(proto v2.2) to models.Collections.
func NewCollectionFromV2_2(info *etcdpbv2.CollectionInfo, key string, fields []*schemapbv2.FieldSchema) *Collection {
c := newCollectionFromBase[*etcdpbv2.CollectionInfo, *commonpbv2.KeyDataPair](info)
c.key = key
c.State = CollectionState(info.GetState())
schema := info.GetSchema()
schema.Fields = fields
c.Schema = newSchemaFromBase(schema)

c.Schema.Fields = lo.Map(fields, func(fieldSchema *schemapbv2.FieldSchema, _ int) FieldSchema {
fs := NewFieldSchemaFromBase[*schemapbv2.FieldSchema, schemapbv2.DataType](fieldSchema)
fs.Properties = GetMapFromKVPairs(fieldSchema.GetTypeParams())
return fs
})

c.ConsistencyLevel = ConsistencyLevel(info.GetConsistencyLevel())
info.GetStartPositions()

return c
}

func getChannels[cp interface {
GetKey() string
GetData() []byte
}](pcs, vcs []string, cps []cp) []Channel {
return lo.Map(cps, func(c cp, idx int) Channel {
return Channel{
PhysicalName: pcs[idx],
VirtualName: vcs[idx],
StartPosition: &MsgPosition{
ChannelName: c.GetKey(),
MsgID: c.GetData(),
},
}
})
}

// GetMapFromKVPairs parses kv pairs to map[string]string.
func GetMapFromKVPairs[kvPair interface {
GetKey() string
GetValue() string
}](pairs []kvPair) map[string]string {
result := make(map[string]string)
for _, kv := range pairs {
result[kv.GetKey()] = kv.GetValue()
}
return result
}
110 changes: 110 additions & 0 deletions models/collection_loaded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package models

import (
"github.com/milvus-io/birdwatcher/proto/v2.0/querypb"
querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb"
)

// CollectionLoaded models collection loaded information.
type CollectionLoaded struct {
CollectionID int64
/* v1 deprecated
PartitionIDs []int64
PartitionStates []PartitionState
*/
LoadType LoadType
Schema CollectionSchema
ReleasedPartitionIDs []int64
InMemoryPercentage int64
ReplicaIDs []int64
ReplicaNumber int32
// since 2.2
Status LoadStatus
FieldIndexID map[int64]int64

// orignial etcd key
key string
Version string
}

func newCollectionLoadedBase[base interface {
GetCollectionID() int64
GetReplicaNumber() int32
}](info base) *CollectionLoaded {
return &CollectionLoaded{
CollectionID: info.GetCollectionID(),
ReplicaNumber: info.GetReplicaNumber(),
}
}

func NewCollectionLoadedV2_1(info *querypb.CollectionInfo, key string) *CollectionLoaded {
c := newCollectionLoadedBase(info)
c.ReleasedPartitionIDs = info.GetPartitionIDs()

c.LoadType = LoadType(info.GetLoadType())
c.Schema = newSchemaFromBase(info.GetSchema())
c.InMemoryPercentage = info.GetInMemoryPercentage()
c.ReplicaIDs = info.GetReplicaIds()
c.Version = LTEVersion2_1
c.key = key

return c
}

func NewCollectionLoadedV2_2(info *querypbv2.CollectionLoadInfo, key string) *CollectionLoaded {
c := newCollectionLoadedBase(info)
c.ReleasedPartitionIDs = info.GetReleasedPartitions()
c.Status = LoadStatus(info.GetStatus())
c.FieldIndexID = info.GetFieldIndexID()
c.Version = GTEVersion2_2
c.key = key
return c
}

type LoadType int32

const (
LoadType_UnKnownType LoadType = 0
LoadType_LoadPartition LoadType = 1
LoadType_LoadCollection LoadType = 2
)

var LoadType_name = map[int32]string{
0: "UnKnownType",
1: "LoadPartition",
2: "LoadCollection",
}

var LoadType_value = map[string]int32{
"UnKnownType": 0,
"LoadPartition": 1,
"LoadCollection": 2,
}

func (x LoadType) String() string {
return EnumName(LoadType_name, int32(x))
}

type LoadStatus int32

const (
LoadStatus_Invalid LoadStatus = 0
LoadStatus_Loading LoadStatus = 1
LoadStatus_Loaded LoadStatus = 2
)

var LoadStatus_name = map[int32]string{
0: "Invalid",
1: "Loading",
2: "Loaded",
}

var LoadStatus_value = map[string]int32{
"Invalid": 0,
"Loading": 1,
"Loaded": 2,
}

func (x LoadStatus) String() string {
return EnumName(LoadStatus_name, int32(x))
}
Loading

0 comments on commit ead8f27

Please sign in to comment.