Add mayastor volumes as individual entries to the profile
Put - save the generated keys to a file; Get - read the keys back from that file

Get - take the keys from the file passed as the key argument instead of a separate argument

Put - flush the key file before completing

Dump-keys code; procedure: go build boltcli.go backend.go; this produces a boltcli binary that can be invoked as ./boltcli dumpkeys ~/bbolt.db key
hasethuraman committed Jun 24, 2024
1 parent bdbbde9 commit a6c96d1
Showing 5 changed files with 370 additions and 24 deletions.
144 changes: 133 additions & 11 deletions tools/benchmark/cmd/put.go
@@ -15,6 +15,7 @@
package cmd

import (
"bufio"
"context"
"encoding/binary"
"fmt"
@@ -28,6 +29,7 @@ import (
"go.etcd.io/etcd/pkg/v3/report"

"github.com/dustin/go-humanize"
"github.com/google/uuid"
"github.com/spf13/cobra"
"golang.org/x/time/rate"
"gopkg.in/cheggaaa/pb.v1"
@@ -41,6 +43,11 @@ var putCmd = &cobra.Command{
Run: putFunc,
}

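// kv holds one key/value pair generated for the put benchmark.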
type kv struct {
key string
value string
}

var (
keySize int
valSize int
@@ -50,11 +57,16 @@ var (

keySpaceSize int
seqKeys bool
mayastor bool

compactInterval time.Duration
compactIndexDelta int64

checkHashkv bool

saveKeysToFile string

q = make(chan ([]kv), 10000)
)

func init() {
@@ -65,12 +77,69 @@ func init() {

putCmd.Flags().IntVar(&putTotal, "total", 10000, "Total number of put requests")
putCmd.Flags().IntVar(&keySpaceSize, "key-space-size", 1, "Maximum possible keys")
putCmd.Flags().StringVar(&saveKeysToFile, "saveKeysToFile", "", "Save keys to file")
putCmd.Flags().BoolVar(&mayastor, "mayastor", false, "Is mayastor testing")
putCmd.Flags().BoolVar(&seqKeys, "sequential-keys", false, "Use sequential keys")
putCmd.Flags().DurationVar(&compactInterval, "compact-interval", 0, `Interval to compact database (do not duplicate this with etcd's 'auto-compaction-retention' flag) (e.g. --compact-interval=5m compacts every 5-minute)`)
putCmd.Flags().Int64Var(&compactIndexDelta, "compact-index-delta", 1000, "Delta between current revision and compact revision (e.g. current revision 10000, compact at 9000)")
putCmd.Flags().BoolVar(&checkHashkv, "check-hashkv", false, "'true' to check hashkv")
}

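// populate_kvs builds four Mayastor-style entries per volume (NexusSpec, ReplicaSpec,
// VolumeSpec and the nexus info key) and sends them to the producer goroutine over q;
// a kv with key "END" marks the end of the stream.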
func populate_kvs() {
clusterid := uuid.New().String()
cn := uint(0)

for i := 0; i < putTotal; i++ {

agentpoold := fmt.Sprintf("k8s-agentpool1-10232180-%d", cn)
cn = cn + 1
if cn >= totalClients {
cn = 0
}
nexid := uuid.New().String()
volid := uuid.New().String()
replid := uuid.New().String()

k1 := "/openebs.io/mayastor/apis/v0/clusters/" + clusterid + "/namespaces/azstor/NexusSpec/" + nexid
v1 := fmt.Sprintf("{\"uuid\":\"%s\",\"name\":\"%s\",\"node\":\"%s\",\"children\":[{\"Replica\":{\"uuid\":\"%s\",\"share_uri\":\"bdev:///%s?uuid=%s\"}}],\"size\":2147483648,\"spec_status\":{\"Created\":\"Online\"},\"share\":\"nvmf\",\"managed\":true,\"owner\":\"%s\",\"operation\":null}", nexid, volid, agentpoold, replid, replid, replid, volid)

k2 := "/openebs.io/mayastor/apis/v0/clusters/" + clusterid + "/namespaces/azstor/ReplicaSpec/" + replid
v2 := fmt.Sprintf("{\"name\":\"%s\",\"uuid\":\"%s\",\"size\":2147483648,\"pool\":\"diskpool-ppndp\",\"share\":\"none\",\"thin\":false,\"status\":{\"Created\":\"online\"},\"managed\":true,\"owners\":{\"volume\":\"%s\"},\"operation\":null}", replid, replid, volid)

k3 := "/openebs.io/mayastor/apis/v0/clusters/" + clusterid + "/namespaces/azstor/VolumeSpec/" + volid
v3 := fmt.Sprintf("{\"uuid\":\"%s\",\"size\":2147483648,\"labels\":null,\"num_replicas\":1,\"status\":{\"Created\":\"Online\"},\"target\":{\"node\":\"%s\",\"nexus\":\"%s\",\"protocol\":\"nvmf\"},\"policy\":{\"self_heal\":true},\"topology\":{\"node\":null,\"pool\":{\"Labelled\":{\"exclusion\":{},\"inclusion\":{\"openebs.io/created-by\":\"operator-diskpool\"}}}},\"last_nexus_id\":\"%s\",\"operation\":null,\"thin\":false}", volid, agentpoold, nexid, nexid)

k4 := "/openebs.io/mayastor/apis/v0/clusters/" + clusterid + "/namespaces/azstor/volume/" + volid + "/nexus/" + nexid + "/info"
v4 := fmt.Sprintf("{\"children\":[{\"healthy\":true,\"uuid\":\"%s\"}],\"clean_shutdown\":false}", replid)

kvs := []kv{}
kv1 := kv{}
kv1.key = k1
kv1.value = v1

kv2 := kv{}
kv2.key = k2
kv2.value = v2

kv3 := kv{}
kv3.key = k3
kv3.value = v3

kv4 := kv{}
kv4.key = k4
kv4.value = v4

kvs = append(kvs, kv1)
kvs = append(kvs, kv2)
kvs = append(kvs, kv3)
kvs = append(kvs, kv4)
q <- kvs
}
kvs := []kv{}
kvs = append(kvs, kv{key: "END"})
q <- kvs
}

func putFunc(cmd *cobra.Command, args []string) {
if keySpaceSize <= 0 {
fmt.Fprintf(os.Stderr, "expected positive --key-space-size, got (%v)", keySpaceSize)
@@ -85,7 +154,15 @@ func putFunc(cmd *cobra.Command, args []string) {
clients := mustCreateClients(totalClients, totalConns)
k, v := make([]byte, keySize), string(mustRandBytes(valSize))

kt := putTotal
if mayastor {
kt = kt * 4
go populate_kvs()
time.Sleep(1 * time.Second)
fmt.Printf("KVs ready : %d\n", len(q))
}

bar = pb.New(kt)
bar.Format("Bom !")
bar.Start()

@@ -105,17 +182,62 @@ func putFunc(cmd *cobra.Command, args []string) {
}(clients[i])
}

var writer *bufio.Writer
if saveKeysToFile != "" {
fn, err := os.OpenFile(saveKeysToFile, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
fmt.Printf("Error while opening file to save keys: %s", err.Error())
os.Exit(1)
}
writer = bufio.NewWriter(fn)
defer fn.Close()
fmt.Print("Writer configured\n")
}

if mayastor {
go func() {
for kvs := range q {
for _, kv := range kvs {
if kv.key == "END" {
close(requests)
if writer != nil {
writer.Flush()
}
return
}
if writer != nil {
if _, err := writer.WriteString(kv.key + "\n"); err != nil {
fmt.Printf("Failed to write key : %s", err.Error())
os.Exit(1)
}
}
requests <- v3.OpPut(kv.key, kv.value)
}
}
close(requests)
}()
} else {
go func() {
for i := 0; i < putTotal; i++ {
if seqKeys {
binary.PutVarint(k, int64(i%keySpaceSize))
} else {
binary.PutVarint(k, int64(rand.Intn(keySpaceSize)))
}
if writer != nil {
if _, err := writer.WriteString(string(k) + "\n"); err != nil {
fmt.Printf("Failed to write key : %s", err.Error())
os.Exit(1)
}
}
requests <- v3.OpPut(string(k), v)
}
close(requests)
}()
}
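// Flush any keys already buffered; note this runs right after the producer goroutines
// are started. The mayastor producer also flushes when it reaches the END sentinel.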
if writer != nil {
writer.Flush()
}

if compactInterval > 0 {
go func() {
48 changes: 42 additions & 6 deletions tools/benchmark/cmd/range.go
@@ -15,10 +15,14 @@
package cmd

import (
"bufio"
"context"
"fmt"
"math"
"os"
"os/exec"
"strconv"
"strings"
"time"

v3 "go.etcd.io/etcd/client/v3"
@@ -57,6 +61,24 @@ func rangeFunc(cmd *cobra.Command, args []string) {
}

k := args[0]
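// If the key argument is a path to an existing file, read the keys from it
// (one per line) and derive rangeTotal from its line count via wc -l.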
var scanner *bufio.Scanner
if _, err := os.Stat(k); err == nil {
fn, err := os.OpenFile(k, os.O_RDONLY, 0666)
if err != nil {
fmt.Printf("Error while opening file to read keys: %s", err.Error())
os.Exit(1)
}
scanner = bufio.NewScanner(fn)
defer fn.Close()
cmd := exec.Command("wc", "-l", k)
stdout, err := cmd.Output()
if err != nil {
fmt.Printf("Error while checking file for keys length: %s", err.Error())
os.Exit(1)
}
rangeTotal, _ = strconv.Atoi(strings.Split(strings.Split(string(stdout), "\n")[0], " ")[0])
}

end := ""
if len(args) == 2 {
end = args[1]
@@ -93,20 +115,34 @@

st := time.Now()
_, err := c.Do(context.Background(), op)
// for _, kv := range resp.Get().Kvs {
// fmt.Printf("Key: %s, Value: %s \n", kv.Key, kv.Value)
// }
r.Results() <- report.Result{Err: err, Start: st, End: time.Now()}
bar.Increment()
}
}(clients[i])
}

go func() {
if scanner != nil {
for scanner.Scan() {
opts := []v3.OpOption{v3.WithRange(end)}
if rangeConsistency == "s" {
opts = append(opts, v3.WithSerializable())
}
op := v3.OpGet(scanner.Text(), opts...)
requests <- op
}
} else {
for i := 0; i < rangeTotal; i++ {
opts := []v3.OpOption{v3.WithRange(end)}
if rangeConsistency == "s" {
opts = append(opts, v3.WithSerializable())
}
op := v3.OpGet(k, opts...)
requests <- op
}
}
close(requests)
}()
18 changes: 18 additions & 0 deletions tools/etcd-dump-db/backend.go
@@ -52,6 +52,7 @@ func getBuckets(dbPath string) (buckets []string, err error) {
// TODO: import directly from packages, rather than copy&paste

type decoder func(k, v []byte)
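// decoder_get mirrors decoder but returns the decoded revision, key, value and
// version instead of printing them.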
type decoder_get func(k, v []byte) (revision, string, string, int64)

var decoders = map[string]decoder{
"key": keyDecoder,
@@ -61,6 +62,14 @@ var decoders = map[string]decoder{
"authUsers": authUsersDecoder,
}

var decoders_get = map[string]decoder_get{
"key": keyDecoderReturn,
// "lease": leaseDecoder,
// "auth": authDecoder,
// "authRoles": authRolesDecoder,
// "authUsers": authUsersDecoder,
}

type revision struct {
main int64
sub int64
@@ -82,6 +91,15 @@ func keyDecoder(k, v []byte) {
fmt.Printf("rev=%+v, value=[key %q | val %q | created %d | mod %d | ver %d]\n", rev, string(kv.Key), string(kv.Value), kv.CreateRevision, kv.ModRevision, kv.Version)
}

func keyDecoderReturn(k, v []byte) (revision, string, string, int64) {
rev := bytesToRev(k)
var kv mvccpb.KeyValue
if err := kv.Unmarshal(v); err != nil {
panic(err)
}
return rev, string(kv.Key), string(kv.Value), kv.Version
}

func bytesToLeaseID(bytes []byte) int64 {
if len(bytes) != 8 {
panic(fmt.Errorf("lease ID must be 8-byte"))
(The remaining changed files, presumably including boltcli.go, did not load in this view.)
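Since boltcli.go is not shown, the following is only a minimal sketch, under assumptions, of how a dumpkeys command could drive keyDecoderReturn through the decoders_get table when compiled together with backend.go (per the commit message's go build boltcli.go backend.go). The argument handling, error messages and output format below are illustrative guesses, not code from this commit.

package main

import (
	"fmt"
	"os"

	bolt "go.etcd.io/bbolt"
)

// dumpKeys opens the bbolt file read-only and prints every entry of the given
// bucket using the matching decoder from decoders_get (defined in backend.go).
func dumpKeys(dbPath, bucket string) error {
	dec, ok := decoders_get[bucket]
	if !ok {
		return fmt.Errorf("no get-decoder for bucket %q", bucket)
	}
	db, err := bolt.Open(dbPath, 0600, &bolt.Options{ReadOnly: true})
	if err != nil {
		return err
	}
	defer db.Close()
	return db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(bucket))
		if b == nil {
			return fmt.Errorf("bucket %q not found", bucket)
		}
		return b.ForEach(func(k, v []byte) error {
			rev, key, _, ver := dec(k, v)
			fmt.Printf("rev=%+v key=%q ver=%d\n", rev, key, ver)
			return nil
		})
	})
}

func main() {
	// Expected invocation, as in the commit message: ./boltcli dumpkeys ~/bbolt.db key
	if len(os.Args) != 4 || os.Args[1] != "dumpkeys" {
		fmt.Fprintln(os.Stderr, "usage: boltcli dumpkeys <db-path> <bucket>")
		os.Exit(1)
	}
	if err := dumpKeys(os.Args[2], os.Args[3]); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}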
