Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add XAUTOCLAIM command, added to Redis in 6.2 #1018

Merged
merged 1 commit into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions lib/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3280,6 +3280,38 @@ def xclaim(key, group, consumer, min_idle_time, *ids, **opts)
synchronize { |client| client.call(args, &blk) }
end

# Transfers ownership of pending stream entries that match the specified criteria.
#
# @example Claim next pending message stuck > 5 minutes and mark as retry
# redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0')
# @example Claim 50 next pending messages stuck > 5 minutes and mark as retry
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0', count: 50)
# @example Claim next pending message stuck > 5 minutes and don't mark as retry
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0', justid: true)
# @example Claim next pending message after this id stuck > 5 minutes and mark as retry
# redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '1641321233-0')
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param min_idle_time [Integer] the number of milliseconds
# @param start [String] entry id to start scanning from or 0-0 for everything
# @param count [Integer] number of messages to claim (default 1)
# @param justid [Boolean] whether to fetch just an array of entry ids or not.
# Does not increment retry count when true
#
# @return [Hash{String => Hash}] the entries successfully claimed
# @return [Array<String>] the entry ids successfully claimed if justid option is `true`
def xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false)
args = [:xautoclaim, key, group, consumer, min_idle_time, start]
if count
args << 'COUNT' << count.to_s
end
args << 'JUSTID' if justid
blk = justid ? HashifyStreamAutoclaimJustId : HashifyStreamAutoclaim
synchronize { |client| client.call(args, &blk) }
end

# Fetches not acknowledging pending entries
#
# @example With key and group
Expand Down Expand Up @@ -3490,6 +3522,20 @@ def method_missing(command, *args) # rubocop:disable Style/MissingRespondToMissi
end
}

HashifyStreamAutoclaim = lambda { |reply|
{
'next' => reply[0],
'entries' => reply[1].map { |entry| [entry[0], entry[1].each_slice(2).to_h] }
}
}

HashifyStreamAutoclaimJustId = lambda { |reply|
{
'next' => reply[0],
'entries' => reply[1]
}
}

HashifyStreamPendings = lambda { |reply|
{
'size' => reply[0],
Expand Down
2 changes: 1 addition & 1 deletion makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
REDIS_BRANCH ?= 6.0
REDIS_BRANCH ?= 6.2
TMP := tmp
BUILD_DIR := ${TMP}/cache/redis-${REDIS_BRANCH}
TARBALL := ${TMP}/redis-${REDIS_BRANCH}.tar.gz
Expand Down
67 changes: 67 additions & 0 deletions test/lint/streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
module Lint
module Streams
MIN_REDIS_VERSION = '4.9.0'
MIN_REDIS_VERSION_XAUTOCLAIM = '6.2.0'
ENTRY_ID_FORMAT = /\d+-\d+/.freeze

def setup
Expand Down Expand Up @@ -633,6 +634,72 @@ def test_xclaim_with_invalid_arguments
assert_raises(Redis::CommandError) { redis.xclaim('', '', '', '', '') }
end

def test_xautoclaim
omit_version(MIN_REDIS_VERSION_XAUTOCLAIM)

redis.xadd('s1', { f: 'v1' }, id: '0-1')
redis.xgroup(:create, 's1', 'g1', '$')
redis.xadd('s1', { f: 'v2' }, id: '0-2')
redis.xadd('s1', { f: 'v3' }, id: '0-3')
redis.xreadgroup('g1', 'c1', 's1', '>')
sleep 0.01
byroot marked this conversation as resolved.
Show resolved Hide resolved

actual = redis.xautoclaim('s1', 'g1', 'c2', 10, '0-0')

assert_equal '0-0', actual['next']
assert_equal %w(0-2 0-3), actual['entries'].map(&:first)
assert_equal(%w(v2 v3), actual['entries'].map { |i| i.last['f'] })
end

def test_xautoclaim_with_justid_option
omit_version(MIN_REDIS_VERSION_XAUTOCLAIM)

redis.xadd('s1', { f: 'v1' }, id: '0-1')
redis.xgroup(:create, 's1', 'g1', '$')
redis.xadd('s1', { f: 'v2' }, id: '0-2')
redis.xadd('s1', { f: 'v3' }, id: '0-3')
redis.xreadgroup('g1', 'c1', 's1', '>')
sleep 0.01

actual = redis.xautoclaim('s1', 'g1', 'c2', 10, '0-0', justid: true)

assert_equal '0-0', actual['next']
assert_equal %w(0-2 0-3), actual['entries']
end

def test_xautoclaim_with_count_option
omit_version(MIN_REDIS_VERSION_XAUTOCLAIM)

redis.xadd('s1', { f: 'v1' }, id: '0-1')
redis.xgroup(:create, 's1', 'g1', '$')
redis.xadd('s1', { f: 'v2' }, id: '0-2')
redis.xadd('s1', { f: 'v3' }, id: '0-3')
redis.xreadgroup('g1', 'c1', 's1', '>')
sleep 0.01

actual = redis.xautoclaim('s1', 'g1', 'c2', 10, '0-0', count: 1)

assert_equal '0-3', actual['next']
assert_equal %w(0-2), actual['entries'].map(&:first)
assert_equal(%w(v2), actual['entries'].map { |i| i.last['f'] })
end

def test_xautoclaim_with_larger_interval
omit_version(MIN_REDIS_VERSION_XAUTOCLAIM)

redis.xadd('s1', { f: 'v1' }, id: '0-1')
redis.xgroup(:create, 's1', 'g1', '$')
redis.xadd('s1', { f: 'v2' }, id: '0-2')
redis.xadd('s1', { f: 'v3' }, id: '0-3')
redis.xreadgroup('g1', 'c1', 's1', '>')
sleep 0.01

actual = redis.xautoclaim('s1', 'g1', 'c2', 36_000, '0-0')

assert_equal '0-0', actual['next']
assert_equal [], actual['entries']
end

def test_xpending
redis.xadd('s1', { f: 'v1' }, id: '0-1')
redis.xgroup(:create, 's1', 'g1', '$')
Expand Down