Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Sync v1.0.x pr to main branch (part4) #357

Merged
merged 13 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .goreleaser-darwin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ before:
builds:
- env:
- CGO_ENABLED=1
main: ./cmd/birdwatcher
goos:
# - linux
#- windows
Expand Down
1 change: 1 addition & 0 deletions .goreleaser-linux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ before:
builds:
- env:
- CGO_ENABLED=1
main: ./cmd/birdwatcher
goos:
- linux
#- windows
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ all: static-check birdwatcher
birdwatcher:
@echo "Compiling birdwatcher"
@mkdir -p bin
@CGO_ENABLED=0 go build -o bin/birdwatcher main.go
@CGO_ENABLED=0 go build -o bin/birdwatcher cmd/birdwatcher/main.go

birdwatcher_wkafka:
@echo "Compiling birdwatcher with kafka(CGO_ENABLED)"
@mkdir -p bin
@CGO_ENABLED=1 go build -o bin/birdwatcher_wkafka -tags WKAFKA main.go
@CGO_ENABLED=1 go build -o bin/birdwatcher_wkafka -tags WKAFKA cmd/birdwatcher/main.go

getdeps:
@mkdir -p $(INSTALL_PATH)
Expand Down
75 changes: 75 additions & 0 deletions cmd/birdwatcher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"flag"
"fmt"
"log"
"os"
"os/exec"

_ "github.com/milvus-io/birdwatcher/asap"
"github.com/milvus-io/birdwatcher/bapps"
"github.com/milvus-io/birdwatcher/common"
"github.com/milvus-io/birdwatcher/configs"
"github.com/milvus-io/birdwatcher/states"
)

var (
oneLineCommand = flag.String("olc", "", "one line command execution mode")
simple = flag.Bool("simple", false, "use simple ui without suggestion and history")
restServer = flag.Bool("rest", false, "rest server address")
webPort = flag.Int("port", 8002, "listening port for web server")
printVersion = flag.Bool("version", false, "print version")
)

func main() {
flag.Parse()

var appFactory func(config *configs.Config) bapps.BApp

switch {
// Print current birdwatcher version
case *printVersion:
fmt.Println("Birdwatcher Version", common.Version)
return
case *simple:
appFactory = func(*configs.Config) bapps.BApp { return bapps.NewSimpleApp() }
case len(*oneLineCommand) > 0:
appFactory = func(*configs.Config) bapps.BApp { return bapps.NewOlcApp(*oneLineCommand) }
case *restServer:
appFactory = func(config *configs.Config) bapps.BApp { return bapps.NewWebServerApp(*webPort, config) }
default:
defer handleExit()
// open file and create if non-existent
file, err := os.OpenFile("bw_debug.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
log.Fatal(err)
}
defer file.Close()

logger := log.New(file, "Custom Log", log.LstdFlags)

appFactory = func(config *configs.Config) bapps.BApp {
return bapps.NewPromptApp(config, bapps.WithLogger(logger))
}
}

config, err := configs.NewConfig(".bw_config")
if err != nil {
// run by default, just printing warning.
fmt.Println("[WARN] load config file failed, running in default setting", err.Error())
}

start := states.Start(config)

app := appFactory(config)
app.Run(start)
}

// handleExit is the fix for go-prompt output hi-jack fix.
func handleExit() {
rawModeOff := exec.Command("/bin/stty", "-raw", "echo")
rawModeOff.Stdin = os.Stdin
_ = rawModeOff.Run()
rawModeOff.Wait()
}
4 changes: 2 additions & 2 deletions states/backup_mock_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *embedEtcdMockState) SetInstance(instanceName string) {
s.SetLabel(fmt.Sprintf("Backup(%s)", instanceName))
s.instanceName = instanceName
rootPath := path.Join(instanceName, metaPath)
s.ComponentShow = show.NewComponent(s.client, s.config, rootPath)
s.ComponentShow = show.NewComponent(s.client, s.config, instanceName, metaPath)
s.ComponentRemove = remove.NewComponent(s.client, s.config, rootPath)
s.ComponentRepair = repair.NewComponent(s.client, s.config, rootPath)
s.SetupCommands()
Expand Down Expand Up @@ -171,7 +171,7 @@ func getEmbedEtcdInstance(server *embed.Etcd, cli kv.MetaKV, instanceName string

state := &embedEtcdMockState{
CmdState: framework.NewCmdState(fmt.Sprintf("Backup(%s)", instanceName)),
ComponentShow: show.NewComponent(cli, config, basePath),
ComponentShow: show.NewComponent(cli, config, instanceName, metaPath),
ComponentRemove: remove.NewComponent(cli, config, basePath),
instanceName: instanceName,
server: server,
Expand Down
19 changes: 15 additions & 4 deletions states/etcd/common/bulkinsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,25 @@ func ListImportJobs(ctx context.Context, cli kv.MetaKV, basePath string, filters
return nil, nil, err
}

return lo.FilterMap(jobs, func(job datapb.ImportJob, idx int) (*datapb.ImportJob, bool) {
resultJobs := make([]*datapb.ImportJob, 0, len(jobs))
resultKeys := make([]string, 0, len(keys))

filterFn := func(job datapb.ImportJob) bool {
for _, filter := range filters {
if !filter(&job) {
return nil, false
return false
}
}
return &job, true
}), keys, nil
return true
}
for i, job := range jobs {
if ok := filterFn(job); ok {
resultJobs = append(resultJobs, &jobs[i])
resultKeys = append(resultKeys, keys[i])
}
}

return resultJobs, resultKeys, nil
}

// ListPreImportTasks list pre-import tasks.
Expand Down
6 changes: 1 addition & 5 deletions states/etcd/common/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package common
import (
"context"
"path"
"time"

"github.com/milvus-io/birdwatcher/proto/v2.0/etcdpb"
"github.com/milvus-io/birdwatcher/proto/v2.0/indexpb"
Expand All @@ -18,10 +17,7 @@ func ListIndex(ctx context.Context, cli kv.MetaKV, basePath string, filters ...f
}

// ListSegmentIndex list segment index info.
func ListSegmentIndex(cli kv.MetaKV, basePath string, filters ...func(segIdx *etcdpb.SegmentIndexInfo) bool) ([]etcdpb.SegmentIndexInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

func ListSegmentIndex(ctx context.Context, cli kv.MetaKV, basePath string, filters ...func(segIdx *etcdpb.SegmentIndexInfo) bool) ([]etcdpb.SegmentIndexInfo, error) {
prefix := path.Join(basePath, "root-coord/segment-index") + "/"
result, _, err := ListProtoObjects(ctx, cli, prefix, filters...)
return result, err
Expand Down
60 changes: 60 additions & 0 deletions states/etcd/remove/dirty_importing_segment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package remove

import (
"context"
"fmt"

"github.com/samber/lo"

"github.com/milvus-io/birdwatcher/framework"
"github.com/milvus-io/birdwatcher/models"
"github.com/milvus-io/birdwatcher/states/etcd/common"
etcdversion "github.com/milvus-io/birdwatcher/states/etcd/version"
)

type DirtyImportingSegment struct {
framework.ParamBase `use:"remove dirty-importing-segment" desc:"remove dirty importing segments with 0 rows"`
CollectionID int64 `name:"collection" default:"0" desc:"collection id to filter with"`
Ts int64 `name:"ts" default:"0" desc:"only remove segments with ts less than this value"`
Run bool `name:"run" default:"false" desc:"flag to control actually run or dry"`
}

// DirtyImportingSegmentCommand returns command to remove
func (c *ComponentRemove) DirtyImportingSegmentCommand(ctx context.Context, p *DirtyImportingSegment) error {
fmt.Println("start to remove dirty importing segment")
segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(segment *models.Segment) bool {
return (p.CollectionID == 0 || segment.CollectionID == p.CollectionID)
})
if err != nil {
return err
}

groups := lo.GroupBy(segments, func(segment *models.Segment) int64 {
return segment.CollectionID
})

for collectionID, segments := range groups {
for _, segment := range segments {
if segment.State == models.SegmentStateImporting {
segmentTs := segment.GetDmlPosition().GetTimestamp()
if segmentTs == 0 {
segmentTs = segment.GetStartPosition().GetTimestamp()
}
if segment.NumOfRows == 0 && segmentTs < uint64(p.Ts) {
fmt.Printf("collection %d, segment %d is dirty importing with 0 rows, remove it\n", collectionID, segment.ID)
if p.Run {
err := common.RemoveSegmentByID(ctx, c.client, c.basePath, segment.CollectionID, segment.PartitionID, segment.ID)
if err != nil {
fmt.Printf("failed to remove segment %d, err: %s\n", segment.ID, err.Error())
}
}
} else {
fmt.Printf("collection %d, segment %d is dirty importing with %d rows, ts=%d, skip it\n", collectionID, segment.ID, segment.NumOfRows, segmentTs)
}
}
}
}

fmt.Println("finish to remove dirty importing segment")
return nil
}
6 changes: 5 additions & 1 deletion states/etcd/repair/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"path"
"time"

"github.com/golang/protobuf/proto"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -56,8 +57,11 @@ func SegmentCommand(cli kv.MetaKV, basePath string) *cobra.Command {
return
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

// use v1 meta for now
segmentIndexes, err := common.ListSegmentIndex(cli, basePath)
segmentIndexes, err := common.ListSegmentIndex(ctx, cli, basePath)
if err != nil {
fmt.Println(err.Error())
return
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type AliasParam struct {

// AliasCommand implements `show alias` command.
func (c *ComponentShow) AliasCommand(ctx context.Context, p *AliasParam) (*Aliases, error) {
aliases, err := common.ListAliasVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(a *models.Alias) bool {
aliases, err := common.ListAliasVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(a *models.Alias) bool {
return p.DBID == -1 || p.DBID == a.DBID
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions states/etcd/show/bulkinsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type ImportJobParam struct {

// BulkInsertCommand returns show bulkinsert command.
func (c *ComponentShow) BulkInsertCommand(ctx context.Context, p *ImportJobParam) error {
jobs, _, err := common.ListImportJobs(ctx, c.client, c.basePath, func(job *datapb.ImportJob) bool {
jobs, _, err := common.ListImportJobs(ctx, c.client, c.metaPath, func(job *datapb.ImportJob) bool {
return (p.JobID == 0 || job.GetJobID() == p.JobID) &&
(p.CollectionID == 0 || job.GetCollectionID() == p.CollectionID) &&
(p.State == "" || strings.EqualFold(job.GetState().String(), p.State))
Expand All @@ -51,7 +51,7 @@ func (c *ComponentShow) BulkInsertCommand(ctx context.Context, p *ImportJobParam
fmt.Println("Please specify the job ID (-job={JobID}) to show detailed info.")
return nil
}
PrintDetailedImportJob(ctx, c.client, c.basePath, job, p.ShowAllFiles)
PrintDetailedImportJob(ctx, c.client, c.metaPath, job, p.ShowAllFiles)
} else {
PrintSimpleImportJob(job)
}
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/channel_watched.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ChannelWatchedParam struct {

// ChannelWatchedCommand return show channel-watched commands.
func (c *ComponentShow) ChannelWatchedCommand(ctx context.Context, p *ChannelWatchedParam) (*framework.PresetResultSet, error) {
infos, err := common.ListChannelWatch(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool {
infos, err := common.ListChannelWatch(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(channel *models.ChannelWatch) bool {
return (p.CollectionID == 0 || channel.Vchan.CollectionID == p.CollectionID) && (!p.WithoutSchema || channel.Schema == nil)
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions states/etcd/show/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type CheckpointParam struct {

// CheckpointCommand returns show checkpoint command.
func (c *ComponentShow) CheckpointCommand(ctx context.Context, p *CheckpointParam) (*Checkpoints, error) {
coll, err := common.GetCollectionByIDVersion(context.Background(), c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID)
coll, err := common.GetCollectionByIDVersion(context.Background(), c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID)
if err != nil {
return nil, errors.Wrap(err, "failed to get collection")
}
Expand Down Expand Up @@ -93,7 +93,7 @@ func (rs *Checkpoints) PrintAs(format framework.Format) string {
}

func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName string) (*models.MsgPosition, error) {
prefix := path.Join(c.basePath, "datacoord-meta", "channel-cp", channelName)
prefix := path.Join(c.metaPath, "datacoord-meta", "channel-cp", channelName)
results, _, err := common.ListProtoObjects[internalpb.MsgPosition](ctx, c.client, prefix)
if err != nil {
return nil, err
Expand All @@ -108,7 +108,7 @@ func (c *ComponentShow) getChannelCheckpoint(ctx context.Context, channelName st
}

func (c *ComponentShow) getCheckpointFromSegments(ctx context.Context, collID int64, vchannel string) (*models.MsgPosition, int64, error) {
segments, err := common.ListSegmentsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(info *models.Segment) bool {
segments, err := common.ListSegmentsVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(info *models.Segment) bool {
return info.CollectionID == collID && info.InsertChannel == vchannel
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions states/etcd/show/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func (c *ComponentShow) CollectionCommand(ctx context.Context, p *CollectionPara
// perform get by id to accelerate
if p.CollectionID > 0 {
var collection *models.Collection
collection, err = common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID)
collection, err = common.GetCollectionByIDVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID)
if err == nil {
collections = append(collections, collection)
}
} else {
collections, err = common.ListCollectionsVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(coll *models.Collection) bool {
collections, err = common.ListCollectionsVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(coll *models.Collection) bool {
if p.CollectionName != "" && coll.Schema.Name != p.CollectionName {
return false
}
Expand Down
4 changes: 2 additions & 2 deletions states/etcd/show/collection_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *ComponentShow) CollectionHistoryCommand(ctx context.Context, p *Collect
}

// fetch current for now
collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.basePath, etcdversion.GetVersion(), p.CollectionID)
collection, err := common.GetCollectionByIDVersion(ctx, c.client, c.metaPath, etcdversion.GetVersion(), p.CollectionID)
if err != nil {
switch {
case errors.Is(err, common.ErrCollectionDropped):
Expand All @@ -43,7 +43,7 @@ func (c *ComponentShow) CollectionHistoryCommand(ctx context.Context, p *Collect
Collection: collection,
}
// fetch history
items, err := common.ListCollectionHistory(ctx, c.client, c.basePath, etcdversion.GetVersion(), collection.DBID, p.CollectionID)
items, err := common.ListCollectionHistory(ctx, c.client, c.metaPath, etcdversion.GetVersion(), collection.DBID, p.CollectionID)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/collection_loaded.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type CollectionLoadedParam struct {
// CollectionLoadedCommand return show collection-loaded command.
func (c *ComponentShow) CollectionLoadedCommand(ctx context.Context, p *CollectionLoadedParam) (*CollectionsLoaded, error) {
var total int
infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.basePath, etcdversion.GetVersion(), func(info *models.CollectionLoaded) bool {
infos, err := common.ListCollectionLoadedInfo(ctx, c.client, c.metaPath, etcdversion.GetVersion(), func(info *models.CollectionLoaded) bool {
total++
return p.CollectionID == 0 || p.CollectionID == info.CollectionID
})
Expand Down
2 changes: 1 addition & 1 deletion states/etcd/show/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *ComponentShow) CompactionTaskCommand(ctx context.Context, p *Compaction

// perform get by id to accelerate

compactionTasks, err = common.ListCompactionTask(ctx, c.client, c.basePath, func(task *models.CompactionTask) bool {
compactionTasks, err = common.ListCompactionTask(ctx, c.client, c.metaPath, func(task *models.CompactionTask) bool {
total++
if p.CollectionName != "" && task.GetSchema().GetName() != p.CollectionName {
return false
Expand Down
Loading