Skip to content

Commit

Permalink
Update streaming example
Browse files Browse the repository at this point in the history
  • Loading branch information
jgaskins committed Nov 11, 2021
1 parent f98e14c commit 5702175
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 9 deletions.
8 changes: 6 additions & 2 deletions examples/stream_subscriber.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ require "./streamer"
redis = Redis::Client.new
streamer = Streamer::Client.new(redis, consumer: "hostname")

streamer.subscribe "chat", group: "stuff", timeout: 30.minutes do |msg|
pp id: msg.id, person: Person.from_json(msg.body), timestamp: msg.timestamp, age: msg.age
pp streamer.stream_names

streamer.subscribe "chat", group: "stuff", timeout: 30.seconds do |msg|
person = Person.from_json(msg.body)
pp id: msg.id, person: person, timestamp: msg.timestamp, age: msg.age
end

# Hit Enter to exit
gets

require "json"
Expand Down
36 changes: 29 additions & 7 deletions examples/streamer.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require "../src/redis"
require "mutex"
require "log"

Expand All @@ -14,23 +15,24 @@ module Streamer
timeout : Time::Span = 30.minutes,
&block : Message ->
) : Nil
@redis.xgroup_create stream, group, id: "$", mkstream: true rescue nil
@redis.xgroup_create_consumer stream, group, consumer
key = stream_key(stream)
@redis.xgroup_create key, group, id: "$", mkstream: true rescue nil
@redis.xgroup_create_consumer key, group, consumer
queue = Channel(Message).new

# RETRIEVE+ENQUEUE FIBER
spawn do
loop do
response = @redis.xreadgroup group, consumer,
streams: {stream => ">"},
streams: {key => ">"},
count: "10",
block: 2.seconds

if response
response = Redis::Streaming::XReadGroupResponse.new(response)
response.results.each do |result|
# This *shouldn't* be necessary because we specified only this stream, but let's just be sure
next unless result.key == stream
next unless result.key == key

result.messages.each do |msg|
queue.send Message.new(msg)
Expand All @@ -44,13 +46,15 @@ module Streamer
spawn do
earliest_id = "-"
loop do
response = Redis::Streaming::XAutoClaimResponse.new(@redis.xautoclaim(stream, group, consumer, min_idle_time: timeout, start: "-", count: 10))
response = Redis::Streaming::XAutoClaimResponse.new(@redis.xautoclaim(key, group, consumer, min_idle_time: timeout, start: "-", count: 10))

response.messages.each do |msg|
queue.send Message.new(msg)
end

sleep timeout if response.messages.empty?
rescue ex
pp ex
end
end

Expand Down Expand Up @@ -78,7 +82,7 @@ module Streamer
if successful_ids.any?
@log.trace { "Sending ack for #{successful_ids.size} messages" }
success_mutex.synchronize do
@redis.xack stream, group, successful_ids
@redis.xack key, group, successful_ids

# TODO: Throw the array away periodically to make it somewhat elastic.
# If we get an abnormal tidal wave of messages we don't want it to
Expand All @@ -93,7 +97,25 @@ module Streamer
end

def publish(stream : String, message : String)
@redis.xadd stream, id: "*", body: message
key = stream_key(stream)
@redis.xadd key, id: "*", body: message
end

def stream_names
stream_names = [] of String
@redis.scan_each(match: stream_key("*"), type: "stream") do |key|
stream_names << stream_name_for_key(key)
end

stream_names
end

private def stream_key(stream_name : String)
"streamer:stream:#{stream_name}"
end

private def stream_name_for_key(key : String)
key.lchop("streamer:stream:")
end
end

Expand Down

0 comments on commit 5702175

Please sign in to comment.