Skip to content

Commit

Permalink
Refactor - Move command execution responsibility to CommandHandler fr…
Browse files Browse the repository at this point in the history
…om IOThread (#1358)
  • Loading branch information
psrvere authored Jan 4, 2025
1 parent 31dad51 commit 699f3bb
Show file tree
Hide file tree
Showing 19 changed files with 848 additions and 719 deletions.
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ type performance struct {
WatchChanBufSize int `config:"watch_chan_buf_size" default:"20000"`
ShardCronFrequency time.Duration `config:"shard_cron_frequency" default:"1s"`
MultiplexerPollTimeout time.Duration `config:"multiplexer_poll_timeout" default:"100ms"`
MaxClients int32 `config:"max_clients" default:"20000" validate:"min=0"`
MaxClients uint32 `config:"max_clients" default:"20000" validate:"min=0"`
StoreMapInitSize int `config:"store_map_init_size" default:"1024000"`
AdhocReqChanBufSize int `config:"adhoc_req_chan_buf_size" default:"20"`
EnableProfiling bool `config:"profiling" default:"false"`
Expand Down
7 changes: 5 additions & 2 deletions integration_tests/commands/resp/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"testing"
"time"

"github.com/dicedb/dice/internal/commandhandler"
"github.com/dicedb/dice/internal/iothread"
"github.com/dicedb/dice/internal/server/resp"
"github.com/dicedb/dice/internal/wal"
Expand Down Expand Up @@ -211,10 +212,12 @@ func RunTestServer(wg *sync.WaitGroup, opt TestServerOptions) {
cmdWatchSubscriptionChan := make(chan watchmanager.WatchSubscription)
gec := make(chan error)
shardManager := shard.NewShardManager(1, cmdWatchChan, gec)
ioThreadManager := iothread.NewManager(20000, shardManager)
ioThreadManager := iothread.NewManager(config.DiceConfig.Performance.MaxClients)
cmdHandlerManager := commandhandler.NewRegistry(config.DiceConfig.Performance.MaxClients, shardManager)

// Initialize the RESP Server
wl, _ := wal.NewNullWAL()
testServer := resp.NewServer(shardManager, ioThreadManager, cmdWatchSubscriptionChan, cmdWatchChan, gec, wl)
testServer := resp.NewServer(shardManager, ioThreadManager, cmdHandlerManager, cmdWatchSubscriptionChan, cmdWatchChan, gec, wl)

ctx, cancel := context.WithCancel(context.Background())
fmt.Println("Starting the test server on port", config.DiceConfig.RespServer.Port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package iothread
package commandhandler

import (
"math"
Expand All @@ -24,7 +24,7 @@ import (
"github.com/dicedb/dice/internal/ops"
)

// This file contains functions used by the IOThread to handle and process responses
// This file contains functions used by the CommandHandler to handle and process responses
// from multiple shards during distributed operations. For commands that are executed
// across several shards, such as MultiShard commands, dedicated functions are responsible
// for aggregating and managing the results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package iothread
package commandhandler

import (
"fmt"
Expand All @@ -26,34 +26,6 @@ import (
diceerrors "github.com/dicedb/dice/internal/errors"
)

// RespAuth returns with an encoded "OK" if the user is authenticated
// If the user is not authenticated, it returns with an encoded error message
func (t *BaseIOThread) RespAuth(args []string) interface{} {
// Check for incorrect number of arguments (arity error).
if len(args) < 1 || len(args) > 2 {
return diceerrors.ErrWrongArgumentCount("AUTH")
}

if config.DiceConfig.Auth.Password == "" {
return diceerrors.ErrAuth
}

username := config.DiceConfig.Auth.UserName
var password string

if len(args) == 1 {
password = args[0]
} else {
username, password = args[0], args[1]
}

if err := t.Session.Validate(username, password); err != nil {
return err
}

return clientio.OK
}

// RespPING evaluates the PING command and returns the appropriate response.
// If no arguments are provided, it responds with "PONG" (standard behavior).
// If an argument is provided, it returns the argument as the response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package iothread
package commandhandler

import (
"context"
Expand All @@ -28,7 +28,7 @@ import (
"github.com/dicedb/dice/internal/store"
)

// This file is utilized by the IOThread to decompose commands that need to be executed
// This file is utilized by the CommandHandler to decompose commands that need to be executed
// across multiple shards. For commands that operate on multiple keys or necessitate
// distribution across shards (e.g., MultiShard commands), a Breakup function is invoked
// to transform the original command into multiple smaller commands, each directed at
Expand All @@ -41,13 +41,13 @@ import (
// decomposeRename breaks down the RENAME command into separate DELETE and SET commands.
// It first waits for the result of a GET command from shards. If successful, it removes
// the old key using a DEL command and sets the new key with the retrieved value using a SET command.
func decomposeRename(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func (h *BaseCommandHandler) decomposeRename(ctx context.Context, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
// Waiting for GET command response
var val string
select {
case <-ctx.Done():
slog.Error("IOThread timed out waiting for response from shards", slog.String("id", thread.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-thread.preprocessingChan:
slog.Error("CommandHandler timed out waiting for response from shards", slog.String("id", h.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-h.preprocessingChan:
if ok {
evalResp := preProcessedResp.EvalResponse
if evalResp.Error != nil {
Expand Down Expand Up @@ -85,13 +85,13 @@ func decomposeRename(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCm
// decomposeCopy breaks down the COPY command into a SET command that copies a value from
// one key to another. It first retrieves the value of the original key from shards, then
// sets the value to the destination key using a SET command.
func decomposeCopy(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func (h *BaseCommandHandler) decomposeCopy(ctx context.Context, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
// Waiting for GET command response
var resp *ops.StoreResponse
select {
case <-ctx.Done():
slog.Error("IOThread timed out waiting for response from shards", slog.String("id", thread.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-thread.preprocessingChan:
slog.Error("CommandHandler timed out waiting for response from shards", slog.String("id", h.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-h.preprocessingChan:
if ok {
resp = preProcessedResp
}
Expand Down Expand Up @@ -124,14 +124,14 @@ func decomposeCopy(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd)
// decomposePFMerge decomposes the PFMERGE command into individual GET commands for each HLL.
// For each key it creates a separate GET command to get the value at the given key, and waits for all responses to be
// returned before proceeding.
func decomposePFMerge(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func (h *BaseCommandHandler) decomposePFMerge(ctx context.Context, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
// Waiting for GET command response for all the keys to be merged
resp := make([]*object.InternalObj, 0, len(cd.Args)-1)
for i := 1; i < len(cd.Args); i++ {
select {
case <-ctx.Done():
slog.Error("IOThread timed out waiting for response from shards", slog.String("id", thread.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-thread.preprocessingChan:
slog.Error("CommandHandler timed out waiting for response from shards", slog.String("id", h.id), slog.Any("error", ctx.Err()))
case preProcessedResp, ok := <-h.preprocessingChan:
if ok {
if preProcessedResp.EvalResponse.Error != nil {
return nil, diceerrors.ErrInvalidHyperLogLogKey
Expand Down Expand Up @@ -166,7 +166,7 @@ func decomposePFMerge(ctx context.Context, thread *BaseIOThread, cd *cmd.DiceDBC
// decomposeMSet decomposes the MSET (Multi-set) command into individual SET commands.
// It expects an even number of arguments (key-value pairs). For each pair, it creates
// a separate SET command to store the value at the given key.
func decomposeMSet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func (h *BaseCommandHandler) decomposeMSet(_ context.Context, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args)%2 != 0 {
return nil, diceerrors.ErrWrongArgumentCount("MSET")
}
Expand All @@ -190,7 +190,7 @@ func decomposeMSet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cm
// decomposeMGet decomposes the MGET (Multi-get) command into individual GET commands.
// It expects a list of keys, and for each key, it creates a separate GET command to
// retrieve the value associated with that key.
func decomposeMGet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func (h *BaseCommandHandler) decomposeMGet(_ context.Context, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 1 {
return nil, diceerrors.ErrWrongArgumentCount("MGET")
}
Expand All @@ -206,7 +206,7 @@ func decomposeMGet(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cm
return decomposedCmds, nil
}

func decomposeSInter(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func (h *BaseCommandHandler) decomposeSInter(_ context.Context, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 1 {
return nil, diceerrors.ErrWrongArgumentCount("SINTER")
}
Expand All @@ -222,7 +222,7 @@ func decomposeSInter(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*
return decomposedCmds, nil
}

func decomposeSDiff(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func (h *BaseCommandHandler) decomposeSDiff(_ context.Context, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 1 {
return nil, diceerrors.ErrWrongArgumentCount("SDIFF")
}
Expand All @@ -238,7 +238,7 @@ func decomposeSDiff(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*c
return decomposedCmds, nil
}

func decomposeJSONMget(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func (h *BaseCommandHandler) decomposeJSONMget(_ context.Context, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) < 2 {
return nil, diceerrors.ErrWrongArgumentCount("JSON.MGET")
}
Expand All @@ -257,7 +257,7 @@ func decomposeJSONMget(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([
return decomposedCmds, nil
}

func decomposeTouch(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func (h *BaseCommandHandler) decomposeTouch(_ context.Context, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) == 0 {
return nil, diceerrors.ErrWrongArgumentCount("TOUCH")
}
Expand All @@ -274,13 +274,13 @@ func decomposeTouch(_ context.Context, _ *BaseIOThread, cd *cmd.DiceDBCmd) ([]*c
return decomposedCmds, nil
}

func decomposeDBSize(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func (h *BaseCommandHandler) decomposeDBSize(_ context.Context, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) > 0 {
return nil, diceerrors.ErrWrongArgumentCount("DBSIZE")
}

decomposedCmds := make([]*cmd.DiceDBCmd, 0, len(cd.Args))
for i := uint8(0); i < uint8(thread.shardManager.GetShardCount()); i++ {
for i := uint8(0); i < uint8(h.shardManager.GetShardCount()); i++ {
decomposedCmds = append(decomposedCmds,
&cmd.DiceDBCmd{
Cmd: store.SingleShardSize,
Expand All @@ -291,13 +291,13 @@ func decomposeDBSize(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd)
return decomposedCmds, nil
}

func decomposeKeys(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func (h *BaseCommandHandler) decomposeKeys(_ context.Context, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) != 1 {
return nil, diceerrors.ErrWrongArgumentCount("KEYS")
}

decomposedCmds := make([]*cmd.DiceDBCmd, 0, len(cd.Args))
for i := uint8(0); i < uint8(thread.shardManager.GetShardCount()); i++ {
for i := uint8(0); i < uint8(h.shardManager.GetShardCount()); i++ {
decomposedCmds = append(decomposedCmds,
&cmd.DiceDBCmd{
Cmd: store.SingleShardKeys,
Expand All @@ -308,13 +308,13 @@ func decomposeKeys(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) (
return decomposedCmds, nil
}

func decomposeFlushDB(_ context.Context, thread *BaseIOThread, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
func (h *BaseCommandHandler) decomposeFlushDB(_ context.Context, cd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) {
if len(cd.Args) > 1 {
return nil, diceerrors.ErrWrongArgumentCount("FLUSHDB")
}

decomposedCmds := make([]*cmd.DiceDBCmd, 0, len(cd.Args))
for i := uint8(0); i < uint8(thread.shardManager.GetShardCount()); i++ {
for i := uint8(0); i < uint8(h.shardManager.GetShardCount()); i++ {
decomposedCmds = append(decomposedCmds,
&cmd.DiceDBCmd{
Cmd: store.FlushDB,
Expand Down
Loading

0 comments on commit 699f3bb

Please sign in to comment.