diff --git a/cmd/zoekt-sourcegraph-indexserver/debug.go b/cmd/zoekt-sourcegraph-indexserver/debug.go new file mode 100644 index 000000000..e290f4ed5 --- /dev/null +++ b/cmd/zoekt-sourcegraph-indexserver/debug.go @@ -0,0 +1,196 @@ +// This file contains commands which run in a non daemon mode for testing/debugging. + +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "strconv" + + "github.com/peterbourgon/ff/v3/ffcli" + + "github.com/google/zoekt/build" +) + +func debugFind() *ffcli.Command { + fs := flag.NewFlagSet("debug find", flag.ExitOnError) + debugFindIndex := fs.String("index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") + + return &ffcli.Command{ + Name: "find", + ShortUsage: "find [flags] ", + ShortHelp: "find a shard by repo name", + FlagSet: fs, + Exec: func(ctx context.Context, args []string) error { + if len(args) == 0 { + return fmt.Errorf("missing repository name") + } + ia := indexArgs{ + IndexOptions: IndexOptions{ + Name: args[0], + }, + IndexDir: *debugFindIndex, + } + bo := ia.BuildOptions() + for _, s := range bo.FindAllShards() { + fmt.Println(s) + } + return nil + }, + } +} + +func debugIndex() *ffcli.Command { + fs := flag.NewFlagSet("debug index", flag.ExitOnError) + conf := rootConfig{} + conf.registerRootFlags(fs) + + return &ffcli.Command{ + Name: "index", + ShortUsage: "index [flags] ", + ShortHelp: "index a repository", + FlagSet: fs, + Exec: func(ctx context.Context, args []string) error { + if len(args) == 0 { + return fmt.Errorf("missing repository ID") + } + s, err := newServer(conf) + if err != nil { + return err + } + id, err := strconv.Atoi(args[0]) + if err != nil { + return err + } + msg, err := s.forceIndex(uint32(id)) + log.Println(msg) + if err != nil { + return err + } + return nil + }, + } +} + +func debugTrigrams() *ffcli.Command { + return &ffcli.Command{ + Name: "trigrams", + ShortUsage: "trigrams ", + ShortHelp: "list all the trigrams in a shard", + Exec: func(ctx context.Context, args []string) error { + if len(args) == 0 { + return fmt.Errorf("missing path to shard") + } + return printShardStats(args[0]) + }, + } +} + +func debugMeta() *ffcli.Command { + return &ffcli.Command{ + Name: "meta", + ShortUsage: "meta ", + ShortHelp: "output index and repo metadata", + Exec: func(ctx context.Context, args []string) error { + if len(args) == 0 { + return fmt.Errorf("missing path to shard") + } + return printMetaData(args[0]) + }, + } +} + +func debugMerge() *ffcli.Command { + fs := flag.NewFlagSet("debug merge", flag.ExitOnError) + simulate := fs.Bool("simulate", false, "if set, merging will be simulated") + targetSize := fs.Int64("merge_target_size", getEnvWithDefaultInt64("SRC_TARGET_SIZE", 2000), "the target size of compound shards in MiB") + index := fs.String("index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") + dbg := fs.Bool("debug", srcLogLevelIsDebug(), "turn on more verbose logging.") + + return &ffcli.Command{ + Name: "merge", + FlagSet: fs, + ShortUsage: "merge [flags] ", + ShortHelp: "run a full merge operation inside dir", + Exec: func(ctx context.Context, args []string) error { + if *dbg { + debug = log.New(os.Stderr, "", log.LstdFlags) + } + return doMerge(*index, *targetSize*1024*1024, *simulate) + }, + } +} + +func debugList() *ffcli.Command { + fs := flag.NewFlagSet("debug list", flag.ExitOnError) + conf := rootConfig{} + conf.registerRootFlags(fs) + + return &ffcli.Command{ + Name: "list", + ShortUsage: "list [flags]", + ShortHelp: "list the repositories that are OWNED by this indexserver", + FlagSet: fs, + Exec: func(ctx context.Context, args []string) error { + s, err := newServer(conf) + if err != nil { + return err + } + repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) + if err != nil { + return err + } + for _, r := range repos.IDs { + fmt.Println(r) + } + return nil + }, + } +} + +func debugListIndexed() *ffcli.Command { + fs := flag.NewFlagSet("debug list-indexed", flag.ExitOnError) + conf := rootConfig{} + conf.registerRootFlags(fs) + + return &ffcli.Command{ + Name: "list-indexed", + ShortUsage: "list-indexed [flags]", + ShortHelp: "list the repositories that are INDEXED by this indexserver", + FlagSet: fs, + Exec: func(ctx context.Context, args []string) error { + s, err := newServer(conf) + if err != nil { + return err + } + indexed := listIndexed(s.IndexDir) + for _, r := range indexed { + fmt.Println(r) + } + return nil + }, + } +} + +func debugCmd() *ffcli.Command { + fs := flag.NewFlagSet("debug", flag.ExitOnError) + + return &ffcli.Command{ + Name: "debug", + ShortUsage: "debug ", + ShortHelp: "a set of commands for debugging and testing", + FlagSet: fs, + Subcommands: []*ffcli.Command{ + debugFind(), + debugIndex(), + debugList(), + debugListIndexed(), + debugMerge(), + debugMeta(), + debugTrigrams(), + }, + } +} diff --git a/cmd/zoekt-sourcegraph-indexserver/main.go b/cmd/zoekt-sourcegraph-indexserver/main.go index 281167517..58443eb75 100644 --- a/cmd/zoekt-sourcegraph-indexserver/main.go +++ b/cmd/zoekt-sourcegraph-indexserver/main.go @@ -29,16 +29,17 @@ import ( "time" "cloud.google.com/go/profiler" - "github.com/google/zoekt" - "github.com/google/zoekt/debugserver" "github.com/hashicorp/go-retryablehttp" + "github.com/keegancsmith/tmpfriend" + "github.com/peterbourgon/ff/v3/ffcli" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/automaxprocs/maxprocs" "golang.org/x/net/trace" + "github.com/google/zoekt" "github.com/google/zoekt/build" - "github.com/keegancsmith/tmpfriend" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/google/zoekt/debugserver" ) var ( @@ -634,7 +635,7 @@ func hostnameBestEffort() string { // // If main is true we will delete older temp directories left around. main is // false when this is a debug command. -func setupTmpDir(index string, main bool) error { +func setupTmpDir(index string) error { tmpRoot := filepath.Join(index, ".indexserver.tmp") if err := os.MkdirAll(tmpRoot, 0755); err != nil { return err @@ -725,6 +726,14 @@ func getEnvWithDefaultInt(k string, defaultVal int) int { return i } +func getEnvWithDefaultString(k string, defaultVal string) string { + v := os.Getenv(k) + if v == "" { + return defaultVal + } + return v +} + func setCompoundShardCounter(indexDir string) { fns, err := filepath.Glob(filepath.Join(indexDir, "compound-*.zoekt")) if err != nil { @@ -734,52 +743,89 @@ func setCompoundShardCounter(indexDir string) { metricNumberCompoundShards.Set(float64(len(fns))) } -func main() { - defaultIndexDir := os.Getenv("DATA_DIR") - if defaultIndexDir == "" { - defaultIndexDir = build.DefaultDir - } - - root := flag.String("sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") - interval := flag.Duration("interval", time.Minute, "sync with sourcegraph this often") - vacuumInterval := flag.Duration("vacuum_interval", 24*time.Hour, "run vacuum this often") - mergeInterval := flag.Duration("merge_interval", time.Hour, "run merge this often") - targetSize := flag.Int64("merge_target_size", getEnvWithDefaultInt64("SRC_TARGET_SIZE", 2000), "the target size of compound shards in MiB") - minSize := flag.Int64("merge_min_size", getEnvWithDefaultInt64("SRC_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") - index := flag.String("index", defaultIndexDir, "set index directory to use") - listen := flag.String("listen", ":6072", "listen on this address.") - hostname := flag.String("hostname", hostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") - cpuFraction := flag.Float64("cpu_fraction", 1.0, "use this fraction of the cores for indexing.") - dbg := flag.Bool("debug", srcLogLevelIsDebug(), "turn on more verbose logging.") - blockProfileRate := flag.Int("block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler (default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") - - // non daemon mode for debugging/testing - debugFind := flag.String("debug-find", "", "find a shard by repo name.") - debugList := flag.Bool("debug-list", false, "do not start the indexserver, rather list the repositories owned by this indexserver then quit.") - debugListIndexed := flag.Bool("debug-list-indexed", false, "do not start the indexserver, rather list the repositories indexed by this indexserver then quit.") - debugIndex := flag.String("debug-index", "", "do not start the indexserver, rather index the repository ID then quit.") - debugShard := flag.String("debug-shard", "", "do not start the indexserver, rather print shard stats then quit.") - debugMeta := flag.String("debug-meta", "", "do not start the indexserver, rather print shard metadata then quit.") - debugMerge := flag.Bool("debug-merge", false, "do not start the indexserver, rather run merge in the index directory then quit.") - debugMergeSimulate := flag.Bool("simulate", false, "use in conjuction with debugMerge. If set, merging is simulated.") - - _ = flag.Bool("exp-git-index", true, "DEPRECATED: not read anymore. We always use zoekt-git-index now.") - - flag.Parse() - - if *cpuFraction <= 0.0 || *cpuFraction > 1.0 { - log.Fatal("cpu_fraction must be between 0.0 and 1.0") - } - if *index == "" { - log.Fatal("must set -index") - } - needSourcegraph := !(*debugShard != "" || *debugMeta != "" || *debugMerge) - if *root == "" && needSourcegraph { - log.Fatal("must set -sourcegraph_url") - } - rootURL, err := url.Parse(*root) +func rootCmd() *ffcli.Command { + rootFs := flag.NewFlagSet("rootFs", flag.ExitOnError) + conf := rootConfig{} + conf.registerRootFlags(rootFs) + + return &ffcli.Command{ + FlagSet: rootFs, + ShortUsage: "zoekt-sourcegraph-indexserver [flags] []", + Subcommands: []*ffcli.Command{debugCmd()}, + Exec: func(ctx context.Context, args []string) error { + return startServer(conf) + }, + } +} + +type rootConfig struct { + root string + interval time.Duration + index string + listen string + hostname string + cpuFraction float64 + dbg bool + blockProfileRate int + + // config values related to shard merging + vacuumInterval time.Duration + mergeInterval time.Duration + targetSize int64 + minSize int64 +} + +func (rc *rootConfig) registerRootFlags(fs *flag.FlagSet) { + fs.StringVar(&rc.root, "sourcegraph_url", os.Getenv("SRC_FRONTEND_INTERNAL"), "http://sourcegraph-frontend-internal or http://localhost:3090. If a path to a directory, we fake the Sourcegraph API and index all repos rooted under path.") + fs.DurationVar(&rc.interval, "interval", time.Minute, "sync with sourcegraph this often") + fs.DurationVar(&rc.vacuumInterval, "vacuum_interval", 24*time.Hour, "run vacuum this often") + fs.DurationVar(&rc.mergeInterval, "merge_interval", time.Hour, "run merge this often") + fs.Int64Var(&rc.targetSize, "merge_target_size", getEnvWithDefaultInt64("SRC_TARGET_SIZE", 2000), "the target size of compound shards in MiB") + fs.Int64Var(&rc.minSize, "merge_min_size", getEnvWithDefaultInt64("SRC_MIN_SIZE", 1800), "the minimum size of a compound shard in MiB") + fs.StringVar(&rc.index, "index", getEnvWithDefaultString("DATA_DIR", build.DefaultDir), "set index directory to use") + fs.StringVar(&rc.listen, "listen", ":6072", "listen on this address.") + fs.StringVar(&rc.hostname, "hostname", hostnameBestEffort(), "the name we advertise to Sourcegraph when asking for the list of repositories to index. Can also be set via the NODE_NAME environment variable.") + fs.Float64Var(&rc.cpuFraction, "cpu_fraction", 1.0, "use this fraction of the cores for indexing.") + fs.BoolVar(&rc.dbg, "debug", srcLogLevelIsDebug(), "turn on more verbose logging.") + fs.IntVar(&rc.blockProfileRate, "block_profile_rate", getEnvWithDefaultInt("BLOCK_PROFILE_RATE", -1), "Sampling rate of Go's block profiler in nanoseconds. Values <=0 disable the blocking profiler Var(default). A value of 1 includes every blocking event. See https://pkg.go.dev/runtime#SetBlockProfileRate") +} + +func startServer(conf rootConfig) error { + s, err := newServer(conf) + if err != nil { + return err + } + + initializeGoogleCloudProfiler() + setCompoundShardCounter(s.IndexDir) + + if conf.listen != "" { + go func() { + mux := http.NewServeMux() + debugserver.AddHandlers(mux, true) + mux.Handle("/", s) + debug.Printf("serving HTTP on %s", conf.listen) + log.Fatal(http.ListenAndServe(conf.listen, mux)) + }() + } + + s.Run() + return nil +} + +func newServer(conf rootConfig) (*Server, error) { + if conf.cpuFraction <= 0.0 || conf.cpuFraction > 1.0 { + return nil, fmt.Errorf("cpu_fraction must be between 0.0 and 1.0") + } + if conf.index == "" { + return nil, fmt.Errorf("must set -index") + } + if conf.root == "" { + return nil, fmt.Errorf("must set -sourcegraph_url") + } + rootURL, err := url.Parse(conf.root) if err != nil { - log.Fatalf("url.Parse(%v): %v", *root, err) + return nil, fmt.Errorf("url.Parse(%v): %v", conf.root, err) } // Tune GOMAXPROCS to match Linux container CPU quota. @@ -787,9 +833,7 @@ func main() { // Set the sampling rate of Go's block profiler: https://github.com/DataDog/go-profiler-notes/blob/main/guide/README.md#block-profiler. // The block profiler is disabled by default. - if blockProfileRate != nil { - runtime.SetBlockProfileRate(*blockProfileRate) - } + runtime.SetBlockProfileRate(conf.blockProfileRate) // Automatically prepend our own path at the front, to minimize // required configuration. @@ -797,19 +841,17 @@ func main() { os.Setenv("PATH", filepath.Dir(l)+":"+os.Getenv("PATH")) } - if _, err := os.Stat(*index); err != nil { - if err := os.MkdirAll(*index, 0755); err != nil { - log.Fatalf("MkdirAll %s: %v", *index, err) + if _, err := os.Stat(conf.index); err != nil { + if err := os.MkdirAll(conf.index, 0755); err != nil { + return nil, fmt.Errorf("MkdirAll %s: %v", conf.index, err) } } - isDebugCmd := *debugList || *debugIndex != "" || *debugShard != "" || *debugMeta != "" || *debugMerge || *debugFind != "" || *debugListIndexed - - if err := setupTmpDir(*index, !isDebugCmd); err != nil { - log.Fatalf("failed to setup TMPDIR under %s: %v", *index, err) + if err := setupTmpDir(conf.index); err != nil { + return nil, fmt.Errorf("failed to setup TMPDIR under %s: %v", conf.index, err) } - if *dbg || isDebugCmd { + if conf.dbg { debug = log.New(os.Stderr, "", log.LstdFlags) } @@ -837,7 +879,7 @@ func main() { if v := os.Getenv("SRC_REPO_CONFIG_BATCH_SIZE"); v != "" { batchSize, err = strconv.Atoi(v) if err != nil { - log.Fatal("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") + return nil, fmt.Errorf("Invalid value for SRC_REPO_CONFIG_BATCH_SIZE, must be int") } } @@ -846,7 +888,7 @@ func main() { sg = &sourcegraphClient{ Root: rootURL, Client: client, - Hostname: *hostname, + Hostname: conf.hostname, BatchSize: batchSize, } } else { @@ -856,104 +898,25 @@ func main() { } } - cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (*cpuFraction))) + cpuCount := int(math.Round(float64(runtime.GOMAXPROCS(0)) * (conf.cpuFraction))) if cpuCount < 1 { cpuCount = 1 } - s := &Server{ + return &Server{ Sourcegraph: sg, - IndexDir: *index, - Interval: *interval, - VacuumInterval: *vacuumInterval, - MergeInterval: *mergeInterval, + IndexDir: conf.index, + Interval: conf.interval, + VacuumInterval: conf.vacuumInterval, + MergeInterval: conf.mergeInterval, CPUCount: cpuCount, - TargetSizeBytes: *targetSize * 1024 * 1024, - minSizeBytes: *minSize * 1024 * 1024, + TargetSizeBytes: conf.targetSize * 1024 * 1024, + minSizeBytes: conf.minSize * 1024 * 1024, shardMerging: zoekt.ShardMergingEnabled(), - } - - if *debugList { - repos, err := s.Sourcegraph.List(context.Background(), listIndexed(s.IndexDir)) - if err != nil { - log.Fatal(err) - } - for _, r := range repos.IDs { - fmt.Println(r) - } - os.Exit(0) - } - - if *debugListIndexed { - indexed := listIndexed(s.IndexDir) - for _, r := range indexed { - fmt.Println(r) - } - os.Exit(0) - } - - if *debugFind != "" { - args := indexArgs{ - IndexOptions: IndexOptions{ - Name: *debugFind, - }, - IndexDir: *index, - } - bo := args.BuildOptions() - for _, s := range bo.FindAllShards() { - fmt.Println(s) - } - os.Exit(0) - } - - if *debugIndex != "" { - id, err := strconv.Atoi(*debugIndex) - if err != nil { - log.Fatal(err) - } - msg, err := s.forceIndex(uint32(id)) - log.Println(msg) - if err != nil { - os.Exit(1) - } - os.Exit(0) - } - - if *debugShard != "" { - err = printShardStats(*debugShard) - if err != nil { - log.Fatal(err) - } - os.Exit(0) - } - - if *debugMeta != "" { - err = printMetaData(*debugMeta) - if err != nil { - log.Fatal(err) - } - os.Exit(0) - } - - if *debugMerge { - err = doMerge(*index, *targetSize*1024*1024, *debugMergeSimulate) - if err != nil { - log.Fatal(err) - } - os.Exit(0) - } - - initializeGoogleCloudProfiler() - setCompoundShardCounter(s.IndexDir) + }, err +} - if *listen != "" { - go func() { - mux := http.NewServeMux() - debugserver.AddHandlers(mux, true) - mux.Handle("/", s) - debug.Printf("serving HTTP on %s", *listen) - log.Fatal(http.ListenAndServe(*listen, mux)) - }() +func main() { + if err := rootCmd().ParseAndRun(context.Background(), os.Args[1:]); err != nil { + log.Fatal(err) } - - s.Run() } diff --git a/go.mod b/go.mod index 425c9c6cb..6cbabcf78 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/kylelemons/godebug v1.1.0 github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f github.com/opentracing/opentracing-go v1.2.0 + github.com/peterbourgon/ff/v3 v3.1.2 github.com/prometheus/client_golang v1.5.1 github.com/prometheus/procfs v0.0.10 // indirect github.com/rs/xid v1.3.0 diff --git a/go.sum b/go.sum index adae31978..14a914e30 100644 --- a/go.sum +++ b/go.sum @@ -387,6 +387,8 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.6.0/go.mod h1:5N711Q9dKgbdkxHL+MEfF31hpT7l0S0s/t2kKREewys= +github.com/peterbourgon/ff/v3 v3.1.2 h1:0GNhbRhO9yHA4CC27ymskOsuRpmX0YQxwxM9UPiP6JM= +github.com/peterbourgon/ff/v3 v3.1.2/go.mod h1:XNJLY8EIl6MjMVjBS4F0+G0LYoAqs0DTa4rmHHukKDE= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -440,8 +442,6 @@ github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= -github.com/sourcegraph/go-ctags v0.0.0-20210923201916-00b9c039141c h1:yE3O0BjqgifSyuyhnvvOuonOHZa8m58IJgqFEB07dR0= -github.com/sourcegraph/go-ctags v0.0.0-20210923201916-00b9c039141c/go.mod h1:ZYjpRXoJrRlxjU9ZfpaUKJkk62AjhJPffN3rlw2aqxM= github.com/sourcegraph/go-ctags v0.0.0-20220210084826-96f4236f0a78 h1:DY/m+6+PnBz49fMU7rE+DfPf09QYgO1RLPDeJ/1BY9w= github.com/sourcegraph/go-ctags v0.0.0-20220210084826-96f4236f0a78/go.mod h1:ZYjpRXoJrRlxjU9ZfpaUKJkk62AjhJPffN3rlw2aqxM= github.com/sourcegraph/go-diff v0.5.1/go.mod h1:j2dHj3m8aZgQO8lMTcTnBcXkRRRqi34cd2MNlA9u1mE=