From 38f156aeb091d95fd349f865567d02d77d2752d5 Mon Sep 17 00:00:00 2001 From: Arpit Bhayani Date: Tue, 27 Sep 2022 15:41:41 +0530 Subject: [PATCH] implementing INFO and allkeys-random key eviction --- README.md | 16 ++++++++++++++ config/main.go | 8 +++++-- core/eval.go | 25 +++++++++++++++++++++ core/eviction.go | 23 +++++++++++++++++-- core/expire.go | 2 +- core/stats.go | 7 ++++++ core/store.go | 7 +++++- storm/set/main.go | 56 +++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 138 insertions(+), 6 deletions(-) create mode 100644 core/stats.go create mode 100644 storm/set/main.go diff --git a/README.md b/README.md index dc137d0e4..c339dc940 100644 --- a/README.md +++ b/README.md @@ -15,3 +15,19 @@ GET k: *2\r\n$3\r\nGET\r\n$1\r\nk\r\n $ (printf 'CMD1CMD2CMD3';) | nc localhost 7379 $ (printf '*1\r\n$4\r\nPING\r\n*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n*2\r\n$3\r\nGET\r\n$1\r\nk\r\n';) | nc localhost 7379 ``` + +## Storm + +Storm is a series of utility that allows us to fire bulk request to the Dice Database + +``` +$ go run storm/set/main.go +``` + +## Monitoring through Prometheus + +``` +$ ./redis_exporter -redis.addr redis://localhost:7379 +$ ./prometheus --web.enable-admin-api +$ curl -X POST -g 'http://localhost:9090/api/v1/admin/tsdb/delete_series?match[]={instance="localhost:9121"}' +``` diff --git a/config/main.go b/config/main.go index 96b8b3482..fb6bffe6f 100644 --- a/config/main.go +++ b/config/main.go @@ -2,7 +2,11 @@ package config var Host string = "0.0.0.0" var Port int = 7379 -var KeysLimit int = 5 -var EvictionStrategy string = "simple-first" +var KeysLimit int = 100 + +// Will evict EvictionRatio of keys whenever eviction runs +var EvictionRatio float64 = 0.40 + +var EvictionStrategy string = "allkeys-random" var AOFFile string = "./dice-master.aof" diff --git a/core/eval.go b/core/eval.go index 58ed4f3b1..790f068c6 100644 --- a/core/eval.go +++ b/core/eval.go @@ -3,6 +3,7 @@ package core import ( "bytes" "errors" + "fmt" "io" "strconv" "time" @@ -189,6 +190,24 @@ func evalINCR(args []string) []byte { return Encode(i, false) } +func evalINFO(args []string) []byte { + var info []byte + buf := bytes.NewBuffer(info) + buf.WriteString("# Keyspace\r\n") + for i := range KeyspaceStat { + buf.WriteString(fmt.Sprintf("db%d:keys=%d,expires=0,avg_ttl=0\r\n", i, KeyspaceStat[i]["keys"])) + } + return Encode(buf.String(), false) +} + +func evalCLIENT(args []string) []byte { + return RESP_OK +} + +func evalLATENCY(args []string) []byte { + return Encode([]string{}, false) +} + func EvalAndRespond(cmds RedisCmds, c io.ReadWriter) { var response []byte buf := bytes.NewBuffer(response) @@ -211,6 +230,12 @@ func EvalAndRespond(cmds RedisCmds, c io.ReadWriter) { buf.Write(evalBGREWRITEAOF(cmd.Args)) case "INCR": buf.Write(evalINCR(cmd.Args)) + case "INFO": + buf.Write(evalINFO(cmd.Args)) + case "CLIENT": + buf.Write(evalCLIENT(cmd.Args)) + case "LATENCY": + buf.Write(evalLATENCY(cmd.Args)) default: buf.Write(evalPING(cmd.Args)) } diff --git a/core/eviction.go b/core/eviction.go index d7fa220ef..8805e8e29 100644 --- a/core/eviction.go +++ b/core/eviction.go @@ -1,21 +1,40 @@ package core -import "github.com/dicedb/dice/config" +import ( + "github.com/dicedb/dice/config" +) // Evicts the first key it found while iterating the map // TODO: Make it efficient by doing thorough sampling func evictFirst() { for k := range store { - delete(store, k) + Del(k) return } } +// Randomly removes keys to make space for the new data added. +// The number of keys removed will be sufficient to free up least 10% space +func evictAllkeysRandom() { + evictCount := int64(config.EvictionRatio * float64(config.KeysLimit)) + // Iteration of Golang dictionary can be considered as a random + // because it depends on the hash of the inserted key + for k := range store { + Del(k) + evictCount-- + if evictCount <= 0 { + break + } + } +} + // TODO: Make the eviction strategy configuration driven // TODO: Support multiple eviction strategies func evict() { switch config.EvictionStrategy { case "simple-first": evictFirst() + case "allkeys-random": + evictAllkeysRandom() } } diff --git a/core/expire.go b/core/expire.go index a403e9b1b..38f34821e 100644 --- a/core/expire.go +++ b/core/expire.go @@ -17,7 +17,7 @@ func expireSample() float32 { limit-- // if the key is expired if obj.ExpiresAt <= time.Now().UnixMilli() { - delete(store, key) + Del(key) expiredCount++ } } diff --git a/core/stats.go b/core/stats.go new file mode 100644 index 000000000..6b8619e4e --- /dev/null +++ b/core/stats.go @@ -0,0 +1,7 @@ +package core + +var KeyspaceStat [4]map[string]int + +func UpdateDBStat(num int, metric string, value int) { + KeyspaceStat[num][metric] = value +} diff --git a/core/store.go b/core/store.go index 5d3b235d3..103a2dfdb 100644 --- a/core/store.go +++ b/core/store.go @@ -30,13 +30,17 @@ func Put(k string, obj *Obj) { evict() } store[k] = obj + if KeyspaceStat[0] == nil { + KeyspaceStat[0] = make(map[string]int) + } + KeyspaceStat[0]["keys"]++ } func Get(k string) *Obj { v := store[k] if v != nil { if v.ExpiresAt != -1 && v.ExpiresAt <= time.Now().UnixMilli() { - delete(store, k) + Del(k) return nil } } @@ -46,6 +50,7 @@ func Get(k string) *Obj { func Del(k string) bool { if _, ok := store[k]; ok { delete(store, k) + KeyspaceStat[0]["keys"]-- return true } return false diff --git a/storm/set/main.go b/storm/set/main.go new file mode 100644 index 000000000..a0012211c --- /dev/null +++ b/storm/set/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "fmt" + "io" + "math/rand" + "net" + "strconv" + "strings" + "sync" + "time" + + "github.com/dicedb/dice/core" +) + +func getRandomKeyValue() (string, int64) { + token := int64(rand.Uint64() % 5000000) + return "k" + strconv.FormatInt(token, 10), token +} + +func stormSet(wg *sync.WaitGroup) { + defer wg.Done() + conn, err := net.Dial("tcp", "localhost:7379") + if err != nil { + panic(err) + } + + for { + time.Sleep(500 * time.Millisecond) + k, v := getRandomKeyValue() + var buf [512]byte + cmd := fmt.Sprintf("SET %s %d", k, v) + fmt.Println(cmd) + _, err = conn.Write(core.Encode(strings.Split(cmd, " "), false)) + if err != nil { + panic(err) + } + _, err = conn.Read(buf[:]) + if err != nil { + if err == io.EOF { + return + } + panic(err) + } + } + conn.Close() +} + +func main() { + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go stormSet(&wg) + } + wg.Wait() +}