Skip to content

Commit

Permalink
enhance: manual compaction trigger (milvus-io#269)
Browse files Browse the repository at this point in the history
issue: milvus-io#268

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored and congqixia committed Feb 12, 2025
1 parent 531e8cf commit aa56daf
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
2 changes: 2 additions & 0 deletions states/etcd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func RepairCommand(cli kv.MetaKV, basePath string) *cobra.Command {
repair.IndexMetricCommand(cli, basePath),
repair.DiskAnnIndexParamsCommand(cli, basePath),
repair.AddIndexParamsCommand(cli, basePath),
// repair manual compaction
repair.ManualCompactionCommand(cli, basePath),
)

return repairCmd
Expand Down
73 changes: 73 additions & 0 deletions states/etcd/repair/manual_compaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package repair

import (
"context"
"fmt"
"time"

"github.com/milvus-io/birdwatcher/proto/v2.0/commonpb"
"github.com/milvus-io/birdwatcher/proto/v2.0/datapb"
"github.com/milvus-io/birdwatcher/proto/v2.0/milvuspb"
"github.com/milvus-io/birdwatcher/states/etcd/common"
"github.com/milvus-io/birdwatcher/states/kv"
"github.com/spf13/cobra"
"google.golang.org/grpc"
)

func ManualCompactionCommand(cli kv.MetaKV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "manual-compaction",
Short: "do manual compaction",
Run: func(cmd *cobra.Command, args []string) {
collID, err := cmd.Flags().GetInt64("collection")
if err != nil {
fmt.Println(err.Error())
return
}
if collID == 0 {
fmt.Printf("collection id should not be zero\n")
return
}
doManualCompaction(cli, basePath, collID)
},
}
cmd.Flags().Int64("collection", 0, "collection id")
return cmd
}

func doManualCompaction(cli kv.MetaKV, basePath string, collID int64) {
sessions, err := common.ListSessions(cli, basePath)
if err != nil {
fmt.Println("failed to list session")
return
}
for _, session := range sessions {
if session.ServerName == "datacoord" {
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithTimeout(2 * time.Second),
}

conn, err := grpc.DialContext(context.Background(), session.Address, opts...)
if err != nil {
fmt.Printf("failed to connect to DataCoord(%d) addr: %s, err: %s\n", session.ServerID, session.Address, err.Error())
return
}

client := datapb.NewDataCoordClient(conn)
result, err := client.ManualCompaction(context.Background(), &milvuspb.ManualCompactionRequest{
CollectionID: collID,
})
if err != nil {
fmt.Println("failed to call ManualCompaction", err.Error())
return
}
if result.Status.ErrorCode != commonpb.ErrorCode_Success {
fmt.Printf("ManualCompaction failed, error code = %s, reason = %s\n", result.Status.ErrorCode.String(), result.Status.Reason)
return
}
fmt.Printf("ManualCompaction trigger success, id: %d\n", result.CompactionID)
}
}
}

0 comments on commit aa56daf

Please sign in to comment.