Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add audit log and unify v2.1-2.2 proto logic #93

Merged
merged 1 commit into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions models/audit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package models

import "github.com/golang/protobuf/proto"

// AuditHeader stores birdwatcher audit log header info.
type AuditHeader struct {
// Version number for audit format
Version int32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
// OpType enum for operation type
OpType int32 `protobuf:"varint,2,opt,name=op_type,proto3" json:"op_type,omitempty"`
// EntriesNum following entries number
EntriesNum int32 `protobuf:"varint,3,opt,name=entries_num,proto3" json:"entries_num,omitempty"`
}

// Reset implements protoiface.MessageV1
func (v *AuditHeader) Reset() {
*v = AuditHeader{}
}

// String implements protoiface.MessageV1
func (v *AuditHeader) String() string {
return proto.CompactTextString(v)
}

// String implements protoiface.MessageV1
func (v *AuditHeader) ProtoMessage() {}

// AuditOpType operation enum type for audit log.
type AuditOpType int32

const (
// OpDel operation type for delete.
OpDel AuditOpType = 1
// OpPut operation type for put.
OpPut AuditOpType = 2
// OpPutBefore sub header for put before value.
OpPutBefore AuditOpType = 3
// OpPutAfter sub header for put after value.
OpPutAfter AuditOpType = 4
)
7 changes: 7 additions & 0 deletions models/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package models

type Channel struct {
PhysicalName string
VirtualName string
StartPosition *MsgPosition
}
64 changes: 64 additions & 0 deletions models/channel_watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package models

type ChannelWatch struct {
Vchan VChannelInfo
StartTs int64
State ChannelWatchState
TimeoutTs int64

// key
key string
}

func (c *ChannelWatch) Key() string {
return c.key
}

type VChannelInfo struct {
CollectionID int64
ChannelName string
SeekPosition *MsgPosition
UnflushedSegmentIds []int64
FlushedSegmentIds []int64
DroppedSegmentIds []int64
}

type vchannelInfoBase interface {
GetCollectionID() int64
GetChannelName() string
GetUnflushedSegmentIds() []int64
GetFlushedSegmentIds() []int64
GetDroppedSegmentIds() []int64
}

func GetChannelWatchInfo[ChannelWatchBase interface {
GetVchan() vchan
GetStartTs() int64
GetState() watchState
GetTimeoutTs() int64
}, watchState ~int32, vchan interface {
vchannelInfoBase
GetSeekPosition() pos
}, pos msgPosBase](info ChannelWatchBase, key string) *ChannelWatch {
return &ChannelWatch{
Vchan: getVChannelInfo[vchan, pos](info.GetVchan()),
StartTs: info.GetStartTs(),
State: ChannelWatchState(info.GetState()),
TimeoutTs: info.GetTimeoutTs(),
key: key,
}
}

func getVChannelInfo[info interface {
vchannelInfoBase
GetSeekPosition() pos
}, pos msgPosBase](vchan info) VChannelInfo {
return VChannelInfo{
CollectionID: vchan.GetCollectionID(),
ChannelName: vchan.GetChannelName(),
UnflushedSegmentIds: vchan.GetUnflushedSegmentIds(),
FlushedSegmentIds: vchan.GetFlushedSegmentIds(),
DroppedSegmentIds: vchan.GetDroppedSegmentIds(),
SeekPosition: newMsgPosition(vchan.GetSeekPosition()),
}
}
40 changes: 40 additions & 0 deletions models/channel_watch_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package models

type ChannelWatchState int32

const (
ChannelWatchStateUncomplete ChannelWatchState = 0
ChannelWatchStateComplete ChannelWatchState = 1
ChannelWatchStateToWatch ChannelWatchState = 2
ChannelWatchStateWatchSuccess ChannelWatchState = 3
ChannelWatchStateWatchFailure ChannelWatchState = 4
ChannelWatchStateToRelease ChannelWatchState = 5
ChannelWatchStateReleaseSuccess ChannelWatchState = 6
ChannelWatchStateReleaseFailure ChannelWatchState = 7
)

var ChannelWatchStatename = map[int32]string{
0: "Uncomplete",
1: "Complete",
2: "ToWatch",
3: "WatchSuccess",
4: "WatchFailure",
5: "ToRelease",
6: "ReleaseSuccess",
7: "ReleaseFailure",
}

var ChannelWatchStatevalue = map[string]int32{
"Uncomplete": 0,
"Complete": 1,
"ToWatch": 2,
"WatchSuccess": 3,
"WatchFailure": 4,
"ToRelease": 5,
"ReleaseSuccess": 6,
"ReleaseFailure": 7,
}

func (x ChannelWatchState) String() string {
return EnumName(ChannelWatchStatename, int32(x))
}
129 changes: 129 additions & 0 deletions models/collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package models

import (
"sync"

"github.com/milvus-io/birdwatcher/proto/v2.0/commonpb"
"github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb"
"github.com/milvus-io/birdwatcher/proto/v2.0/schemapb"
commonpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/commonpb"
etcdpbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/etcdpb"
schemapbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/schemapb"
"github.com/samber/lo"
)

// Collection model for collection information.
type Collection struct {
ID int64
// TODO partitions
Schema CollectionSchema
CreateTime uint64
Channels []Channel
ShardsNum int32
ConsistencyLevel ConsistencyLevel
State CollectionState
Properties map[string]string

// etcd collection key
key string

// lazy load func
loadOnce sync.Once
lazyLoad func(*Collection)
}

// CollectionHistory collection models with extra history data.
type CollectionHistory struct {
Collection
Ts uint64
Dropped bool
}

// newCollectionFromBase fetchs common base information form proto objects
func newCollectionFromBase[collectionBase interface {
GetID() int64
GetCreateTime() uint64
GetShardsNum() int32
GetPhysicalChannelNames() []string
GetVirtualChannelNames() []string
GetStartPositions() []KD
}, KD interface {
GetKey() string
GetData() []byte
}](info collectionBase) *Collection {
c := &Collection{}

c.ID = info.GetID()
c.CreateTime = info.GetCreateTime()
c.ShardsNum = info.GetShardsNum()
c.Channels = getChannels(info.GetPhysicalChannelNames(), info.GetVirtualChannelNames(), info.GetStartPositions())

return c
}

// NewCollectionFrom2_1 parses etcdpb.CollectionInfo(proto v2.0) to models.Collection.
func NewCollectionFromV2_1(info *etcdpb.CollectionInfo, key string) *Collection {
c := newCollectionFromBase[*etcdpb.CollectionInfo, *commonpb.KeyDataPair](info)
c.key = key
schema := info.GetSchema()
c.Schema = newSchemaFromBase(schema)
c.Schema.Fields = lo.Map(schema.GetFields(), func(fieldSchema *schemapb.FieldSchema, _ int) FieldSchema {
fs := NewFieldSchemaFromBase[*schemapb.FieldSchema, schemapb.DataType](fieldSchema)
fs.Properties = GetMapFromKVPairs(fieldSchema.GetTypeParams())
return fs
})
// hard code created for version <= v2.1.4
c.State = CollectionStateCollectionCreated
c.ConsistencyLevel = ConsistencyLevel(info.GetConsistencyLevel())

return c
}

// NewCollectionFromV2_2 parses etcdpb.CollectionInfo(proto v2.2) to models.Collections.
func NewCollectionFromV2_2(info *etcdpbv2.CollectionInfo, key string, fields []*schemapbv2.FieldSchema) *Collection {
c := newCollectionFromBase[*etcdpbv2.CollectionInfo, *commonpbv2.KeyDataPair](info)
c.key = key
c.State = CollectionState(info.GetState())
schema := info.GetSchema()
schema.Fields = fields
c.Schema = newSchemaFromBase(schema)

c.Schema.Fields = lo.Map(fields, func(fieldSchema *schemapbv2.FieldSchema, _ int) FieldSchema {
fs := NewFieldSchemaFromBase[*schemapbv2.FieldSchema, schemapbv2.DataType](fieldSchema)
fs.Properties = GetMapFromKVPairs(fieldSchema.GetTypeParams())
return fs
})

c.ConsistencyLevel = ConsistencyLevel(info.GetConsistencyLevel())
info.GetStartPositions()

return c
}

func getChannels[cp interface {
GetKey() string
GetData() []byte
}](pcs, vcs []string, cps []cp) []Channel {
return lo.Map(cps, func(c cp, idx int) Channel {
return Channel{
PhysicalName: pcs[idx],
VirtualName: vcs[idx],
StartPosition: &MsgPosition{
ChannelName: c.GetKey(),
MsgID: c.GetData(),
},
}
})
}

// GetMapFromKVPairs parses kv pairs to map[string]string.
func GetMapFromKVPairs[kvPair interface {
GetKey() string
GetValue() string
}](pairs []kvPair) map[string]string {
result := make(map[string]string)
for _, kv := range pairs {
result[kv.GetKey()] = kv.GetValue()
}
return result
}
110 changes: 110 additions & 0 deletions models/collection_loaded.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package models

import (
"github.com/milvus-io/birdwatcher/proto/v2.0/querypb"
querypbv2 "github.com/milvus-io/birdwatcher/proto/v2.2/querypb"
)

// CollectionLoaded models collection loaded information.
type CollectionLoaded struct {
CollectionID int64
/* v1 deprecated
PartitionIDs []int64
PartitionStates []PartitionState
*/
LoadType LoadType
Schema CollectionSchema
ReleasedPartitionIDs []int64
InMemoryPercentage int64
ReplicaIDs []int64
ReplicaNumber int32
// since 2.2
Status LoadStatus
FieldIndexID map[int64]int64

// orignial etcd key
key string
Version string
}

func newCollectionLoadedBase[base interface {
GetCollectionID() int64
GetReplicaNumber() int32
}](info base) *CollectionLoaded {
return &CollectionLoaded{
CollectionID: info.GetCollectionID(),
ReplicaNumber: info.GetReplicaNumber(),
}
}

func NewCollectionLoadedV2_1(info *querypb.CollectionInfo, key string) *CollectionLoaded {
c := newCollectionLoadedBase(info)
c.ReleasedPartitionIDs = info.GetPartitionIDs()

c.LoadType = LoadType(info.GetLoadType())
c.Schema = newSchemaFromBase(info.GetSchema())
c.InMemoryPercentage = info.GetInMemoryPercentage()
c.ReplicaIDs = info.GetReplicaIds()
c.Version = LTEVersion2_1
c.key = key

return c
}

func NewCollectionLoadedV2_2(info *querypbv2.CollectionLoadInfo, key string) *CollectionLoaded {
c := newCollectionLoadedBase(info)
c.ReleasedPartitionIDs = info.GetReleasedPartitions()
c.Status = LoadStatus(info.GetStatus())
c.FieldIndexID = info.GetFieldIndexID()
c.Version = GTEVersion2_2
c.key = key
return c
}

type LoadType int32

const (
LoadTypeUnKnownType LoadType = 0
LoadTypeLoadPartition LoadType = 1
LoadTypeLoadCollection LoadType = 2
)

var LoadTypename = map[int32]string{
0: "UnKnownType",
1: "LoadPartition",
2: "LoadCollection",
}

var LoadTypevalue = map[string]int32{
"UnKnownType": 0,
"LoadPartition": 1,
"LoadCollection": 2,
}

func (x LoadType) String() string {
return EnumName(LoadTypename, int32(x))
}

type LoadStatus int32

const (
LoadStatusInvalid LoadStatus = 0
LoadStatusLoading LoadStatus = 1
LoadStatusLoaded LoadStatus = 2
)

var LoadStatusname = map[int32]string{
0: "Invalid",
1: "Loading",
2: "Loaded",
}

var LoadStatusvalue = map[string]int32{
"Invalid": 0,
"Loading": 1,
"Loaded": 2,
}

func (x LoadStatus) String() string {
return EnumName(LoadStatusname, int32(x))
}
Loading