Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fix option for Verify segment and add more logs #224

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions states/etcd/common/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,39 @@ func RemoveSegmentByID(ctx context.Context, cli kv.MetaKV, basePath string, coll
return err
}

// RemoveSegmentInsertLogPath removes a single insert binlog entry through cli.
//
// The removed key is
// <basePath>/datacoord-meta/insert_log/<collectionID>/<partitionID>/<segmentID>/<fieldID>/<logID>.
// The key is printed before the removal is attempted. If cli.Remove fails,
// the failure is printed and the same error is returned to the caller.
func RemoveSegmentInsertLogPath(ctx context.Context, cli kv.MetaKV, basePath string, collectionID, partitionID, segmentID, fieldID, logID int64) error {
	// Identifier part of the key, from collection down to the individual log.
	suffix := fmt.Sprintf("%d/%d/%d/%d/%d", collectionID, partitionID, segmentID, fieldID, logID)
	key := path.Join(basePath, "datacoord-meta", "insert_log", suffix)

	fmt.Println("remove", key)
	if err := cli.Remove(ctx, key); err != nil {
		fmt.Printf("failed to delete insert binlogs from etcd for segment %d, err: %s\n", segmentID, err.Error())
		return err
	}
	return nil
}

// RemoveSegmentDeltaLogPath removes a single delta log entry through cli.
//
// The removed key is
// <basePath>/datacoord-meta/delta_log/<collectionID>/<partitionID>/<segmentID>/<logID>
// (delta log keys carry no field ID component). The key is printed before the
// removal is attempted. If cli.Remove fails, the failure is printed and the
// same error is returned to the caller.
func RemoveSegmentDeltaLogPath(ctx context.Context, cli kv.MetaKV, basePath string, collectionID, partitionID, segmentID, logID int64) error {
	// Identifier part of the key, from collection down to the individual log.
	suffix := fmt.Sprintf("%d/%d/%d/%d", collectionID, partitionID, segmentID, logID)
	key := path.Join(basePath, "datacoord-meta", "delta_log", suffix)

	fmt.Println("remove", key)
	if err := cli.Remove(ctx, key); err != nil {
		fmt.Printf("failed to delete delta binlogs from etcd for segment %d, err: %s\n", segmentID, err.Error())
		return err
	}
	return nil
}

// RemoveSegmentStatLogPath removes a single stats log entry through cli.
//
// The removed key is
// <basePath>/datacoord-meta/stats_log/<collectionID>/<partitionID>/<segmentID>/<fieldID>/<logID>.
// The key is printed before the removal is attempted. If cli.Remove fails,
// the failure is printed and the same error is returned to the caller.
func RemoveSegmentStatLogPath(ctx context.Context, cli kv.MetaKV, basePath string, collectionID, partitionID, segmentID, fieldID, logID int64) error {
	// Identifier part of the key, from collection down to the individual log.
	suffix := fmt.Sprintf("%d/%d/%d/%d/%d", collectionID, partitionID, segmentID, fieldID, logID)
	key := path.Join(basePath, "datacoord-meta", "stats_log", suffix)

	fmt.Println("remove", key)
	if err := cli.Remove(ctx, key); err != nil {
		fmt.Printf("failed to delete stat logs from etcd for segment %d, err: %s\n", segmentID, err.Error())
		return err
	}
	return nil
}

func UpdateSegments(ctx context.Context, cli kv.MetaKV, basePath string, collectionID int64, fn func(segment *datapbv2.SegmentInfo)) error {

prefix := path.Join(basePath, fmt.Sprintf("%s/%d", SegmentMetaPrefix, collectionID)) + "/"
Expand Down
20 changes: 14 additions & 6 deletions states/kv/kv_audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,33 @@ func (c *FileAuditKV) Save(ctx context.Context, key, value string) error {

// Remove deletes key through the wrapped client and then writes an audit
// record for the deletion: an OpDel header with a count of 1 followed by the
// removed entry. The audit record is only written after the removal call has
// succeeded; on a load or remove error the error is returned as-is.
//
// NOTE(review): this text comes from a rendered diff, so the
// removeWithPrevKV/writeLogKV lines and the Load/Remove/writeKeyValue lines
// look like the old and new variants shown together — confirm which set is
// live before relying on this description.
func (c *FileAuditKV) Remove(ctx context.Context, key string) error {
fmt.Println("audit delete", key)
kv, err := c.cli.removeWithPrevKV(ctx, key)
// Read the current value up front so it is still available for the audit
// record once the key has been removed.
// NOTE(review): Load followed by Remove is two separate calls; whether that
// is acceptable versus a single remove-with-previous-value call is not
// visible here — confirm.
val, err := c.cli.Load(ctx, key)
if err != nil {
return err
}

err = c.cli.Remove(ctx, key)
if err != nil {
return err
}
c.writeHeader(models.OpDel, 1)
c.writeLogKV(kv)
c.writeKeyValue(key, val)
return nil
}

// RemoveWithPrefix deletes entries by prefix through the wrapped client and
// writes an OpDel audit record for the deletion.
//
// NOTE(review): this text comes from a rendered diff, so the
// removeWithPrefixAndPrevKV/for-loop lines and the Load/RemoveWithPrefix
// lines look like the old and new variants shown together (the loop has no
// closing brace of its own) — confirm which set is live.
func (c *FileAuditKV) RemoveWithPrefix(ctx context.Context, key string) error {
fmt.Println("audit delete with prefix", key)
kvs, err := c.cli.removeWithPrefixAndPrevKV(ctx, key)
// NOTE(review): Load is called with the prefix itself as the key; whether it
// returns every entry under the prefix is not visible here — confirm the
// audit record still captures all removed keys.
val, err := c.cli.Load(ctx, key)
if err != nil {
return err
}
// One header carrying the number of removed entries, then one record per entry.
c.writeHeader(models.OpDel, int32(len(kvs)))
for _, kv := range kvs {
c.writeLogKV(kv)

err = c.cli.RemoveWithPrefix(ctx, key)
if err != nil {
return err
}
// NOTE(review): the header count is fixed at 1 here regardless of how many
// keys the prefix removal deleted — verify this is intended.
c.writeHeader(models.OpDel, 1)
c.writeKeyValue(key, val)
return nil
}

Expand Down
124 changes: 99 additions & 25 deletions states/verify_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package states
import (
"context"
"fmt"
"strconv"
"strings"

"github.com/milvus-io/birdwatcher/models"
Expand All @@ -23,7 +24,7 @@ func getVerifySegmentCmd(cli kv.MetaKV, basePath string) *cobra.Command {
fmt.Println(err.Error())
return
}
patch, err := cmd.Flags().GetBool("patch")
fix, err := cmd.Flags().GetBool("fix")
if err != nil {
fmt.Println(err.Error())
return
Expand Down Expand Up @@ -61,30 +62,103 @@ func getVerifySegmentCmd(cli kv.MetaKV, basePath string) *cobra.Command {
_, err := minioClient.StatObject(context.Background(), bucketName, l.LogPath, minio.StatObjectOptions{})
if err != nil {
errResp := minio.ToErrorResponse(err)
if errResp.Code != "NoSuchKey" {
fmt.Println("failed to stat object in minio", err.Error())
continue
}
if !patch {
fmt.Println("file not exists in minio", l.LogPath)
continue
}
// try to patch 01 => 1 bug
if item.tag == "statslog" && strings.HasSuffix(l.LogPath, "/1") {
currentObjectPath := strings.TrimSuffix(l.LogPath, "/1") + "/01"
_, err = minioClient.StatObject(context.Background(), bucketName, currentObjectPath, minio.StatObjectOptions{})
if err != nil {
fmt.Println(currentObjectPath, "also not exists")
continue
fmt.Println("failed to check ", l.LogPath, err, errResp.Code)
if errResp.Code == "NoSuchKey" {
if item.tag == "binlog" {
fmt.Println("path", l.LogPath, fix)
splits := strings.Split(l.LogPath, "/")
logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64)
if err != nil {
fmt.Println("failed to parse logID")
}

fieldID, err := strconv.ParseInt(splits[len(splits)-2], 10, 64)
if err != nil {
fmt.Println("failed to parse fieldID")
}

segmentID, err := strconv.ParseInt(splits[len(splits)-3], 10, 64)
if err != nil {
fmt.Println("failed to parse segmentID")
}

partitionID, err := strconv.ParseInt(splits[len(splits)-4], 10, 64)
if err != nil {
fmt.Println("failed to parse partitionID")
}

collectionID, err := strconv.ParseInt(splits[len(splits)-5], 10, 64)
if err != nil {
fmt.Println("failed to parse collectionID")
}

if fix && err == nil {
err := common.RemoveSegmentInsertLogPath(context.Background(), cli, basePath, collectionID, partitionID, segmentID, fieldID, logID)
if err != nil {
fmt.Println("failed to remove segment insert path")
}
}
} else if item.tag == "statslog" {
splits := strings.Split(l.LogPath, "/")
logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64)
if err != nil {
fmt.Println("failed to parse logID")
}

fieldID, err := strconv.ParseInt(splits[len(splits)-2], 10, 64)
if err != nil {
fmt.Println("failed to parse fieldID")
}

segmentID, err := strconv.ParseInt(splits[len(splits)-3], 10, 64)
if err != nil {
fmt.Println("failed to parse segmentID")
}

partitionID, err := strconv.ParseInt(splits[len(splits)-4], 10, 64)
if err != nil {
fmt.Println("failed to parse parititonID")
}

collectionID, err := strconv.ParseInt(splits[len(splits)-5], 10, 64)
if err != nil {
fmt.Println("failed to parse col id")
}

if fix && err == nil {
err := common.RemoveSegmentStatLogPath(context.Background(), cli, basePath, collectionID, partitionID, segmentID, fieldID, logID)
if err != nil {
fmt.Println("failed to remove segment insert path")
}
}
} else if item.tag == "deltalog" {
splits := strings.Split(l.LogPath, "/")
logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64)
if err != nil {
fmt.Println("failed to parse logID")
}
segmentID, err := strconv.ParseInt(splits[len(splits)-2], 10, 64)
if err != nil {
fmt.Println("failed to parse segmentID")
}

partitionID, err := strconv.ParseInt(splits[len(splits)-3], 10, 64)
if err != nil {
fmt.Println("failed to parse partitionID")
}

collectionID, err := strconv.ParseInt(splits[len(splits)-4], 10, 64)
if err != nil {
fmt.Println("failed to parse col id")
}

if fix && err == nil {
err := common.RemoveSegmentDeltaLogPath(context.Background(), cli, basePath, collectionID, partitionID, segmentID, logID)
if err != nil {
fmt.Println("failed to remove segment insert path")
}
}
}
fmt.Printf("current statslog(%s) for (%s) found, try to copy object", currentObjectPath, l.LogPath)
minioClient.CopyObject(context.Background(), minio.CopyDestOptions{
Bucket: bucketName,
Object: l.LogPath,
}, minio.CopySrcOptions{
Bucket: bucketName,
Object: currentObjectPath,
})
}
}
}
Expand All @@ -98,6 +172,6 @@ func getVerifySegmentCmd(cli kv.MetaKV, basePath string) *cobra.Command {
}

cmd.Flags().Int64("collection", 0, "collection id")
cmd.Flags().Bool("patch", false, "try to patch with known issue logic")
cmd.Flags().Bool("fix", false, "remove the log path to fix no such key")
return cmd
}