Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rudimentary retry support through xautoclaim #1

Merged
merged 4 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name: release

on:
push:
branches:
- main

jobs:
release-please:
runs-on: ubuntu-latest
steps:
- uses: GoogleCloudPlatform/release-please-action@v2
id: release
with:
release-type: ruby
package-name: nagare-redis
bump-minor-pre-major: true
version-file: "lib/nagare/version.rb"
# Checkout code if release was created
- uses: actions/checkout@v2
if: ${{ steps.release.outputs.release_created }}
# Setup ruby if a release was created
- uses: ruby/setup-ruby@v1
with:
ruby-version: 2.6.5
if: ${{ steps.release.outputs.release_created }}
# Bundle install
- run: bundle install
if: ${{ steps.release.outputs.release_created }}
# Publish
- name: publish gem
run: |
mkdir -p $HOME/.gem
touch $HOME/.gem/credentials
chmod 0600 $HOME/.gem/credentials
printf -- "---\n:rubygems_api_key: ${GEM_HOST_API_KEY}\n" > $HOME/.gem/credentials
gem build *.gemspec
gem push *.gem
env:
# Make sure to update the secret name
# if yours isn't named RUBYGEMS_AUTH_TOKEN
GEM_HOST_API_KEY: "${{secrets.RUBYGEMS_AUTH_TOKEN}}"
if: ${{ steps.release.outputs.release_created }}
4 changes: 2 additions & 2 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ jobs:
- uses: actions/checkout@v2

- name: Set up Ruby 2.6
uses: actions/setup-ruby@v1
uses: ruby/setup-ruby@v1
with:
ruby-version: 2.6.6
ruby-version: 2.6.8

- uses: actions/cache@v1
with:
Expand Down
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ source 'https://rubygems.org'
gemspec

gem 'rake', '~> 12.0'
gem 'redis', github: 'vavato-be/redis-rb'
gem 'rspec', '~> 3.0'
49 changes: 28 additions & 21 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
GIT
remote: https://github.com/vavato-be/redis-rb.git
revision: 79f36a58e69beb72f3bcef2ae584b30324fa2f07
specs:
redis (6.2.0)

PATH
remote: .
specs:
nagare-redis (0.1.3)
redis (~> 4.1, >= 4.1.0)
nagare-redis (0.1.5)
redis (~> 6.2, >= 6.2.0)

GEM
remote: https://rubygems.org/
specs:
ast (2.4.1)
ast (2.4.2)
diff-lcs (1.3)
parallel (1.19.2)
parser (2.7.1.4)
parallel (1.20.1)
parser (3.0.2.0)
ast (~> 2.4.1)
rainbow (3.0.0)
rake (12.3.2)
redis (4.2.1)
regexp_parser (1.7.1)
rexml (3.2.4)
regexp_parser (2.1.1)
rexml (3.2.5)
rspec (3.9.0)
rspec-core (~> 3.9.0)
rspec-expectations (~> 3.9.0)
Expand All @@ -30,31 +35,33 @@ GEM
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.9.0)
rspec-support (3.9.3)
rubocop (0.88.0)
rubocop (1.18.3)
parallel (~> 1.10)
parser (>= 2.7.1.1)
parser (>= 3.0.0.0)
rainbow (>= 2.2.2, < 4.0)
regexp_parser (>= 1.7)
regexp_parser (>= 1.8, < 3.0)
rexml
rubocop-ast (>= 0.1.0, < 1.0)
rubocop-ast (>= 1.7.0, < 2.0)
ruby-progressbar (~> 1.7)
unicode-display_width (>= 1.4.0, < 2.0)
rubocop-ast (0.2.0)
parser (>= 2.7.0.1)
rubocop-rspec (1.42.0)
rubocop (>= 0.87.0)
ruby-progressbar (1.10.1)
unicode-display_width (1.7.0)
unicode-display_width (>= 1.4.0, < 3.0)
rubocop-ast (1.7.0)
parser (>= 3.0.1.1)
rubocop-rspec (2.4.0)
rubocop (~> 1.0)
rubocop-ast (>= 1.1.0)
ruby-progressbar (1.11.0)
unicode-display_width (2.0.0)

PLATFORMS
ruby

DEPENDENCIES
nagare-redis!
rake (~> 12.0)
redis!
rspec (~> 3.0)
rubocop (~> 0.88, >= 0.88)
rubocop-rspec (~> 1.42, >= 1.42.0)
rubocop (~> 1.18.3, >= 1.18.3)
rubocop-rspec (~> 2.4.0, >= 2.4.0)

BUNDLED WITH
2.1.0
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,25 @@ To use with rails, add nagare to the initializers:
#### config/initializers/nagare.rb
```ruby
Nagare.configure do |config|
# After x seconds a consumer is considered dead and its messages
# are assigned to a different consumer in the group. Configuring
# After x milisseconds a pending message is considered failed and
# gets retried by a different consumer in the group. Configuring
# it too low might cause double processing of messages as a consumer
# "steals" the load of another while the first one is still processing
# it and hasn't had the chance to ACK, configuring it too high will
# introduce latency in your processing.
# Default: 300 (5 minutes)
config.dead_consumer_timeout = 600
# Default: 600.000 (10 minutes)
config.min_idle_time = 600_0000

# This is the consumer group name that will be used or created in
# Redis. Use a different group for every microservice / application
# Default: Rails.env
config.group_name = :monolith

# A suffix is supported in order to separate different environments
# within the same redis database. Useful for development/test. This
# gets added automatically to stream names and consumer group names.
config.suffix = ''

# URL to connect to redis. Defaults to redis://localhost:6379 uses
# ENV['REDIS_URL'] if present.
config.redis_url = 'redis://10.1.1.1:6379'
Expand Down
8 changes: 5 additions & 3 deletions lib/nagare/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,24 @@ module Nagare
# See the README for possible values and what they do
class Config
class << self
attr_accessor :dead_consumer_timeout, :group_name, :redis_url, :threads,
:suffix
attr_accessor :group_name, :redis_url, :threads, :suffix, :min_idle_time

# Runs code in the block passed in to configure Nagare and sets defaults
# when values are not set.
#
# returns Nagare::Config self
# returns [Nagare::Config] self
# rubocop:disable Metrics/CyclomaticComplexity
def configure
yield(self)
@dead_consumer_timeout ||= 5000
@group_name ||= 'nagare'
@redis_url = redis_url || ENV['REDIS_URL'] || 'redis://localhost:6379'
@threads ||= 1
@suffix ||= nil
@min_idle_time ||= 600_000
self
end
# rubocop:enable Metrics/CyclomaticComplexity
end
end
end
1 change: 1 addition & 0 deletions lib/nagare/listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def stream_name
# The ClassMethods module is automatically loaded into child classes
# effectively adding the `stream` class method to the child class.`
def self.inherited(subclass)
super
subclass.extend(ClassMethods)
end

Expand Down
14 changes: 11 additions & 3 deletions lib/nagare/listener_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,30 @@ def poll
private

def poll_stream(stream, listeners)
# TODO: Use thread pool
messages = Nagare::RedisStreams.read_next_messages(stream, group)
return unless Nagare::RedisStreams.group_exists?(stream, group)

messages = Nagare::RedisStreams.claim_next_stuck_message(stream, group)

if messages.nil? || messages.empty?
messages = Nagare::RedisStreams.read_next_messages(stream, group)
end
return unless messages.any?

messages.each do |message|
deliver_message(stream, message, listeners)
end
end

def claim_pending_messages(stream)
return nil unless Nagare::RedisStreams.group_exists?(stream, group)
end

def deliver_message(stream, message, listeners)
listener_failed = false

listeners.each do |listener|
invoke_listener(stream, message, listener)
rescue StandardError => e
# TODO: Retry logic
logger.error e.message
logger.error e.backtrace.join("\n")
listener_failed = true
Expand Down
21 changes: 19 additions & 2 deletions lib/nagare/redis_streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def connection
# @param group [String] name of the group
#
# @return [Boolean] true if the group exists, otherwise false
# rubocop:disable Metrics/AbcSize
def group_exists?(stream, group)
stream = stream_name(stream)
info = connection.xinfo(:groups, stream.to_s)
Expand All @@ -41,7 +40,6 @@ def group_exists?(stream, group)
logger.info e.backtrace.join("\n")
false
end
# rubocop:enable Metrics/AbcSize

##
# Creates a group in redis for the stream using xgroup
Expand Down Expand Up @@ -81,6 +79,25 @@ def publish(stream, event_name, data)
connection.xadd(stream, { "#{event_name}": data })
end

##
# Claums the next message of the consumer group that is stuck
# (pending and past min_idle_time since being picked up)
#
# @param stream [String] name of the stream
# @param group [String] name of the consumer group
#
# @return [Array[Hash]] array containing the 1 message or empty
def claim_next_stuck_message(stream, group)
stream = stream_name(stream)
result = connection.xautoclaim(stream,
"#{stream}-#{group}",
"#{hostname}-#{thread_id}",
Nagare::Config.min_idle_time,
'0-0',
count: 1)
result['entries'] || []
end

##
# Reads the next messages from the consumer group in redis.
#
Expand Down
2 changes: 1 addition & 1 deletion lib/nagare/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Nagare
VERSION = '0.1.4'
VERSION = '0.1.5'
end
10 changes: 6 additions & 4 deletions nagare.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ Gem::Specification.new do |spec|
spec.metadata['source_code_uri'] = 'https://github.com/vavato-be/nagare.git'
spec.metadata['changelog_uri'] = 'https://github.com/vavato-be/nagare/CHANGELOG.md'

spec.add_dependency 'redis', '~> 4.1', '>= 4.1.0'
spec.add_development_dependency 'rubocop', '~> 0.88', '>= 0.88'
spec.add_development_dependency 'rubocop-rspec', '~> 1.42', '>= 1.42.0'
spec.add_dependency 'redis', '~> 6.2', '>= 6.2.0'
spec.add_development_dependency 'rubocop', '~> 1.18.3', '>= 1.18.3'
spec.add_development_dependency 'rubocop-rspec', '~> 2.4.0', '>= 2.4.0'

# Specify which files should be added to the gem when it is released.
# The `git ls-files -z` loads the files in the RubyGem that have been added into git.
spec.files = Dir.chdir(File.expand_path(__dir__)) do
`git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) }
`git ls-files -z`.split("\x0").reject do |f|
f.match(%r{^(test|spec|features)/})
end
end
spec.bindir = 'exe'
spec.executables << 'nagare'
Expand Down
49 changes: 45 additions & 4 deletions spec/lib/nagare/listener_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

before do
allow(Nagare::Config).to receive(:group_name).and_return(group)
allow(Nagare::RedisStreams).to receive(:claim_next_stuck_message).and_return([])
allow(Nagare::RedisStreams).to receive(:group_exists?).and_return(true)
end

describe '.start_listening' do
Expand Down Expand Up @@ -53,7 +55,8 @@

context 'when the listener processes the event without raising errors' do
before do
allow(Nagare::RedisStreams).to receive(:read_next_messages).and_return([[message_id, event]])
allow(Nagare::RedisStreams).to receive(:read_next_messages)
.and_return([[message_id, event]])
allow(Nagare::RedisStreams).to receive(:mark_processed)
described_class.poll
end
Expand All @@ -63,14 +66,17 @@
end

it 'marks the message as processed (ACK) with the redis streams group' do
expect(Nagare::RedisStreams).to have_received(:mark_processed).with(stream.to_sym, group, message_id)
expect(Nagare::RedisStreams).to have_received(:mark_processed).with(
stream.to_sym, group, message_id
)
end
end

context 'when the listener raises an error' do
before do
allow(described_class).to receive(:listener_pool).and_return({ 'listener_pool_spec': [listener_class] })
allow(Nagare::RedisStreams).to receive(:read_next_messages).and_return([[message_id, event]])
allow(Nagare::RedisStreams).to receive(:read_next_messages)
.and_return([[message_id, event]])
allow(Nagare::RedisStreams).to receive(:mark_processed)
allow(listener).to receive(:handle_event).and_raise('Processing error')

Expand All @@ -82,7 +88,42 @@
end

it 'does NOT mark the message as processed (ACK) with the redis streams group' do
expect(Nagare::RedisStreams).not_to have_received(:mark_processed).with(stream.to_sym, group, message_id)
expect(Nagare::RedisStreams).not_to have_received(:mark_processed).with(
stream.to_sym, group, message_id
)
end
end
end

context 'when there are listeners registered and a pending event in the stream' do
let(:event) { { foo: 'bar' } }
let(:message_id) { 'message_id-0' }

before do
allow(described_class).to receive(:listener_pool).and_return({ 'listener_pool_spec': [listener_class] })
end

context 'when the listener processes the event without raising errors' do
before do
allow(Nagare::RedisStreams).to receive(:claim_next_stuck_message)
.and_return([[message_id, event]])
allow(Nagare::RedisStreams).to receive(:read_next_messages)
allow(Nagare::RedisStreams).to receive(:mark_processed)
described_class.poll
end

it 'invokes #handle_event on the listener, passing in the event' do
expect(listener).to have_received(:handle_event).with(event)
end

it 'marks the message as processed (ACK) with the redis streams group' do
expect(Nagare::RedisStreams).to have_received(:mark_processed).with(
stream.to_sym, group, message_id
)
end

it 'does not try to read the next message from the stream' do
expect(Nagare::RedisStreams).not_to have_received(:read_next_messages)
end
end
end
Expand Down
Loading