Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(que): Fix bulk_enqueue when enqueuing more than 5 jobs #1

Merged
merged 1 commit into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class << base
# Module to prepend to Que singleton class
module ClassMethods
def enqueue(*args, job_options: {}, **arg_opts)
# In Que version 2.1.0 `bulk_enqueue` was introduced.
# In that case, the span is created inside the `bulk_enqueue` method.
return super(*args, **arg_opts) if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert]

tracer = Que::Instrumentation.instance.tracer
otel_config = Que::Instrumentation.instance.config

Expand All @@ -43,19 +47,8 @@ def enqueue(*args, job_options: {}, **arg_opts)
OpenTelemetry.propagation.inject(tags, setter: TagSetter)
end

# In Que version 2.1.0 `bulk_enqueue` was introduced and in order
# for it to work, we must pass `job_options` to `bulk_enqueue` instead of enqueue.
if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert]
Thread.current[:que_jobs_to_bulk_insert][:job_options] = Thread.current[:que_jobs_to_bulk_insert][:job_options]&.merge(tags: tags) do |_, a, b|
a.is_a?(Array) && b.is_a?(Array) ? a.concat(b) : b
end

job = super(*args, **arg_opts)
job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs].last
else
job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts)
job_attrs = job.que_attrs
end
job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts)
job_attrs = job.que_attrs

span.name = "#{job_attrs[:job_class]} publish"
span.add_attributes(QueJob.job_attributes(job_attrs))
Expand All @@ -67,6 +60,32 @@ def enqueue(*args, job_options: {}, **arg_opts)
def gem_version
@gem_version ||= Gem.loaded_specs['que'].version
end

if Gem.loaded_specs['que'].version >= Gem::Version.new('2.1.0')
def bulk_enqueue(**_kwargs, &block)
tracer = Que::Instrumentation.instance.tracer
otel_config = Que::Instrumentation.instance.config

tracer.in_span('publish', kind: :producer) do |span|
super do
yield

job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs]

unless job_attrs.empty?
span.name = "#{job_attrs.first[:job_class]} publish"
span.add_attributes(QueJob.job_attributes(job_attrs.first))
end

if otel_config[:propagation_style] != :none
job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options]
job_options[:tags] ||= []
OpenTelemetry.propagation.inject(job_options[:tags], setter: TagSetter)
end
end
end
end
end
end

def self.job_attributes(job_attrs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,74 @@ def self.run(first, second); end
end
end

describe 'enqueueing multiple jobs' do
it 'creates a span' do
Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

_(finished_spans.size).must_equal(1)

span = finished_spans.last
_(span.kind).must_equal(:producer)
end

it 'names the created span' do
Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

span = finished_spans.last
_(span.name).must_equal('TestJobAsync publish')
end

it 'links spans together' do
bulk_jobs = Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

bulk_jobs.each { |job| Que.run_job_middleware(job) { job.tap(&:_run) } }

_(finished_spans.size).must_equal(11)

publish_span = finished_spans.first

process_spans = finished_spans.drop(1)

process_spans.each do |process_span|
_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end
end

it 'records attributes' do
Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

attributes = finished_spans.last.attributes
_(attributes['messaging.system']).must_equal('que')
_(attributes['messaging.destination']).must_equal('default')
_(attributes['messaging.destination_kind']).must_equal('queue')
_(attributes['messaging.operation']).must_equal('publish')
_(attributes['messaging.que.job_class']).must_equal('TestJobAsync')
_(attributes['messaging.que.priority']).must_equal(100)
_(attributes.key?('messaging.message_id')).must_equal(false)
end
end

describe 'enqueueing zero jobs' do
it 'creates a span' do
Que.bulk_enqueue do
end

_(finished_spans.size).must_equal(1)
end
end

describe 'processing a job' do
before do
bulk_job = Que.bulk_enqueue do
Expand Down Expand Up @@ -363,10 +431,10 @@ def self.run(first, second); end

_(finished_spans.size).must_equal(2)

span1 = finished_spans.first
span1 = finished_spans.last
_(span1.kind).must_equal(:producer)

span2 = finished_spans.last
span2 = finished_spans.first
_(span2.kind).must_equal(:consumer)
end

Expand All @@ -375,10 +443,10 @@ def self.run(first, second); end
TestJobSync.enqueue
end

span1 = finished_spans.first
span1 = finished_spans.last
_(span1.name).must_equal('TestJobSync publish')

span2 = finished_spans.last
span2 = finished_spans.first
_(span2.name).must_equal('TestJobSync process')
end

Expand All @@ -387,7 +455,7 @@ def self.run(first, second); end
TestJobSync.enqueue
end

attributes = finished_spans.last.attributes
attributes = finished_spans.first.attributes
_(attributes['messaging.system']).must_equal('que')
_(attributes['messaging.destination']).must_equal('default')
_(attributes['messaging.destination_kind']).must_equal('queue')
Expand Down