Skip to content

Commit

Permalink
Merge pull request #1 from vavato-be/retry-autoclaim
Browse files Browse the repository at this point in the history
Rudimentary retry support through xautoclaim
  • Loading branch information
alexmreis authored Jul 13, 2021
2 parents 7d51c3a + 01fcabf commit 16e194b
Show file tree
Hide file tree
Showing 13 changed files with 175 additions and 44 deletions.
43 changes: 43 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name: release

on:
push:
branches:
- main

jobs:
release-please:
runs-on: ubuntu-latest
steps:
- uses: GoogleCloudPlatform/release-please-action@v2
id: release
with:
release-type: ruby
package-name: nagare-redis
bump-minor-pre-major: true
version-file: "lib/nagare/version.rb"
# Checkout code if release was created
- uses: actions/checkout@v2
if: ${{ steps.release.outputs.release_created }}
# Setup ruby if a release was created
- uses: ruby/setup-ruby@v1
with:
ruby-version: 2.6.5
if: ${{ steps.release.outputs.release_created }}
# Bundle install
- run: bundle install
if: ${{ steps.release.outputs.release_created }}
# Publish
- name: publish gem
run: |
mkdir -p $HOME/.gem
touch $HOME/.gem/credentials
chmod 0600 $HOME/.gem/credentials
printf -- "---\n:rubygems_api_key: ${GEM_HOST_API_KEY}\n" > $HOME/.gem/credentials
gem build *.gemspec
gem push *.gem
env:
# Make sure to update the secret name
# if yours isn't named RUBYGEMS_AUTH_TOKEN
GEM_HOST_API_KEY: "${{secrets.RUBYGEMS_AUTH_TOKEN}}"
if: ${{ steps.release.outputs.release_created }}
4 changes: 2 additions & 2 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ jobs:
- uses: actions/checkout@v2

- name: Set up Ruby 2.6
uses: actions/setup-ruby@v1
uses: ruby/setup-ruby@v1
with:
ruby-version: 2.6.6
ruby-version: 2.6.8

- uses: actions/cache@v1
with:
Expand Down
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ source 'https://rubygems.org'
gemspec

gem 'rake', '~> 12.0'
gem 'redis', github: 'vavato-be/redis-rb'
gem 'rspec', '~> 3.0'
49 changes: 28 additions & 21 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
GIT
remote: https://github.com/vavato-be/redis-rb.git
revision: 79f36a58e69beb72f3bcef2ae584b30324fa2f07
specs:
redis (6.2.0)

PATH
remote: .
specs:
nagare-redis (0.1.3)
redis (~> 4.1, >= 4.1.0)
nagare-redis (0.1.5)
redis (~> 6.2, >= 6.2.0)

GEM
remote: https://rubygems.org/
specs:
ast (2.4.1)
ast (2.4.2)
diff-lcs (1.3)
parallel (1.19.2)
parser (2.7.1.4)
parallel (1.20.1)
parser (3.0.2.0)
ast (~> 2.4.1)
rainbow (3.0.0)
rake (12.3.2)
redis (4.2.1)
regexp_parser (1.7.1)
rexml (3.2.4)
regexp_parser (2.1.1)
rexml (3.2.5)
rspec (3.9.0)
rspec-core (~> 3.9.0)
rspec-expectations (~> 3.9.0)
Expand All @@ -30,31 +35,33 @@ GEM
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.9.0)
rspec-support (3.9.3)
rubocop (0.88.0)
rubocop (1.18.3)
parallel (~> 1.10)
parser (>= 2.7.1.1)
parser (>= 3.0.0.0)
rainbow (>= 2.2.2, < 4.0)
regexp_parser (>= 1.7)
regexp_parser (>= 1.8, < 3.0)
rexml
rubocop-ast (>= 0.1.0, < 1.0)
rubocop-ast (>= 1.7.0, < 2.0)
ruby-progressbar (~> 1.7)
unicode-display_width (>= 1.4.0, < 2.0)
rubocop-ast (0.2.0)
parser (>= 2.7.0.1)
rubocop-rspec (1.42.0)
rubocop (>= 0.87.0)
ruby-progressbar (1.10.1)
unicode-display_width (1.7.0)
unicode-display_width (>= 1.4.0, < 3.0)
rubocop-ast (1.7.0)
parser (>= 3.0.1.1)
rubocop-rspec (2.4.0)
rubocop (~> 1.0)
rubocop-ast (>= 1.1.0)
ruby-progressbar (1.11.0)
unicode-display_width (2.0.0)

PLATFORMS
ruby

DEPENDENCIES
nagare-redis!
rake (~> 12.0)
redis!
rspec (~> 3.0)
rubocop (~> 0.88, >= 0.88)
rubocop-rspec (~> 1.42, >= 1.42.0)
rubocop (~> 1.18.3, >= 1.18.3)
rubocop-rspec (~> 2.4.0, >= 2.4.0)

BUNDLED WITH
2.1.0
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,25 @@ To use with rails, add nagare to the initializers:
#### config/initializers/nagare.rb
```ruby
Nagare.configure do |config|
# After x seconds a consumer is considered dead and its messages
# are assigned to a different consumer in the group. Configuring
# After x milisseconds a pending message is considered failed and
# gets retried by a different consumer in the group. Configuring
# it too low might cause double processing of messages as a consumer
# "steals" the load of another while the first one is still processing
# it and hasn't had the chance to ACK, configuring it too high will
# introduce latency in your processing.
# Default: 300 (5 minutes)
config.dead_consumer_timeout = 600
# Default: 600.000 (10 minutes)
config.min_idle_time = 600_0000

# This is the consumer group name that will be used or created in
# Redis. Use a different group for every microservice / application
# Default: Rails.env
config.group_name = :monolith

# A suffix is supported in order to separate different environments
# within the same redis database. Useful for development/test. This
# gets added automatically to stream names and consumer group names.
config.suffix = ''

# URL to connect to redis. Defaults to redis://localhost:6379 uses
# ENV['REDIS_URL'] if present.
config.redis_url = 'redis://10.1.1.1:6379'
Expand Down
8 changes: 5 additions & 3 deletions lib/nagare/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,24 @@ module Nagare
# See the README for possible values and what they do
class Config
class << self
attr_accessor :dead_consumer_timeout, :group_name, :redis_url, :threads,
:suffix
attr_accessor :group_name, :redis_url, :threads, :suffix, :min_idle_time

# Runs code in the block passed in to configure Nagare and sets defaults
# when values are not set.
#
# returns Nagare::Config self
# returns [Nagare::Config] self
# rubocop:disable Metrics/CyclomaticComplexity
def configure
yield(self)
@dead_consumer_timeout ||= 5000
@group_name ||= 'nagare'
@redis_url = redis_url || ENV['REDIS_URL'] || 'redis://localhost:6379'
@threads ||= 1
@suffix ||= nil
@min_idle_time ||= 600_000
self
end
# rubocop:enable Metrics/CyclomaticComplexity
end
end
end
1 change: 1 addition & 0 deletions lib/nagare/listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def stream_name
# The ClassMethods module is automatically loaded into child classes
# effectively adding the `stream` class method to the child class.`
def self.inherited(subclass)
super
subclass.extend(ClassMethods)
end

Expand Down
14 changes: 11 additions & 3 deletions lib/nagare/listener_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,30 @@ def poll
private

def poll_stream(stream, listeners)
# TODO: Use thread pool
messages = Nagare::RedisStreams.read_next_messages(stream, group)
return unless Nagare::RedisStreams.group_exists?(stream, group)

messages = Nagare::RedisStreams.claim_next_stuck_message(stream, group)

if messages.nil? || messages.empty?
messages = Nagare::RedisStreams.read_next_messages(stream, group)
end
return unless messages.any?

messages.each do |message|
deliver_message(stream, message, listeners)
end
end

def claim_pending_messages(stream)
return nil unless Nagare::RedisStreams.group_exists?(stream, group)
end

def deliver_message(stream, message, listeners)
listener_failed = false

listeners.each do |listener|
invoke_listener(stream, message, listener)
rescue StandardError => e
# TODO: Retry logic
logger.error e.message
logger.error e.backtrace.join("\n")
listener_failed = true
Expand Down
21 changes: 19 additions & 2 deletions lib/nagare/redis_streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def connection
# @param group [String] name of the group
#
# @return [Boolean] true if the group exists, otherwise false
# rubocop:disable Metrics/AbcSize
def group_exists?(stream, group)
stream = stream_name(stream)
info = connection.xinfo(:groups, stream.to_s)
Expand All @@ -41,7 +40,6 @@ def group_exists?(stream, group)
logger.info e.backtrace.join("\n")
false
end
# rubocop:enable Metrics/AbcSize

##
# Creates a group in redis for the stream using xgroup
Expand Down Expand Up @@ -81,6 +79,25 @@ def publish(stream, event_name, data)
connection.xadd(stream, { "#{event_name}": data })
end

##
# Claums the next message of the consumer group that is stuck
# (pending and past min_idle_time since being picked up)
#
# @param stream [String] name of the stream
# @param group [String] name of the consumer group
#
# @return [Array[Hash]] array containing the 1 message or empty
def claim_next_stuck_message(stream, group)
stream = stream_name(stream)
result = connection.xautoclaim(stream,
"#{stream}-#{group}",
"#{hostname}-#{thread_id}",
Nagare::Config.min_idle_time,
'0-0',
count: 1)
result['entries'] || []
end

##
# Reads the next messages from the consumer group in redis.
#
Expand Down
2 changes: 1 addition & 1 deletion lib/nagare/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Nagare
VERSION = '0.1.4'
VERSION = '0.1.5'
end
10 changes: 6 additions & 4 deletions nagare.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ Gem::Specification.new do |spec|
spec.metadata['source_code_uri'] = 'https://github.com/vavato-be/nagare.git'
spec.metadata['changelog_uri'] = 'https://github.com/vavato-be/nagare/CHANGELOG.md'

spec.add_dependency 'redis', '~> 4.1', '>= 4.1.0'
spec.add_development_dependency 'rubocop', '~> 0.88', '>= 0.88'
spec.add_development_dependency 'rubocop-rspec', '~> 1.42', '>= 1.42.0'
spec.add_dependency 'redis', '~> 6.2', '>= 6.2.0'
spec.add_development_dependency 'rubocop', '~> 1.18.3', '>= 1.18.3'
spec.add_development_dependency 'rubocop-rspec', '~> 2.4.0', '>= 2.4.0'

# Specify which files should be added to the gem when it is released.
# The `git ls-files -z` loads the files in the RubyGem that have been added into git.
spec.files = Dir.chdir(File.expand_path(__dir__)) do
`git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) }
`git ls-files -z`.split("\x0").reject do |f|
f.match(%r{^(test|spec|features)/})
end
end
spec.bindir = 'exe'
spec.executables << 'nagare'
Expand Down
49 changes: 45 additions & 4 deletions spec/lib/nagare/listener_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

before do
allow(Nagare::Config).to receive(:group_name).and_return(group)
allow(Nagare::RedisStreams).to receive(:claim_next_stuck_message).and_return([])
allow(Nagare::RedisStreams).to receive(:group_exists?).and_return(true)
end

describe '.start_listening' do
Expand Down Expand Up @@ -53,7 +55,8 @@

context 'when the listener processes the event without raising errors' do
before do
allow(Nagare::RedisStreams).to receive(:read_next_messages).and_return([[message_id, event]])
allow(Nagare::RedisStreams).to receive(:read_next_messages)
.and_return([[message_id, event]])
allow(Nagare::RedisStreams).to receive(:mark_processed)
described_class.poll
end
Expand All @@ -63,14 +66,17 @@
end

it 'marks the message as processed (ACK) with the redis streams group' do
expect(Nagare::RedisStreams).to have_received(:mark_processed).with(stream.to_sym, group, message_id)
expect(Nagare::RedisStreams).to have_received(:mark_processed).with(
stream.to_sym, group, message_id
)
end
end

context 'when the listener raises an error' do
before do
allow(described_class).to receive(:listener_pool).and_return({ 'listener_pool_spec': [listener_class] })
allow(Nagare::RedisStreams).to receive(:read_next_messages).and_return([[message_id, event]])
allow(Nagare::RedisStreams).to receive(:read_next_messages)
.and_return([[message_id, event]])
allow(Nagare::RedisStreams).to receive(:mark_processed)
allow(listener).to receive(:handle_event).and_raise('Processing error')

Expand All @@ -82,7 +88,42 @@
end

it 'does NOT mark the message as processed (ACK) with the redis streams group' do
expect(Nagare::RedisStreams).not_to have_received(:mark_processed).with(stream.to_sym, group, message_id)
expect(Nagare::RedisStreams).not_to have_received(:mark_processed).with(
stream.to_sym, group, message_id
)
end
end
end

context 'when there are listeners registered and a pending event in the stream' do
let(:event) { { foo: 'bar' } }
let(:message_id) { 'message_id-0' }

before do
allow(described_class).to receive(:listener_pool).and_return({ 'listener_pool_spec': [listener_class] })
end

context 'when the listener processes the event without raising errors' do
before do
allow(Nagare::RedisStreams).to receive(:claim_next_stuck_message)
.and_return([[message_id, event]])
allow(Nagare::RedisStreams).to receive(:read_next_messages)
allow(Nagare::RedisStreams).to receive(:mark_processed)
described_class.poll
end

it 'invokes #handle_event on the listener, passing in the event' do
expect(listener).to have_received(:handle_event).with(event)
end

it 'marks the message as processed (ACK) with the redis streams group' do
expect(Nagare::RedisStreams).to have_received(:mark_processed).with(
stream.to_sym, group, message_id
)
end

it 'does not try to read the next message from the stream' do
expect(Nagare::RedisStreams).not_to have_received(:read_next_messages)
end
end
end
Expand Down
Loading

0 comments on commit 16e194b

Please sign in to comment.