Skip to content

Commit

Permalink
Add mayastor volumes as each entry to profile
Browse files Browse the repository at this point in the history
Put - save keys and Get - use keys
Get - use keys from key instead of separate arg
Put - flush before completing
Dump keys code; procedure: go build boltcli.go backend.go; it will generate boltcli binary which can be triggered like ./boltcli dumpkeys ~/bbolt.db key
  • Loading branch information
hasethuraman committed Jun 24, 2024
1 parent bdbbde9 commit b011303
Show file tree
Hide file tree
Showing 5 changed files with 479 additions and 24 deletions.
253 changes: 242 additions & 11 deletions tools/benchmark/cmd/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package cmd

import (
"bufio"
"context"
"encoding/binary"
"fmt"
Expand All @@ -28,6 +29,7 @@ import (
"go.etcd.io/etcd/pkg/v3/report"

"github.com/dustin/go-humanize"
"github.com/google/uuid"
"github.com/spf13/cobra"
"golang.org/x/time/rate"
"gopkg.in/cheggaaa/pb.v1"
Expand All @@ -41,6 +43,11 @@ var putCmd = &cobra.Command{
Run: putFunc,
}

type kv struct {
key string
value string
}

var (
keySize int
valSize int
Expand All @@ -50,11 +57,20 @@ var (

keySpaceSize int
seqKeys bool
mayastor bool

compactInterval time.Duration
compactIndexDelta int64

checkHashkv bool

saveKeysToFile string

msnodes int
msdisks int
mssnaps int

q = make(chan ([]kv), 10000)
)

func init() {
Expand All @@ -65,12 +81,174 @@ func init() {

putCmd.Flags().IntVar(&putTotal, "total", 10000, "Total number of put requests")
putCmd.Flags().IntVar(&keySpaceSize, "key-space-size", 1, "Maximum possible keys")
putCmd.Flags().StringVar(&saveKeysToFile, "saveKeysToFile", "", "Save keys to file")
putCmd.Flags().BoolVar(&mayastor, "mayastor", false, "Is mayastor testing")
putCmd.Flags().IntVar(&msnodes, "msnodes", 10, "Total number of nodes when mayastor flag is set")
putCmd.Flags().IntVar(&msdisks, "msdisks", 10, "Total number of disks across msnodes when mayastor flag is set")
putCmd.Flags().IntVar(&mssnaps, "mssnaps", 0, "Total number of snapshots across msnodes when mayastor flag is set")
putCmd.Flags().BoolVar(&seqKeys, "sequential-keys", false, "Use sequential keys")
putCmd.Flags().DurationVar(&compactInterval, "compact-interval", 0, `Interval to compact database (do not duplicate this with etcd's 'auto-compaction-retention' flag) (e.g. --compact-interval=5m compacts every 5-minute)`)
putCmd.Flags().Int64Var(&compactIndexDelta, "compact-index-delta", 1000, "Delta between current revision and compact revision (e.g. current revision 10000, compact at 9000)")
putCmd.Flags().BoolVar(&checkHashkv, "check-hashkv", false, "'true' to check hashkv")
}

func generateRandomIP() string {
rand.Seed(time.Now().UnixNano())
octet1 := rand.Intn(256)
octet2 := rand.Intn(256)
octet3 := rand.Intn(256)
octet4 := rand.Intn(256)
return fmt.Sprintf("%d.%d.%d.%d", octet1, octet2, octet3, octet4)
}

func generateRandomString(length int) string {
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
var result string
rand.Seed(time.Now().UnixNano())

for i := 0; i < length; i++ {
randomIndex := rand.Intn(len(charset))
result += string(charset[randomIndex])
}

return result
}

func populate_kvs() {
clusterid := "/openebs.io/mayastor/apis/v0/clusters/" + uuid.New().String()
println("Cluster ID : ", clusterid)

cn := uint(0)

kvs := []kv{}
for i := 0; i < msnodes; i++ {
k1 := fmt.Sprintf(clusterid+"/namespaces/default/NodeSpec/k8s-agentpool1-10232180-vmss00%05d", i)
v1 := fmt.Sprintf(
"{\"id\":\"k8s-agentpool1-10232180-vmss00%05d\",\"endpoint\":\"%s:10124\",\"labels\":{\"topology.kubernetestorage.csi.mayastor.com/zone\":\"\"},\"cordon_labels\":[]",
i,
generateRandomIP(),
)
kvs = append(kvs, kv{key: k1, value: v1})
}
if len(kvs) > 0 {
q <- kvs
}

kvs = []kv{}
for i := 0; i < msdisks; i++ {
cn = cn + 1
if cn >= uint(msnodes) {
cn = 0
}
agentpoold := fmt.Sprintf("k8s-agentpool1-10232180-%d", cn)
k1 := clusterid + "/namespaces/default/PoolSpec/csi-" + generateRandomString(5)
v1 := fmt.Sprintf(
"{\"node\":\"%s\",\"id\":\"csi-abcde\",\"disks\":[\"/abc/dis\"],\"status\":{\"Created\":\"Online\"},\"labels\":{\"default.cloud.org/storagedisk\":\"testdiskpoolfromcloud-request-xyz\",\"default.cloud.org/provisionedby\":\"someone-by-administring\",\"default.cloud.org/somepool\":\"testdiskpoolfromcloud-request-xyz-diskpool-abcde\",\"default.cloud.org/plumber\":\"somediskpool-provisioned-24gdd\",\"openebs.io/created-by\":\"operator-diskpool\",\"default.cloud.org/type-of-disk\":\"generallycloud\"},\"operation\":null,\"pooltype\":\"any\",\"revision\":300}",
agentpoold,
)
kvs = append(kvs, kv{key: k1, value: v1})
}
if len(kvs) > 0 {
q <- kvs
}

cn = uint(0)
for i := 0; i < putTotal; i++ {

agentpoold := fmt.Sprintf("k8s-agentpool1-10232180-%d", cn)
cn = cn + 1
if cn >= totalClients {
cn = 0
}
nexid := uuid.New().String()
volid := uuid.New().String()
replid := uuid.New().String()

k1 := clusterid + "/namespaces/default/NexusSpec/" + nexid
v1 := fmt.Sprintf(
"{\"uuid\":\"%s\",\"name\":\"%s\",\"node\":\"%s\",\"children\":[{\"Replica\":{\"uuid\":\"%s\",\"share_uri\":\"bdev:///%s?uuid=%s\"}}],\"size\":2147483648,\"spec_status\":{\"Created\":\"Online\"},\"share\":\"nvmf\",\"managed\":true,\"owner\":\"%s\",\"operation\":null}",
nexid, volid, agentpoold, replid, replid, replid, volid,
)

k2 := clusterid + "/namespaces/default/ReplicaSpec/" + replid
v2 := fmt.Sprintf(
"{\"name\":\"%s\",\"uuid\":\"%s\",\"size\":2147483648,\"pool\":\"diskpool-ppndp\",\"share\":\"none\",\"thin\":false,\"status\":{\"Created\":\"online\"},\"managed\":true,\"owners\":{\"volume\":\"%s\"},\"operation\":null}",
replid, replid, volid,
)

k3 := clusterid + "/namespaces/default/VolumeSpec/" + volid
v3 := fmt.Sprintf(
"{\"uuid\":\"%s\",\"size\":2147483648,\"labels\":null,\"num_replicas\":1,\"status\":{\"Created\":\"Online\"},\"target\":{\"node\":\"%s\",\"nexus\":\"%s\",\"protocol\":\"nvmf\"},\"policy\":{\"self_heal\":true},\"topology\":{\"node\":null,\"pool\":{\"Labelled\":{\"exclusion\":{},\"inclusion\":{\"openebs.io/created-by\":\"operator-diskpool\"}}}},\"last_nexus_id\":\"%s\",\"operation\":null,\"thin\":false}",
volid, agentpoold, nexid, nexid,
)

k4 := clusterid + "/namespaces/default/volume/" + volid + "/nexus/" + nexid + "/info"
v4 := fmt.Sprintf("{\"children\":[{\"healthy\":true,\"uuid\":\"%s\"}],\"clean_shutdown\":false}", replid)

kvs := []kv{}
kv1 := kv{}
kv1.key = k1
kv1.value = v1

kv2 := kv{}
kv2.key = k2
kv2.value = v2

kv3 := kv{}
kv3.key = k3
kv3.value = v3

kv4 := kv{}
kv4.key = k4
kv4.value = v4

kvs = append(kvs, kv1)
kvs = append(kvs, kv2)
kvs = append(kvs, kv3)
kvs = append(kvs, kv4)

if mssnaps > 0 {
snapid := uuid.New().String()
k5 := clusterid + "/namespaces/default/VolumeSnapshotSpec/" + volid + "@snapshot-" + snapid
v5 := fmt.Sprintf(
"{\"name\":\"%s\",\"volume_id\":\"%s\",\"nexus_id\":\"%s\",\"creation_timestamp\":\"2024-06-24T11:50:02.516046460Z\",\"status\":{\"Created\":null},\"operation\":null,\"revision\":200}",
volid+"@snapshot-"+snapid,
volid,
nexid,
)
kv5 := kv{}
kv5.key = k5
kv5.value = v5
kvs = append(kvs, kv5)

for j := 0; j < mssnaps; j++ {
repid := uuid.New().String()
repname := uuid.New().String()
k6 := clusterid + "/namespaces/default/ReplicaSnapshotSpec/" + repid
v6 := fmt.Sprintf(
"{\"name\":\"%s\",\"uuid\":\"%s\",\"snapshot_name\":\"%s\",\"source_id\":\"%s\",\"pool_id\":\"local-gll2d\",\"owner\":{\"volume_snapshot\":\"%s\"},\"status\":{\"Created\":null},\"operation\":null,\"revision\":200}",
repname+"@snapshot-"+snapid,
repid,
volid+"@snapshot-"+snapid,
repname,
volid+"@snapshot-"+snapid,
)

kv6 := kv{}
kv6.key = k6
kv6.value = v6
kvs = append(kvs, kv6)
q <- kvs
}
}

q <- kvs
}
kvs = []kv{}
kvs = append(kvs, kv{key: "END"})
q <- kvs
}

func putFunc(cmd *cobra.Command, args []string) {
if keySpaceSize <= 0 {
fmt.Fprintf(os.Stderr, "expected positive --key-space-size, got (%v)", keySpaceSize)
Expand All @@ -85,7 +263,15 @@ func putFunc(cmd *cobra.Command, args []string) {
clients := mustCreateClients(totalClients, totalConns)
k, v := make([]byte, keySize), string(mustRandBytes(valSize))

bar = pb.New(putTotal)
kt := putTotal
if mayastor {
kt = kt * 4
go populate_kvs()
time.Sleep(1 * time.Second)
fmt.Printf("KVs ready : %d\n", len(q))
}

bar = pb.New(kt)
bar.Format("Bom !")
bar.Start()

Expand All @@ -105,17 +291,62 @@ func putFunc(cmd *cobra.Command, args []string) {
}(clients[i])
}

go func() {
for i := 0; i < putTotal; i++ {
if seqKeys {
binary.PutVarint(k, int64(i%keySpaceSize))
} else {
binary.PutVarint(k, int64(rand.Intn(keySpaceSize)))
}
requests <- v3.OpPut(string(k), v)
var writer *bufio.Writer
if saveKeysToFile != "" {
fn, err := os.OpenFile(saveKeysToFile, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
fmt.Printf("Error while opening file to save keys: %s", err.Error())
os.Exit(1)
}
close(requests)
}()
writer = bufio.NewWriter(fn)
defer fn.Close()
fmt.Print("Writer configured\n")
}

if mayastor {
go func() {
for kvs := range q {
for _, kv := range kvs {
if kv.key == "END" {
close(requests)
if writer != nil {
writer.Flush()
}
return
}
if writer != nil {
if _, err := writer.WriteString(kv.key + "\n"); err != nil {
fmt.Printf("Failed to write key : %s", err.Error())
os.Exit(1)
}
}
requests <- v3.OpPut(kv.key, kv.value)
}
}
close(requests)
}()
} else {
go func() {
for i := 0; i < putTotal; i++ {
if seqKeys {
binary.PutVarint(k, int64(i%keySpaceSize))
} else {
binary.PutVarint(k, int64(rand.Intn(keySpaceSize)))
}
if writer != nil {
if _, err := writer.WriteString(string(k) + "\n"); err != nil {
fmt.Printf("Failed to write key : %s", err.Error())
os.Exit(1)
}
}
requests <- v3.OpPut(string(k), v)
}
close(requests)
}()
}
if writer != nil {
writer.Flush()
}

if compactInterval > 0 {
go func() {
Expand Down
48 changes: 42 additions & 6 deletions tools/benchmark/cmd/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
package cmd

import (
"bufio"
"context"
"fmt"
"math"
"os"
"os/exec"
"strconv"
"strings"
"time"

v3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -57,6 +61,24 @@ func rangeFunc(cmd *cobra.Command, args []string) {
}

k := args[0]
var scanner *bufio.Scanner
if _, err := os.Stat(k); err == nil {
fn, err := os.OpenFile(k, os.O_RDONLY, 0666)
if err != nil {
fmt.Printf("Error while opening file to read keys: %s", err.Error())
os.Exit(1)
}
scanner = bufio.NewScanner(fn)
defer fn.Close()
cmd := exec.Command("wc", "-l", k)
stdout, err := cmd.Output()
if err != nil {
fmt.Printf("Error while checking file for keys length: %s", err.Error())
os.Exit(1)
}
rangeTotal, _ = strconv.Atoi(strings.Split(strings.Split(string(stdout), "\n")[0], " ")[0])
}

end := ""
if len(args) == 2 {
end = args[1]
Expand Down Expand Up @@ -93,20 +115,34 @@ func rangeFunc(cmd *cobra.Command, args []string) {

st := time.Now()
_, err := c.Do(context.Background(), op)
// for _, kv := range resp.Get().Kvs {
// fmt.Printf("Key: %s, Value: %s \n", kv.Key, kv.Value)
// }
r.Results() <- report.Result{Err: err, Start: st, End: time.Now()}
bar.Increment()
}
}(clients[i])
}

go func() {
for i := 0; i < rangeTotal; i++ {
opts := []v3.OpOption{v3.WithRange(end)}
if rangeConsistency == "s" {
opts = append(opts, v3.WithSerializable())
if scanner != nil {
for scanner.Scan() {
opts := []v3.OpOption{v3.WithRange(end)}
if rangeConsistency == "s" {
opts = append(opts, v3.WithSerializable())
}
op := v3.OpGet(scanner.Text(), opts...)
requests <- op
}
} else {
for i := 0; i < rangeTotal; i++ {
opts := []v3.OpOption{v3.WithRange(end)}
if rangeConsistency == "s" {
opts = append(opts, v3.WithSerializable())
}
op := v3.OpGet(k, opts...)
requests <- op
}
op := v3.OpGet(k, opts...)
requests <- op
}
close(requests)
}()
Expand Down
Loading

0 comments on commit b011303

Please sign in to comment.