Skip to content

Commit

Permalink
Support BirdWatcher to insepect delta and stat log path for 2.2 pb (#82)
Browse files Browse the repository at this point in the history
Signed-off-by: xiaofan-luan <[email protected]>
  • Loading branch information
xiaofan-luan authored Dec 30, 2022
1 parent 4f1bbbb commit acb1ab5
Showing 1 changed file with 38 additions and 6 deletions.
44 changes: 38 additions & 6 deletions states/etcd/common/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,62 @@ func FillFieldsIfV2(cli *clientv3.Client, basePath string, segment *datapb.Segme

if len(segment.Deltalogs) == 0 {
prefix := path.Join(basePath, "datacoord-meta", fmt.Sprintf("deltalog/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID))
fields, _, err := ListProtoObjects[datapb.FieldBinlog](context.Background(), cli, prefix)
fields, _, err := ListProtoObjects[datapbv2.FieldBinlog](context.Background(), cli, prefix)
if err != nil {
return err
}

segment.Deltalogs = make([]*datapb.FieldBinlog, 0, len(fields))
for _, field := range fields {
field := field
f := proto.Clone(&field).(*datapb.FieldBinlog)
f := &datapb.FieldBinlog{
FieldID: field.FieldID,
Binlogs: make([]*datapb.Binlog, 0, len(field.Binlogs)),
}

for _, binlog := range field.Binlogs {
l := &datapb.Binlog{
EntriesNum: binlog.EntriesNum,
TimestampFrom: binlog.TimestampFrom,
TimestampTo: binlog.TimestampTo,
LogPath: binlog.LogPath,
LogSize: binlog.LogSize,
}
if l.LogPath == "" {
l.LogPath = fmt.Sprintf("files/delta_log/%d/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID, binlog.LogID)
}
f.Binlogs = append(f.Binlogs, l)
}
segment.Deltalogs = append(segment.Deltalogs, f)
}
}

if len(segment.Statslogs) == 0 {
prefix := path.Join(basePath, "datacoord-meta", fmt.Sprintf("statslog/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID))
fields, _, err := ListProtoObjects[datapb.FieldBinlog](context.Background(), cli, prefix)
fields, _, err := ListProtoObjects[datapbv2.FieldBinlog](context.Background(), cli, prefix)
if err != nil {
return err
}

segment.Statslogs = make([]*datapb.FieldBinlog, 0, len(fields))
for _, field := range fields {
field := field
f := proto.Clone(&field).(*datapb.FieldBinlog)
f := &datapb.FieldBinlog{
FieldID: field.FieldID,
Binlogs: make([]*datapb.Binlog, 0, len(field.Binlogs)),
}

for _, binlog := range field.Binlogs {
l := &datapb.Binlog{
EntriesNum: binlog.EntriesNum,
TimestampFrom: binlog.TimestampFrom,
TimestampTo: binlog.TimestampTo,
LogPath: binlog.LogPath,
LogSize: binlog.LogSize,
}
if l.LogPath == "" {
l.LogPath = fmt.Sprintf("files/statslog/%d/%d/%d/%d", segment.CollectionID, segment.PartitionID, segment.ID, binlog.LogID)
}
f.Binlogs = append(f.Binlogs, l)
}
segment.Statslogs = append(segment.Statslogs, f)
}
}
Expand Down

0 comments on commit acb1ab5

Please sign in to comment.