diff --git a/Gemfile b/Gemfile index 4406e4a4..9f96a26d 100644 --- a/Gemfile +++ b/Gemfile @@ -2,4 +2,6 @@ source 'https://rubygems.org' gem 'ruby-prof', platforms: [:ruby_22, :ruby_23, :ruby_24] +gem 'rake', '>= 12.3', '< 14.0' + gemspec diff --git a/README.md b/README.md index 1ebb7b80..1543d9e8 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,6 @@ class Processor include Sneakers::Worker from_queue :logs - def work(msg) err = JSON.parse(msg) if err["type"] == "error" diff --git a/examples/benchmark_worker.rb b/examples/benchmark_worker.rb index d1866e3c..4f38c300 100644 --- a/examples/benchmark_worker.rb +++ b/examples/benchmark_worker.rb @@ -6,7 +6,7 @@ class BenchmarkWorker from_queue 'downloads', exchange_options: { durable: false }, queue_options: { durable: false }, - :ack => true, + consumer_options: { manual_ack: true }, :threads => 50, :prefetch => 50, :timeout_job_after => 1, diff --git a/examples/max_retry_handler.rb b/examples/max_retry_handler.rb index 4571418f..e407a6d9 100644 --- a/examples/max_retry_handler.rb +++ b/examples/max_retry_handler.rb @@ -15,7 +15,7 @@ Sneakers.logger.level = Logger::DEBUG WORKER_OPTIONS = { - :ack => true, + :consumer_options => { :manual_ack => true }, :threads => 1, :prefetch => 1, :timeout_job_after => 60, diff --git a/examples/middleware_worker.rb b/examples/middleware_worker.rb index e1ccf0b4..685fb2a2 100644 --- a/examples/middleware_worker.rb +++ b/examples/middleware_worker.rb @@ -21,8 +21,10 @@ def initialize(app, *args) def call(deserialized_msg, delivery_info, metadata, handler) puts "******** DemoMiddleware - before; args #{@args}" - @app.call(deserialized_msg, delivery_info, metadata, handler) + res = @app.call(deserialized_msg, delivery_info, metadata, handler) puts "******** DemoMiddleware - after" + + res end end diff --git a/examples/profiling_worker.rb b/examples/profiling_worker.rb index 1aab0abc..3abb4fc2 100644 --- a/examples/profiling_worker.rb +++ b/examples/profiling_worker.rb @@ -28,7 +28,7 @@ class ProfilingWorker include Sneakers::Worker from_queue 'downloads', - :ack => true, + :consumer_options => { :manual_ack => true }, :threads => 50, :prefetch => 50, :timeout_job_after => 1, diff --git a/examples/workflow_worker.rb b/examples/workflow_worker.rb index 0739aed1..0d1a5c6f 100644 --- a/examples/workflow_worker.rb +++ b/examples/workflow_worker.rb @@ -6,7 +6,7 @@ class WorkflowWorker from_queue 'downloads', exchange_options: { durable: false }, queue_options: { durable: false }, - :ack => true, + consumer_options: { manual_ack: true }, :threads => 50, :prefetch => 50, :timeout_job_after => 1, diff --git a/lib/sneakers/configuration.rb b/lib/sneakers/configuration.rb index 40ebedfc..5392db6f 100644 --- a/lib/sneakers/configuration.rb +++ b/lib/sneakers/configuration.rb @@ -21,6 +21,12 @@ class Configuration :arguments => {} }.freeze + CONSUMER_OPTION_DEFAULTS = { + :block => false, + :manual_ack => true, + :arguments => {} + }.freeze + DEFAULTS = { # Set up default handler which just logs the error. # Remove this in production if you don't want sensitive data logged. @@ -40,12 +46,12 @@ class Configuration :prefetch => 10, :threads => 10, :share_threads => false, - :ack => true, :heartbeat => 30, :hooks => {}, :exchange => 'sneakers', :exchange_options => EXCHANGE_OPTION_DEFAULTS, - :queue_options => QUEUE_OPTION_DEFAULTS + :queue_options => QUEUE_OPTION_DEFAULTS, + :consumer_options => CONSUMER_OPTION_DEFAULTS }.freeze @@ -107,6 +113,7 @@ def map_all_deprecated_options(hash) hash = map_deprecated_options_key(:exchange_options, :durable, :durable, false, hash) hash = map_deprecated_options_key(:queue_options, :durable, :durable, true, hash) hash = map_deprecated_options_key(:queue_options, :arguments, :arguments, true, hash) + hash = map_deprecated_options_key(:consumer_options, :ack, :manual_ack, true, hash) hash end diff --git a/lib/sneakers/queue.rb b/lib/sneakers/queue.rb index 0bd3a5b7..8e84e217 100644 --- a/lib/sneakers/queue.rb +++ b/lib/sneakers/queue.rb @@ -55,7 +55,7 @@ def subscribe(worker) # retry queues, etc). handler = handler_klass.new(@channel, queue, worker.opts) - @consumer = queue.subscribe(block: false, manual_ack: @opts[:ack]) do | delivery_info, metadata, msg | + @consumer = queue.subscribe(@opts[:consumer_options]) do | delivery_info, metadata, msg | worker.do_work(delivery_info, metadata, msg, handler) end nil diff --git a/lib/sneakers/runner.rb b/lib/sneakers/runner.rb index b681baa2..37f674f1 100644 --- a/lib/sneakers/runner.rb +++ b/lib/sneakers/runner.rb @@ -12,8 +12,8 @@ def run @se.run end - def stop - @se.stop + def stop(stop_graceful=true) + @se.stop(stop_graceful) end end @@ -40,7 +40,7 @@ def to_h def reload_config! - Sneakers.logger.warn("Loading runner configuration...") + Sneakers.logger.info("Loading runner configuration...") config_file = Sneakers::CONFIG[:runner_config_file] if config_file diff --git a/lib/sneakers/spawner.rb b/lib/sneakers/spawner.rb index 54b04b1f..25fb4e16 100644 --- a/lib/sneakers/spawner.rb +++ b/lib/sneakers/spawner.rb @@ -1,4 +1,5 @@ require 'yaml' +require 'erb' module Sneakers class Spawner @@ -12,7 +13,7 @@ def self.spawn end @pids = [] @exec_string = "bundle exec rake sneakers:run" - worker_config = YAML.load(File.read(worker_group_config_file)) + worker_config = YAML.load(ERB.new(File.read(worker_group_config_file)).result) worker_config.keys.each do |group_name| workers = worker_config[group_name]['classes'] workers = workers.join "," if workers.is_a?(Array) diff --git a/lib/sneakers/tasks.rb b/lib/sneakers/tasks.rb index 689aa3e6..c1b96f9c 100644 --- a/lib/sneakers/tasks.rb +++ b/lib/sneakers/tasks.rb @@ -10,7 +10,11 @@ Rake::Task['environment'].invoke if defined?(::Rails) - ::Rails.application.eager_load! + if defined?(::Zeitwerk) + ::Zeitwerk::Loader.eager_load_all + else + ::Rails.application.eager_load! + end end if ENV["WORKERS"].nil? diff --git a/lib/sneakers/version.rb b/lib/sneakers/version.rb index 5304ed17..c69fef9a 100644 --- a/lib/sneakers/version.rb +++ b/lib/sneakers/version.rb @@ -1,3 +1,3 @@ module Sneakers - VERSION = "2.12.0.pre" + VERSION = "2.13.0.pre" end diff --git a/lib/sneakers/worker.rb b/lib/sneakers/worker.rb index 53df6ce2..bfe74095 100644 --- a/lib/sneakers/worker.rb +++ b/lib/sneakers/worker.rb @@ -16,7 +16,7 @@ def initialize(queue = nil, pool = nil, opts = {}) queue_name = self.class.queue_name opts = Sneakers::CONFIG.merge(opts) - @should_ack = opts[:ack] + @should_ack = opts[:consumer_options][:manual_ack] @pool = pool || Concurrent::FixedThreadPool.new(opts[:threads] || Sneakers::Configuration::DEFAULTS[:threads]) @call_with_params = respond_to?(:work_with_params) @content_type = opts[:content_type] @@ -72,31 +72,31 @@ def process_work(delivery_info, metadata, msg, handler) end res = block_to_call.call(deserialized_msg, delivery_info, metadata, handler) end - rescue StandardError, ScriptError => ex + rescue SignalException, SystemExit + # ServerEngine handles these exceptions, so they are not expected to be raised within the worker. + # Nevertheless, they are listed here to ensure that they are not caught by the rescue block below. + raise + rescue Exception => ex res = :error error = ex worker_error(ex, log_msg: log_msg(msg), class: self.class.name, message: msg, delivery_info: delivery_info, metadata: metadata) - end - - if @should_ack - - if res == :ack + ensure + if @should_ack + case res # note to future-self. never acknowledge multiple (multiple=true) messages under threads. - handler.acknowledge(delivery_info, metadata, msg) - elsif res == :error - handler.error(delivery_info, metadata, msg, error) - elsif res == :reject - handler.reject(delivery_info, metadata, msg) - elsif res == :requeue - handler.reject(delivery_info, metadata, msg, true) - else - handler.noop(delivery_info, metadata, msg) + when :ack then handler.acknowledge(delivery_info, metadata, msg) + when :error then handler.error(delivery_info, metadata, msg, error) + when :reject then handler.reject(delivery_info, metadata, msg) + when :requeue then handler.reject(delivery_info, metadata, msg, true) + else + handler.noop(delivery_info, metadata, msg) + end + metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}") end - metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}") - end - metrics.increment("work.#{self.class.name}.ended") + metrics.increment("work.#{self.class.name}.ended") + end end def stop diff --git a/sneakers.gemspec b/sneakers.gemspec index 9e29a2e5..c9456cb3 100755 --- a/sneakers.gemspec +++ b/sneakers.gemspec @@ -12,7 +12,7 @@ Gem::Specification.new do |gem| gem.email = ['jondotan@gmail.com'] gem.description = %q( Fast background processing framework for Ruby and RabbitMQ ) gem.summary = %q( Fast background processing framework for Ruby and RabbitMQ ) - gem.homepage = 'http://sneakers.io' + gem.homepage = 'https://github.com/jondot/sneakers' gem.license = 'MIT' gem.required_ruby_version = Gem::Requirement.new(">= 2.2") @@ -27,13 +27,12 @@ Gem::Specification.new do |gem| gem.add_dependency 'bunny', '~> 2.14' gem.add_dependency 'concurrent-ruby', '~> 1.0' gem.add_dependency 'thor' - gem.add_dependency 'rake', '~> 12.3' + gem.add_dependency 'rake', '>= 12.3', '< 14.0' # for integration environment (see .travis.yml and integration_spec) gem.add_development_dependency 'rabbitmq_http_api_client' gem.add_development_dependency 'redis' - gem.add_development_dependency 'rake', '~> 12.3' gem.add_development_dependency 'minitest', '~> 5.11' gem.add_development_dependency 'rr', '~> 1.2.1' gem.add_development_dependency 'unparser', '0.2.2' # keep below 0.2.5 for ruby 2.0 compat. diff --git a/spec/sneakers/publisher_spec.rb b/spec/sneakers/publisher_spec.rb index fd7f79b3..ede79030 100644 --- a/spec/sneakers/publisher_spec.rb +++ b/spec/sneakers/publisher_spec.rb @@ -7,7 +7,9 @@ { :prefetch => 25, :durable => true, - :ack => true, + :consumer_options => { + :manual_ack => true + }, :heartbeat => 2, :vhost => '/', :exchange => "sneakers", diff --git a/spec/sneakers/queue_spec.rb b/spec/sneakers/queue_spec.rb index fe203f4f..421d60b4 100644 --- a/spec/sneakers/queue_spec.rb +++ b/spec/sneakers/queue_spec.rb @@ -5,7 +5,6 @@ let :queue_vars do { :prefetch => 25, - :ack => true, :heartbeat => 2, :vhost => '/', :exchange => "sneakers", @@ -16,6 +15,10 @@ }, queue_options: { durable: true + }, + :consumer_options => { + :block => false, + :manual_ack => true } } end diff --git a/spec/sneakers/worker_handlers_spec.rb b/spec/sneakers/worker_handlers_spec.rb index 7d3f15a7..495c49d0 100644 --- a/spec/sneakers/worker_handlers_spec.rb +++ b/spec/sneakers/worker_handlers_spec.rb @@ -11,7 +11,10 @@ class HandlerTestWorker include Sneakers::Worker from_queue 'defaults', - :ack => true + :consumer_options => { + :manual_ack => true + } + def work(msg) if msg.is_a?(StandardError) diff --git a/spec/sneakers/worker_spec.rb b/spec/sneakers/worker_spec.rb index 0217fa7a..1d821e7c 100644 --- a/spec/sneakers/worker_spec.rb +++ b/spec/sneakers/worker_spec.rb @@ -17,7 +17,9 @@ class DummyWorker :exclusive => true, :arguments => { 'x-arg' => 'value' } }, - :ack => false, + :consumer_options => { + :manual_ack => false + }, :threads => 50, :prefetch => 40, :exchange => 'dummy', @@ -38,7 +40,9 @@ def work(msg) class AcksWorker include Sneakers::Worker from_queue 'defaults', - :ack => true + :consumer_options => { + :manual_ack => true + } def work(msg) if msg == :ack @@ -56,7 +60,9 @@ def work(msg) class PublishingWorker include Sneakers::Worker from_queue 'defaults', - :ack => false, + :consumer_options => { + :manual_ack => false + }, :exchange => 'foochange' def work(msg) @@ -67,7 +73,9 @@ def work(msg) class JSONPublishingWorker include Sneakers::Worker from_queue 'defaults', - :ack => false, + :consumer_options => { + :manual_ack => false + }, :exchange => 'foochange' def work(msg) @@ -78,7 +86,9 @@ def work(msg) class LoggingWorker include Sneakers::Worker from_queue 'defaults', - :ack => false + :consumer_options => { + :manual_ack => false + } def work(msg) logger.info "hello" @@ -88,7 +98,9 @@ def work(msg) class JSONWorker include Sneakers::Worker from_queue 'defaults', - :ack => false, + :consumer_options => { + :manual_ack => false + }, :content_type => 'application/json' def work(msg) @@ -98,7 +110,10 @@ def work(msg) class MetricsWorker include Sneakers::Worker from_queue 'defaults', - :ack => true + :consumer_options => { + :manual_ack => true + } + def work(msg) metrics.increment "foobar" @@ -109,7 +124,9 @@ def work(msg) class WithParamsWorker include Sneakers::Worker from_queue 'defaults', - :ack => true + :consumer_options => { + :manual_ack => true + } def work_with_params(msg, delivery_info, metadata) msg @@ -176,7 +193,6 @@ def work(msg) :prefetch => 10, :threads => 10, :share_threads => false, - :ack => true, :amqp => "amqp://guest:guest@localhost:5672", :vhost => "/", :exchange => "sneakers", @@ -192,6 +208,11 @@ def work(msg) :exclusive => false, :arguments => {} }, + :consumer_options => { + :block => false, + :manual_ack => true, + :arguments => {} + }, :hooks => {}, :handler => Sneakers::Handlers::Oneshot, :heartbeat => 30, @@ -214,7 +235,6 @@ def work(msg) :prefetch => 40, :threads => 50, :share_threads => false, - :ack => false, :amqp => "amqp://guest:guest@localhost:5672", :vhost => "/", :exchange => "dummy", @@ -230,6 +250,11 @@ def work(msg) :exclusive => true, :arguments => { 'x-arg' => 'value' } }, + :consumer_options => { + :block => false, + :manual_ack => false, + :arguments => {} + }, :hooks => {}, :handler => Sneakers::Handlers::Oneshot, :heartbeat => 5, @@ -252,7 +277,6 @@ def work(msg) :prefetch => 10, :threads => 10, :share_threads => false, - :ack => true, :amqp => "amqp://guest:guest@localhost:5672", :vhost => "/", :exchange => "sneakers", @@ -268,6 +292,11 @@ def work(msg) :exclusive => false, :arguments => { 'x-arg2' => 'value2' } }, + :consumer_options => { + :block => false, + :manual_ack => true, + :arguments => {} + }, :hooks => {}, :handler => Sneakers::Handlers::Oneshot, :heartbeat => 30, @@ -408,7 +437,10 @@ def call(deserialized_msg, delivery_info, metadata, handler) let(:worker) do Class.new do include Sneakers::Worker - from_queue 'defaults', ack: false + from_queue 'defaults', + :consumer_options => { + :manual_ack => false + } def work_with_params(msg, delivery_info, metadata) msg