diff --git a/tools/benchmark/cmd/put.go b/tools/benchmark/cmd/put.go index a83d570812f3..f03a0a1fff37 100644 --- a/tools/benchmark/cmd/put.go +++ b/tools/benchmark/cmd/put.go @@ -15,6 +15,7 @@ package cmd import ( + "bufio" "context" "encoding/binary" "fmt" @@ -28,6 +29,7 @@ import ( "go.etcd.io/etcd/pkg/v3/report" "github.com/dustin/go-humanize" + "github.com/google/uuid" "github.com/spf13/cobra" "golang.org/x/time/rate" "gopkg.in/cheggaaa/pb.v1" @@ -41,6 +43,11 @@ var putCmd = &cobra.Command{ Run: putFunc, } +type kv struct { + key string + value string +} + var ( keySize int valSize int @@ -50,11 +57,20 @@ var ( keySpaceSize int seqKeys bool + mayastor bool compactInterval time.Duration compactIndexDelta int64 checkHashkv bool + + saveKeysToFile string + + msnodes int + msdisks int + mssnaps int + + q = make(chan ([]kv), 10000) ) func init() { @@ -65,12 +81,174 @@ func init() { putCmd.Flags().IntVar(&putTotal, "total", 10000, "Total number of put requests") putCmd.Flags().IntVar(&keySpaceSize, "key-space-size", 1, "Maximum possible keys") + putCmd.Flags().StringVar(&saveKeysToFile, "saveKeysToFile", "", "Save keys to file") + putCmd.Flags().BoolVar(&mayastor, "mayastor", false, "Is mayastor testing") + putCmd.Flags().IntVar(&msnodes, "msnodes", 10, "Total number of nodes when mayastor flag is set") + putCmd.Flags().IntVar(&msdisks, "msdisks", 10, "Total number of disks across msnodes when mayastor flag is set") + putCmd.Flags().IntVar(&mssnaps, "mssnaps", 0, "Total number of snapshots across msnodes when mayastor flag is set") putCmd.Flags().BoolVar(&seqKeys, "sequential-keys", false, "Use sequential keys") putCmd.Flags().DurationVar(&compactInterval, "compact-interval", 0, `Interval to compact database (do not duplicate this with etcd's 'auto-compaction-retention' flag) (e.g. --compact-interval=5m compacts every 5-minute)`) putCmd.Flags().Int64Var(&compactIndexDelta, "compact-index-delta", 1000, "Delta between current revision and compact revision (e.g. current revision 10000, compact at 9000)") putCmd.Flags().BoolVar(&checkHashkv, "check-hashkv", false, "'true' to check hashkv") } +func generateRandomIP() string { + rand.Seed(time.Now().UnixNano()) + octet1 := rand.Intn(256) + octet2 := rand.Intn(256) + octet3 := rand.Intn(256) + octet4 := rand.Intn(256) + return fmt.Sprintf("%d.%d.%d.%d", octet1, octet2, octet3, octet4) +} + +func generateRandomString(length int) string { + const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + var result string + rand.Seed(time.Now().UnixNano()) + + for i := 0; i < length; i++ { + randomIndex := rand.Intn(len(charset)) + result += string(charset[randomIndex]) + } + + return result +} + +func populate_kvs() { + clusterid := "/openebs.io/mayastor/apis/v0/clusters/" + uuid.New().String() + println("Cluster ID : ", clusterid) + + cn := uint(0) + + kvs := []kv{} + for i := 0; i < msnodes; i++ { + k1 := fmt.Sprintf(clusterid+"/namespaces/default/NodeSpec/k8s-agentpool1-10232180-vmss00%05d", i) + v1 := fmt.Sprintf( + "{\"id\":\"k8s-agentpool1-10232180-vmss00%05d\",\"endpoint\":\"%s:10124\",\"labels\":{\"topology.kubernetestorage.csi.mayastor.com/zone\":\"\"},\"cordon_labels\":[]", + i, + generateRandomIP(), + ) + kvs = append(kvs, kv{key: k1, value: v1}) + } + if len(kvs) > 0 { + q <- kvs + } + + kvs = []kv{} + for i := 0; i < msdisks; i++ { + cn = cn + 1 + if cn >= uint(msnodes) { + cn = 0 + } + agentpoold := fmt.Sprintf("k8s-agentpool1-10232180-%d", cn) + k1 := clusterid + "/namespaces/default/PoolSpec/csi-" + generateRandomString(5) + v1 := fmt.Sprintf( + "{\"node\":\"%s\",\"id\":\"csi-abcde\",\"disks\":[\"/abc/dis\"],\"status\":{\"Created\":\"Online\"},\"labels\":{\"default.cloud.org/storagedisk\":\"testdiskpoolfromcloud-request-xyz\",\"default.cloud.org/provisionedby\":\"someone-by-administring\",\"default.cloud.org/somepool\":\"testdiskpoolfromcloud-request-xyz-diskpool-abcde\",\"default.cloud.org/plumber\":\"somediskpool-provisioned-24gdd\",\"openebs.io/created-by\":\"operator-diskpool\",\"default.cloud.org/type-of-disk\":\"generallycloud\"},\"operation\":null,\"pooltype\":\"any\",\"revision\":300}", + agentpoold, + ) + kvs = append(kvs, kv{key: k1, value: v1}) + } + if len(kvs) > 0 { + q <- kvs + } + + cn = uint(0) + for i := 0; i < putTotal; i++ { + + agentpoold := fmt.Sprintf("k8s-agentpool1-10232180-%d", cn) + cn = cn + 1 + if cn >= totalClients { + cn = 0 + } + nexid := uuid.New().String() + volid := uuid.New().String() + replid := uuid.New().String() + + k1 := clusterid + "/namespaces/default/NexusSpec/" + nexid + v1 := fmt.Sprintf( + "{\"uuid\":\"%s\",\"name\":\"%s\",\"node\":\"%s\",\"children\":[{\"Replica\":{\"uuid\":\"%s\",\"share_uri\":\"bdev:///%s?uuid=%s\"}}],\"size\":2147483648,\"spec_status\":{\"Created\":\"Online\"},\"share\":\"nvmf\",\"managed\":true,\"owner\":\"%s\",\"operation\":null}", + nexid, volid, agentpoold, replid, replid, replid, volid, + ) + + k2 := clusterid + "/namespaces/default/ReplicaSpec/" + replid + v2 := fmt.Sprintf( + "{\"name\":\"%s\",\"uuid\":\"%s\",\"size\":2147483648,\"pool\":\"diskpool-ppndp\",\"share\":\"none\",\"thin\":false,\"status\":{\"Created\":\"online\"},\"managed\":true,\"owners\":{\"volume\":\"%s\"},\"operation\":null}", + replid, replid, volid, + ) + + k3 := clusterid + "/namespaces/default/VolumeSpec/" + volid + v3 := fmt.Sprintf( + "{\"uuid\":\"%s\",\"size\":2147483648,\"labels\":null,\"num_replicas\":1,\"status\":{\"Created\":\"Online\"},\"target\":{\"node\":\"%s\",\"nexus\":\"%s\",\"protocol\":\"nvmf\"},\"policy\":{\"self_heal\":true},\"topology\":{\"node\":null,\"pool\":{\"Labelled\":{\"exclusion\":{},\"inclusion\":{\"openebs.io/created-by\":\"operator-diskpool\"}}}},\"last_nexus_id\":\"%s\",\"operation\":null,\"thin\":false}", + volid, agentpoold, nexid, nexid, + ) + + k4 := clusterid + "/namespaces/default/volume/" + volid + "/nexus/" + nexid + "/info" + v4 := fmt.Sprintf("{\"children\":[{\"healthy\":true,\"uuid\":\"%s\"}],\"clean_shutdown\":false}", replid) + + kvs := []kv{} + kv1 := kv{} + kv1.key = k1 + kv1.value = v1 + + kv2 := kv{} + kv2.key = k2 + kv2.value = v2 + + kv3 := kv{} + kv3.key = k3 + kv3.value = v3 + + kv4 := kv{} + kv4.key = k4 + kv4.value = v4 + + kvs = append(kvs, kv1) + kvs = append(kvs, kv2) + kvs = append(kvs, kv3) + kvs = append(kvs, kv4) + + if mssnaps > 0 { + snapid := uuid.New().String() + k5 := clusterid + "/namespaces/default/VolumeSnapshotSpec/" + volid + "@snapshot-" + snapid + v5 := fmt.Sprintf( + "{\"name\":\"%s\",\"volume_id\":\"%s\",\"nexus_id\":\"%s\",\"creation_timestamp\":\"2024-06-24T11:50:02.516046460Z\",\"status\":{\"Created\":null},\"operation\":null,\"revision\":200}", + volid+"@snapshot-"+snapid, + volid, + nexid, + ) + kv5 := kv{} + kv5.key = k5 + kv5.value = v5 + kvs = append(kvs, kv5) + + for j := 0; j < mssnaps; j++ { + repid := uuid.New().String() + repname := uuid.New().String() + k6 := clusterid + "/namespaces/default/ReplicaSnapshotSpec/" + repid + v6 := fmt.Sprintf( + "{\"name\":\"%s\",\"uuid\":\"%s\",\"snapshot_name\":\"%s\",\"source_id\":\"%s\",\"pool_id\":\"local-gll2d\",\"owner\":{\"volume_snapshot\":\"%s\"},\"status\":{\"Created\":null},\"operation\":null,\"revision\":200}", + repname+"@snapshot-"+snapid, + repid, + volid+"@snapshot-"+snapid, + repname, + volid+"@snapshot-"+snapid, + ) + + kv6 := kv{} + kv6.key = k6 + kv6.value = v6 + kvs = append(kvs, kv6) + q <- kvs + } + } + + q <- kvs + } + kvs = []kv{} + kvs = append(kvs, kv{key: "END"}) + q <- kvs +} + func putFunc(cmd *cobra.Command, args []string) { if keySpaceSize <= 0 { fmt.Fprintf(os.Stderr, "expected positive --key-space-size, got (%v)", keySpaceSize) @@ -85,7 +263,15 @@ func putFunc(cmd *cobra.Command, args []string) { clients := mustCreateClients(totalClients, totalConns) k, v := make([]byte, keySize), string(mustRandBytes(valSize)) - bar = pb.New(putTotal) + kt := putTotal + if mayastor { + kt = kt * 4 + go populate_kvs() + time.Sleep(1 * time.Second) + fmt.Printf("KVs ready : %d\n", len(q)) + } + + bar = pb.New(kt) bar.Format("Bom !") bar.Start() @@ -105,17 +291,62 @@ func putFunc(cmd *cobra.Command, args []string) { }(clients[i]) } - go func() { - for i := 0; i < putTotal; i++ { - if seqKeys { - binary.PutVarint(k, int64(i%keySpaceSize)) - } else { - binary.PutVarint(k, int64(rand.Intn(keySpaceSize))) - } - requests <- v3.OpPut(string(k), v) + var writer *bufio.Writer + if saveKeysToFile != "" { + fn, err := os.OpenFile(saveKeysToFile, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + fmt.Printf("Error while opening file to save keys: %s", err.Error()) + os.Exit(1) } - close(requests) - }() + writer = bufio.NewWriter(fn) + defer fn.Close() + fmt.Print("Writer configured\n") + } + + if mayastor { + go func() { + for kvs := range q { + for _, kv := range kvs { + if kv.key == "END" { + close(requests) + if writer != nil { + writer.Flush() + } + return + } + if writer != nil { + if _, err := writer.WriteString(kv.key + "\n"); err != nil { + fmt.Printf("Failed to write key : %s", err.Error()) + os.Exit(1) + } + } + requests <- v3.OpPut(kv.key, kv.value) + } + } + close(requests) + }() + } else { + go func() { + for i := 0; i < putTotal; i++ { + if seqKeys { + binary.PutVarint(k, int64(i%keySpaceSize)) + } else { + binary.PutVarint(k, int64(rand.Intn(keySpaceSize))) + } + if writer != nil { + if _, err := writer.WriteString(string(k) + "\n"); err != nil { + fmt.Printf("Failed to write key : %s", err.Error()) + os.Exit(1) + } + } + requests <- v3.OpPut(string(k), v) + } + close(requests) + }() + } + if writer != nil { + writer.Flush() + } if compactInterval > 0 { go func() { diff --git a/tools/benchmark/cmd/range.go b/tools/benchmark/cmd/range.go index fc503b71bf8a..1e1b470e6b96 100644 --- a/tools/benchmark/cmd/range.go +++ b/tools/benchmark/cmd/range.go @@ -15,10 +15,14 @@ package cmd import ( + "bufio" "context" "fmt" "math" "os" + "os/exec" + "strconv" + "strings" "time" v3 "go.etcd.io/etcd/client/v3" @@ -57,6 +61,24 @@ func rangeFunc(cmd *cobra.Command, args []string) { } k := args[0] + var scanner *bufio.Scanner + if _, err := os.Stat(k); err == nil { + fn, err := os.OpenFile(k, os.O_RDONLY, 0666) + if err != nil { + fmt.Printf("Error while opening file to read keys: %s", err.Error()) + os.Exit(1) + } + scanner = bufio.NewScanner(fn) + defer fn.Close() + cmd := exec.Command("wc", "-l", k) + stdout, err := cmd.Output() + if err != nil { + fmt.Printf("Error while checking file for keys length: %s", err.Error()) + os.Exit(1) + } + rangeTotal, _ = strconv.Atoi(strings.Split(strings.Split(string(stdout), "\n")[0], " ")[0]) + } + end := "" if len(args) == 2 { end = args[1] @@ -93,6 +115,9 @@ func rangeFunc(cmd *cobra.Command, args []string) { st := time.Now() _, err := c.Do(context.Background(), op) + // for _, kv := range resp.Get().Kvs { + // fmt.Printf("Key: %s, Value: %s \n", kv.Key, kv.Value) + // } r.Results() <- report.Result{Err: err, Start: st, End: time.Now()} bar.Increment() } @@ -100,13 +125,24 @@ func rangeFunc(cmd *cobra.Command, args []string) { } go func() { - for i := 0; i < rangeTotal; i++ { - opts := []v3.OpOption{v3.WithRange(end)} - if rangeConsistency == "s" { - opts = append(opts, v3.WithSerializable()) + if scanner != nil { + for scanner.Scan() { + opts := []v3.OpOption{v3.WithRange(end)} + if rangeConsistency == "s" { + opts = append(opts, v3.WithSerializable()) + } + op := v3.OpGet(scanner.Text(), opts...) + requests <- op + } + } else { + for i := 0; i < rangeTotal; i++ { + opts := []v3.OpOption{v3.WithRange(end)} + if rangeConsistency == "s" { + opts = append(opts, v3.WithSerializable()) + } + op := v3.OpGet(k, opts...) + requests <- op } - op := v3.OpGet(k, opts...) - requests <- op } close(requests) }() diff --git a/tools/etcd-dump-db/backend.go b/tools/etcd-dump-db/backend.go index 3fe609d9cc44..57540b7f2fcc 100644 --- a/tools/etcd-dump-db/backend.go +++ b/tools/etcd-dump-db/backend.go @@ -52,6 +52,7 @@ func getBuckets(dbPath string) (buckets []string, err error) { // TODO: import directly from packages, rather than copy&paste type decoder func(k, v []byte) +type decoder_get func(k, v []byte) (revision, string, string, int64) var decoders = map[string]decoder{ "key": keyDecoder, @@ -61,6 +62,14 @@ var decoders = map[string]decoder{ "authUsers": authUsersDecoder, } +var decoders_get = map[string]decoder_get{ + "key": keyDecoderReturn, + // "lease": leaseDecoder, + // "auth": authDecoder, + // "authRoles": authRolesDecoder, + // "authUsers": authUsersDecoder, +} + type revision struct { main int64 sub int64 @@ -82,6 +91,15 @@ func keyDecoder(k, v []byte) { fmt.Printf("rev=%+v, value=[key %q | val %q | created %d | mod %d | ver %d]\n", rev, string(kv.Key), string(kv.Value), kv.CreateRevision, kv.ModRevision, kv.Version) } +func keyDecoderReturn(k, v []byte) (revision, string, string, int64) { + rev := bytesToRev(k) + var kv mvccpb.KeyValue + if err := kv.Unmarshal(v); err != nil { + panic(err) + } + return rev, string(kv.Key), string(kv.Value), kv.Version +} + func bytesToLeaseID(bytes []byte) int64 { if len(bytes) != 8 { panic(fmt.Errorf("lease ID must be 8-byte")) diff --git a/tools/etcd-dump-db/boltcli.go b/tools/etcd-dump-db/boltcli.go new file mode 100644 index 000000000000..73a170509543 --- /dev/null +++ b/tools/etcd-dump-db/boltcli.go @@ -0,0 +1,171 @@ +package main + +import ( + "bytes" + "encoding/binary" + "fmt" + "time" + "os" + + bbolt "go.etcd.io/bbolt" + bolt "go.etcd.io/bbolt" + "go.etcd.io/etcd/mvcc/mvccpb" +) + +func Buckets(path string) { + if _, err := os.Stat(path); os.IsNotExist(err) { + fmt.Println(err) + return + } + + db, err := bbolt.Open(path, 0600, nil) + if err != nil { + fmt.Println(err) + return + } + defer db.Close() + + err = db.View(func(tx *bbolt.Tx) error { + return tx.ForEach(func(name []byte, _ *bbolt.Bucket) error { + fmt.Println(string(name)) + return nil + }) + }) + if err != nil { + fmt.Println(err) + return + } +} + +var flockTimeout time.Duration + +func dumpkeys(path string, bucket string, prefix string) { + if _, err := os.Stat(path); os.IsNotExist(err) { + fmt.Println(err) + return + } + + db, err := bbolt.Open(path, 0600, nil) + if err != nil { + fmt.Println(err) + return + } + defer db.Close() + db.View(func(tx *bbolt.Tx) error { + // Assume bucket exists and has keys + decode := true + b := tx.Bucket([]byte(bucket)) + + // c := b.Cursor() + + // if prefix != "" { + // prefixbytes := []byte(prefix) + // for k, v := c.Seek(prefixbytes); k != nil && bytes.HasPrefix(k, prefixbytes); k, v = c.Next() { + // fmt.Printf("key=%s\n", k, v) + // } + // return nil + // } + + //not working + //for k, _ := c.First(); k != nil; k, _ = c.Next() { + // kstring := bytes.NewBuffer(k).String() + // fmt.Printf("%s\n", kstring) + //} + + //working + b.ForEach(func(k, v []byte) error { + //fmt.Printf("%s\n", bytes.NewBuffer(k).String()) + // fmt.Printf("key=%s, \nvalue=%s\n", bytes.NewBuffer(k), bytes.NewBuffer(v)) + + if dec, ok := decoders_get[bucket]; decode && ok { + revision, k, v, rev := dec(k, v) + fmt.Printf("(%+v) Retrieved key=%q, value=%q, revision=%d\n", revision, k, v, rev) + } else { + fmt.Printf("key=%q, value=%q\n", k, v) + } + + return nil + }) + return nil + }) + if err != nil { + fmt.Println(err) + return + } +} + +// IMPORTANT - dependency from etcd server code +const revBytesLen = 8 + 1 + 8 +const markedRevBytesLen = revBytesLen + 1 + +func revToBytes(rev revision, bytes []byte) { + binary.BigEndian.PutUint64(bytes, uint64(rev.main)) + bytes[8] = '_' + binary.BigEndian.PutUint64(bytes[9:], uint64(rev.sub)) +} + +func addAKeyValue(path string, bucket string, key string, value string) { + if _, err := os.Stat(path); os.IsNotExist(err) { + fmt.Println(err) + return + } + + db, err := bbolt.Open(path, 0600, nil) + if err != nil { + fmt.Println(err) + return + } + defer db.Close() + db.Update(func(tx *bolt.Tx) error { + // Retrieve the users bucket. + // This should be created when the DB is first opened. + b := tx.Bucket([]byte(bucket)) + + ibytes := make([]byte, revBytesLen, markedRevBytesLen) + idxRev := revision{main: 2, sub: int64(0)} + revToBytes(idxRev, ibytes) + + kv := mvccpb.KeyValue{ + Key: bytes.NewBufferString(key).Bytes(), + Value: bytes.NewBufferString(value).Bytes(), + CreateRevision: 0, + ModRevision: 0, + Version: 1, + Lease: int64(0), + } + + d, err := kv.Marshal() + + if err != nil { + fmt.Println(err) + return err + } + + // Persist bytes to users bucket. + return b.Put(ibytes, d) + }) +} + +// note: take a copy of db (or snapshot also) and then do otherwise it wont work +// ./boltcli listbuckets /tmp/db +// ./boltcli dumpkeys /tmp/db [prefix] +// ./boltcli addkey /tmp/db +func main() { + if os.Args[1] == "listbuckets" { + Buckets(os.Args[2]) + } else if os.Args[1] == "dumpkeys" { + prefix := "" + if len(os.Args) > 4 { + prefix = os.Args[4] + } + dumpkeys(os.Args[2], os.Args[3], prefix) + } else if os.Args[1] == "addkey" { + if len(os.Args) < 6 { + print("Usage addkey /tmp/db ") + return + } + key := os.Args[4] + value := os.Args[5] + addAKeyValue(os.Args[2], os.Args[3], key, value) + } +} diff --git a/tools/etcd-dump-db/main.go b/tools/etcd-dump-db/main.go index f82d91f76892..e57f779968dc 100644 --- a/tools/etcd-dump-db/main.go +++ b/tools/etcd-dump-db/main.go @@ -17,7 +17,6 @@ package main import ( "fmt" "log" - "os" "path/filepath" "strings" "time" @@ -61,12 +60,12 @@ func init() { rootCommand.AddCommand(getHashCommand) } -func main() { - if err := rootCommand.Execute(); err != nil { - fmt.Fprintln(os.Stdout, err) - os.Exit(1) - } -} +// func main() { +// if err := rootCommand.Execute(); err != nil { +// fmt.Fprintln(os.Stdout, err) +// os.Exit(1) +// } +// } func listBucketCommandFunc(cmd *cobra.Command, args []string) { if len(args) < 1 {