diff --git a/examples/stream_subscriber.cr b/examples/stream_subscriber.cr index 70a4b18..67c924e 100644 --- a/examples/stream_subscriber.cr +++ b/examples/stream_subscriber.cr @@ -5,10 +5,14 @@ require "./streamer" redis = Redis::Client.new streamer = Streamer::Client.new(redis, consumer: "hostname") -streamer.subscribe "chat", group: "stuff", timeout: 30.minutes do |msg| - pp id: msg.id, person: Person.from_json(msg.body), timestamp: msg.timestamp, age: msg.age +pp streamer.stream_names + +streamer.subscribe "chat", group: "stuff", timeout: 30.seconds do |msg| + person = Person.from_json(msg.body) + pp id: msg.id, person: person, timestamp: msg.timestamp, age: msg.age end +# Hit Enter to exit gets require "json" diff --git a/examples/streamer.cr b/examples/streamer.cr index 217d4fb..260c2b1 100644 --- a/examples/streamer.cr +++ b/examples/streamer.cr @@ -1,3 +1,4 @@ +require "../src/redis" require "mutex" require "log" @@ -14,15 +15,16 @@ module Streamer timeout : Time::Span = 30.minutes, &block : Message -> ) : Nil - @redis.xgroup_create stream, group, id: "$", mkstream: true rescue nil - @redis.xgroup_create_consumer stream, group, consumer + key = stream_key(stream) + @redis.xgroup_create key, group, id: "$", mkstream: true rescue nil + @redis.xgroup_create_consumer key, group, consumer queue = Channel(Message).new # RETRIEVE+ENQUEUE FIBER spawn do loop do response = @redis.xreadgroup group, consumer, - streams: {stream => ">"}, + streams: {key => ">"}, count: "10", block: 2.seconds @@ -30,7 +32,7 @@ module Streamer response = Redis::Streaming::XReadGroupResponse.new(response) response.results.each do |result| # This *shouldn't* be necessary because we specified only this stream, but let's just be sure - next unless result.key == stream + next unless result.key == key result.messages.each do |msg| queue.send Message.new(msg) @@ -44,13 +46,15 @@ module Streamer spawn do earliest_id = "-" loop do - response = Redis::Streaming::XAutoClaimResponse.new(@redis.xautoclaim(stream, group, consumer, min_idle_time: timeout, start: "-", count: 10)) + response = Redis::Streaming::XAutoClaimResponse.new(@redis.xautoclaim(key, group, consumer, min_idle_time: timeout, start: "-", count: 10)) response.messages.each do |msg| queue.send Message.new(msg) end sleep timeout if response.messages.empty? + rescue ex + pp ex end end @@ -78,7 +82,7 @@ module Streamer if successful_ids.any? @log.trace { "Sending ack for #{successful_ids.size} messages" } success_mutex.synchronize do - @redis.xack stream, group, successful_ids + @redis.xack key, group, successful_ids # TODO: Throw the array away periodically to make it somewhat elastic. # If we get an abnormal tidal wave of messages we don't want it to @@ -93,7 +97,25 @@ module Streamer end def publish(stream : String, message : String) - @redis.xadd stream, id: "*", body: message + key = stream_key(stream) + @redis.xadd key, id: "*", body: message + end + + def stream_names + stream_names = [] of String + @redis.scan_each(match: stream_key("*"), type: "stream") do |key| + stream_names << stream_name_for_key(key) + end + + stream_names + end + + private def stream_key(stream_name : String) + "streamer:stream:#{stream_name}" + end + + private def stream_name_for_key(key : String) + key.lchop("streamer:stream:") end end