Skip to content

Commit

Permalink
Added Websocket support (DiceDB#690)
Browse files Browse the repository at this point in the history
  • Loading branch information
psrvere authored Sep 28, 2024
1 parent 6786c3a commit be8f2de
Show file tree
Hide file tree
Showing 15 changed files with 779 additions and 11 deletions.
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ var (
EnableMultiThreading = false
EnableHTTP = true
HTTPPort = 8082

EnableWebsocket = true
WebsocketPort = 8379

// if RequirePass is set to an empty string, no authentication is required
RequirePass = utils.EmptyStr

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ require (
github.com/cockroachdb/swiss v0.0.0-20240612210725-f4de07ae6964
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
github.com/dicedb/go-dice v0.0.0-20240820180649-d97f15fca831
github.com/gorilla/websocket v1.5.3
github.com/google/go-cmp v0.6.0
github.com/ohler55/ojg v1.24.0
github.com/pelletier/go-toml/v2 v2.2.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
Expand Down
46 changes: 46 additions & 0 deletions integration_tests/commands/websocket/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package websocket

import (
"context"
"log/slog"
"os"
"sync"
"testing"
"time"

"github.com/dicedb/dice/internal/logger"
)

func TestMain(m *testing.M) {
logger := logger.New(logger.Opts{WithTimestamp: false})
slog.SetDefault(logger)
var wg sync.WaitGroup

// Run the test server
// This is a synchronous method, because internally it
// checks for available port and then forks a goroutine
// to start the server
opts := TestServerOptions{
Port: 8380,
Logger: logger,
}
RunWebsocketServer(context.Background(), &wg, opts)

// Wait for the server to start
time.Sleep(2 * time.Second)

executor := NewWebsocketCommandExecutor()

// Run the test suite
exitCode := m.Run()

// abort
conn := executor.ConnectToServer()
executor.FireCommand(conn, WebsocketCommand{
Message: "abort",
})
executor.DisconnectServer(conn)

wg.Wait()
os.Exit(exitCode)
}
262 changes: 262 additions & 0 deletions integration_tests/commands/websocket/set_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package websocket

import (
"fmt"
"strconv"
"strings"
"testing"
"time"

testifyAssert "github.com/stretchr/testify/assert"

"gotest.tools/v3/assert"
)

type TestCase struct {
name string
commands []WebsocketCommand
expected []interface{}
}

func TestSet(t *testing.T) {
exec := NewWebsocketCommandExecutor()

testCases := []TestCase{
{
name: "Set and Get Simple Value",
commands: []WebsocketCommand{
{Message: "set k v"},
{Message: "get k"},
},
expected: []interface{}{"OK", "v"},
},
{
name: "Set and Get Integer Value",
commands: []WebsocketCommand{
{Message: "set k 123456789"},
{Message: "get k"},
},
expected: []interface{}{"OK", float64(1.23456789e+08)},
},
{
name: "Overwrite Existing Key",
commands: []WebsocketCommand{
{Message: "set k v1"},
{Message: "set k 5"},
{Message: "get k"},
},
expected: []interface{}{"OK", "OK", float64(5)},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
conn := exec.ConnectToServer()
// delete existing key
_, err := exec.FireCommand(conn, WebsocketCommand{
Message: "del k",
})
testifyAssert.NoError(t, err)

for i, cmd := range tc.commands {
result, err := exec.FireCommand(conn, cmd)
testifyAssert.NoError(t, err)
assert.DeepEqual(t, tc.expected[i], result)
}
})
}
}

func TestSetWithOptions(t *testing.T) {
exec := NewWebsocketCommandExecutor()
expiryTime := strconv.FormatInt(time.Now().Add(1*time.Minute).UnixMilli(), 10)

testCases := []TestCase{
{
name: "Set with EX option",
commands: []WebsocketCommand{
{Message: "set k v ex 3"},
{Message: "get k"},
{Message: "sleep 3"},
{Message: "get k"},
},
expected: []interface{}{"OK", "v", "OK", "(nil)"},
},
{
name: "Set with PX option",
commands: []WebsocketCommand{
{Message: "set k v px 2000"},
{Message: "get k"},
{Message: "sleep 3"},
{Message: "get k"},
},
expected: []interface{}{"OK", "v", "OK", "(nil)"},
},
{
name: "Set with EX and PX option",
commands: []WebsocketCommand{
{Message: "set k v ex 2 px 2000"},
},
expected: []interface{}{"ERR syntax error"},
},
{
name: "XX on non-existing key",
commands: []WebsocketCommand{
{Message: "del k"},
{Message: "set k v xx true"},
{Message: "get k"},
},
expected: []interface{}{float64(0), "(nil)", "(nil)"},
},
{
name: "NX on non-existing key",
commands: []WebsocketCommand{
{Message: "del k"},
{Message: "set k v nx"},
{Message: "get k"},
},
expected: []interface{}{float64(0), "OK", "v"},
},
{
name: "NX on existing key",
commands: []WebsocketCommand{
{Message: "del k"},
{Message: "set k v nx"},
{Message: "get k"},
{Message: "set k v nx"},
},
expected: []interface{}{float64(0), "OK", "v", "(nil)"},
},
{
name: "PXAT option",
commands: []WebsocketCommand{
{Message: fmt.Sprintf("set k v pxat %v", expiryTime)},
{Message: "get k"},
},
expected: []interface{}{"OK", "v"},
},
{
name: "PXAT option with delete",
commands: []WebsocketCommand{
{Message: fmt.Sprintf("set k1 v1 pxat %v", expiryTime)},
{Message: "get k1"},
{Message: "sleep 4"},
{Message: "del k1"},
},
expected: []interface{}{"OK", "v1", "OK", float64(1)},
},
{
name: "PXAT option with invalid unix time ms",
commands: []WebsocketCommand{
{Message: "set k2 v2 pxat 123123"},
{Message: "get k2"},
},
expected: []interface{}{"OK", "(nil)"},
},
{
name: "XX on existing key",
commands: []WebsocketCommand{
{Message: "set k v2"},
{Message: "set k v2 xx"},
{Message: "get k"},
},
expected: []interface{}{"OK", "OK", "v2"},
},
{
name: "Multiple XX operations",
commands: []WebsocketCommand{
{Message: "set k v1"},
{Message: "set k v2 xx"},
{Message: "set k v3 xx"},
{Message: "get k"},
},
expected: []interface{}{"OK", "OK", "OK", "v3"},
},
{
name: "EX option",
commands: []WebsocketCommand{
{Message: "set k v ex 1"},
{Message: "get k"},
{Message: "sleep 2"},
{Message: "get k"},
},
expected: []interface{}{"OK", "v", "OK", "(nil)"},
},
{
name: "XX option",
commands: []WebsocketCommand{
{Message: "set k v xx ex 1"},
{Message: "get k"},
{Message: "sleep 2"},
{Message: "get k"},
{Message: "set k v xx ex 1"},
{Message: "get k"},
},
expected: []interface{}{"(nil)", "(nil)", "OK", "(nil)", "(nil)", "(nil)"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
conn := exec.ConnectToServer()
exec.FireCommand(conn, WebsocketCommand{Message: "del k"})
exec.FireCommand(conn, WebsocketCommand{Message: "del k1"})
exec.FireCommand(conn, WebsocketCommand{Message: "del k2"})
for i, cmd := range tc.commands {
result, err := exec.FireCommand(conn, cmd)
assert.NilError(t, err)
assert.Equal(t, tc.expected[i], result)
}
})
}
}

func TestSetWithExat(t *testing.T) {
exec := NewWebsocketCommandExecutor()
Etime := strconv.FormatInt(time.Now().Unix()+5, 10)
BadTime := "123123"

testCases := []TestCase{
{
name: "SET with EXAT",
commands: []WebsocketCommand{
{Message: "del k"},
{Message: fmt.Sprintf("set k v exat %v", Etime)},
{Message: "get k"},
{Message: "ttl k"},
},
expected: []interface{}{float64(0), "OK", "v", float64(4)},
},
{
name: "SET with invalid EXAT expires key immediately",
commands: []WebsocketCommand{
{Message: "del k"},
{Message: fmt.Sprintf("set k v exat %v", BadTime)},
{Message: "get k"},
{Message: "ttl k"},
},
expected: []interface{}{float64(0), "OK", "(nil)", float64(-2)},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
conn := exec.ConnectToServer()
// Ensure key is deleted before the test
exec.FireCommand(conn, WebsocketCommand{
Message: "del k",
})

for i, cmd := range tc.commands {
result, err := exec.FireCommand(conn, cmd)
assert.NilError(t, err)
command := strings.Split(cmd.Message, "")
if command[0] == "ttl" {
assert.Assert(t, result.(float64) <= tc.expected[i].(float64))
} else {
assert.DeepEqual(t, tc.expected[i], result)
}
}
})
}
}
Loading

0 comments on commit be8f2de

Please sign in to comment.