Skip to content

Commit

Permalink
Improved response handling for watch results
Browse files Browse the repository at this point in the history
  • Loading branch information
JyotinderSingh committed Nov 22, 2024
1 parent b4a31f3 commit 91eb1d2
Showing 1 changed file with 64 additions and 20 deletions.
84 changes: 64 additions & 20 deletions watch_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ func (w *WatchConn) Watch(ctx context.Context, cmdName string, args ...interface
// cleanup
close(firstMsgCh)
delete(w.msgCh.watchLabelFirstMsgChMap, watchLabel)
if firstMsg.Data, err = parseWithTypes(cmdName, firstMsg.Data); err != nil {
return nil, err
}
return firstMsg, nil
case <-ctx.Done():
return nil, ctx.Err()
Expand Down Expand Up @@ -280,50 +283,91 @@ func (w *WatchConn) processWatchResult(payload []interface{}) (*WatchResult, err
}

data := payload[2]
var typedData any
typedData, err := parseWithTypes(command, data)
if err != nil {
return nil, err
}

return &WatchResult{Command: command, Fingerprint: fingerprint, Data: typedData}, nil
}

switch command {
func parseWithTypes(cmd string, data interface{}) (interface{}, error) {
switch cmd {
case "GET.WATCH", "GET":
if data == nil {
return &WatchResult{Command: command, Fingerprint: fingerprint, Data: nil}, nil
}
typedData = data
return data, nil
case "ZRANGE.WATCH", "ZRANGE":
typedData, ok = parseZRangeResult(data)
if !ok {
return nil, fmt.Errorf("err: invalid data in ZRANGE.WATCH message, expected []Z, got %T", payload[2])
}
return parseZRangeResult(data)
default:
typedData = data
return data, nil
}

return &WatchResult{Command: command, Fingerprint: fingerprint, Data: typedData}, nil
}

// parseScores parses the Data from ZRANGE into a slice of Score.
func parseZRangeResult(data interface{}) ([]Z, bool) {
// parseZRangeResult parses the Data from ZRANGE or ZRANGE.WATCH into appropriate type
func parseZRangeResult(data interface{}) (interface{}, error) {
dataList, ok := data.([]interface{})
if !ok {
return nil, false
return nil, nil
}

// Empty result case
if len(dataList) == 0 {
return []string{}, nil
}

var scores []Z
// Check if we have scores by examining data structure
// If data has even length and alternate elements can be parsed as floats,
// we treat it as WITHSCORES result
hasScores := false
if len(dataList) > 1 {
if scoreStr, ok := dataList[1].(string); ok {
if _, err := strconv.ParseFloat(scoreStr, 64); err == nil {
hasScores = true
}
}
}

if hasScores {
return parseZRangeWithScores(dataList)
}
return parseZRangeMembers(dataList)
}

// parseZRangeWithScores parses when WITHSCORES was used
func parseZRangeWithScores(dataList []interface{}) ([]Z, error) {
if len(dataList)%2 != 0 {
return nil, fmt.Errorf("err: invalid ZRANGE.WATCH message format")
}

scores := make([]Z, 0, len(dataList)/2)
for i := 0; i < len(dataList); i += 2 {
member, ok1 := dataList[i].(string)
scoreStr, ok2 := dataList[i+1].(string)
if !ok1 || !ok2 {
return nil, false
return nil, fmt.Errorf("err: invalid ZRANGE.WATCH message format")
}
scoreFloat, err := strconv.ParseFloat(scoreStr, 64)
if err != nil {
return nil, false
return nil, fmt.Errorf("err: invalid ZRANGE.WATCH message format")
}
scores = append(scores, Z{
Member: member,
Score: scoreFloat,
})
}
return scores, true
return scores, nil
}

// parseZRangeMembers parses when WITHSCORES was not used
func parseZRangeMembers(dataList []interface{}) ([]string, error) {
members := make([]string, 0, len(dataList))
for _, item := range dataList {
member, ok := item.(string)
if !ok {
return nil, fmt.Errorf("err: invalid ZRANGE.WATCH message format")
}
members = append(members, member)
}
return members, nil
}

// ReceiveTimeout acts like Receive but returns an error if a message
Expand Down

0 comments on commit 91eb1d2

Please sign in to comment.