From dd0084bc1b9d696533ea0ee22dbcab470ee3827f Mon Sep 17 00:00:00 2001 From: Laur <49899951+laurglia@users.noreply.github.com> Date: Tue, 1 Oct 2024 21:24:58 +0300 Subject: [PATCH 1/4] fix(que): Fix bulk_enqueue when enqueuing more than 5 jobs (#1074) Previously, with Que `bulk_enqueue` a new span was created for every job in the batch. For each such span, a tag was added to _all_ jobs in the batch. This meant that if you enqueued 3 jobs, then 3 spans were created and each job had 3 tags pointing to 3 different spans. This behavior became problematic when you tried to insert more than 5 jobs, as Que supports up to 5 tags per job. More specifically, a runtime error was raised by Que. This commit fixes that bug by creating only a single span when using bulk_enqueue. --- .../instrumentation/que/patches/que_job.rb | 45 +++++++---- .../opentelemetry/instrumentation/que_test.rb | 78 +++++++++++++++++-- 2 files changed, 105 insertions(+), 18 deletions(-) diff --git a/instrumentation/que/lib/opentelemetry/instrumentation/que/patches/que_job.rb b/instrumentation/que/lib/opentelemetry/instrumentation/que/patches/que_job.rb index 5be898984..a189f5aee 100644 --- a/instrumentation/que/lib/opentelemetry/instrumentation/que/patches/que_job.rb +++ b/instrumentation/que/lib/opentelemetry/instrumentation/que/patches/que_job.rb @@ -19,6 +19,10 @@ class << base # Module to prepend to Que singleton class module ClassMethods def enqueue(*args, job_options: {}, **arg_opts) + # In Que version 2.1.0 `bulk_enqueue` was introduced. + # In that case, the span is created inside the `bulk_enqueue` method. + return super(*args, **arg_opts) if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert] + tracer = Que::Instrumentation.instance.tracer otel_config = Que::Instrumentation.instance.config @@ -43,19 +47,8 @@ def enqueue(*args, job_options: {}, **arg_opts) OpenTelemetry.propagation.inject(tags, setter: TagSetter) end - # In Que version 2.1.0 `bulk_enqueue` was introduced and in order - # for it to work, we must pass `job_options` to `bulk_enqueue` instead of enqueue. - if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert] - Thread.current[:que_jobs_to_bulk_insert][:job_options] = Thread.current[:que_jobs_to_bulk_insert][:job_options]&.merge(tags: tags) do |_, a, b| - a.is_a?(Array) && b.is_a?(Array) ? a.concat(b) : b - end - - job = super(*args, **arg_opts) - job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs].last - else - job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts) - job_attrs = job.que_attrs - end + job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts) + job_attrs = job.que_attrs span.name = "#{job_attrs[:job_class]} publish" span.add_attributes(QueJob.job_attributes(job_attrs)) @@ -67,6 +60,32 @@ def enqueue(*args, job_options: {}, **arg_opts) def gem_version @gem_version ||= Gem.loaded_specs['que'].version end + + if Gem.loaded_specs['que'].version >= Gem::Version.new('2.1.0') + def bulk_enqueue(**_kwargs, &block) + tracer = Que::Instrumentation.instance.tracer + otel_config = Que::Instrumentation.instance.config + + tracer.in_span('publish', kind: :producer) do |span| + super do + yield + + job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] + + unless job_attrs.empty? + span.name = "#{job_attrs.first[:job_class]} publish" + span.add_attributes(QueJob.job_attributes(job_attrs.first)) + end + + if otel_config[:propagation_style] != :none + job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options] + job_options[:tags] ||= [] + OpenTelemetry.propagation.inject(job_options[:tags], setter: TagSetter) + end + end + end + end + end end def self.job_attributes(job_attrs) diff --git a/instrumentation/que/test/opentelemetry/instrumentation/que_test.rb b/instrumentation/que/test/opentelemetry/instrumentation/que_test.rb index 9ec9847c1..7222e82cd 100644 --- a/instrumentation/que/test/opentelemetry/instrumentation/que_test.rb +++ b/instrumentation/que/test/opentelemetry/instrumentation/que_test.rb @@ -304,6 +304,74 @@ def self.run(first, second); end end end + describe 'enqueueing multiple jobs' do + it 'creates a span' do + Que.bulk_enqueue do + 10.times { TestJobAsync.enqueue } + end + + _(finished_spans.size).must_equal(1) + + span = finished_spans.last + _(span.kind).must_equal(:producer) + end + + it 'names the created span' do + Que.bulk_enqueue do + 10.times { TestJobAsync.enqueue } + end + + span = finished_spans.last + _(span.name).must_equal('TestJobAsync publish') + end + + it 'links spans together' do + bulk_jobs = Que.bulk_enqueue do + 10.times { TestJobAsync.enqueue } + end + + bulk_jobs.each { |job| Que.run_job_middleware(job) { job.tap(&:_run) } } + + _(finished_spans.size).must_equal(11) + + publish_span = finished_spans.first + + process_spans = finished_spans.drop(1) + + process_spans.each do |process_span| + _(publish_span.trace_id).wont_equal(process_span.trace_id) + + _(process_span.total_recorded_links).must_equal(1) + _(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id) + _(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id) + end + end + + it 'records attributes' do + Que.bulk_enqueue do + 10.times { TestJobAsync.enqueue } + end + + attributes = finished_spans.last.attributes + _(attributes['messaging.system']).must_equal('que') + _(attributes['messaging.destination']).must_equal('default') + _(attributes['messaging.destination_kind']).must_equal('queue') + _(attributes['messaging.operation']).must_equal('publish') + _(attributes['messaging.que.job_class']).must_equal('TestJobAsync') + _(attributes['messaging.que.priority']).must_equal(100) + _(attributes.key?('messaging.message_id')).must_equal(false) + end + end + + describe 'enqueueing zero jobs' do + it 'creates a span' do + Que.bulk_enqueue do + end + + _(finished_spans.size).must_equal(1) + end + end + describe 'processing a job' do before do bulk_job = Que.bulk_enqueue do @@ -363,10 +431,10 @@ def self.run(first, second); end _(finished_spans.size).must_equal(2) - span1 = finished_spans.first + span1 = finished_spans.last _(span1.kind).must_equal(:producer) - span2 = finished_spans.last + span2 = finished_spans.first _(span2.kind).must_equal(:consumer) end @@ -375,10 +443,10 @@ def self.run(first, second); end TestJobSync.enqueue end - span1 = finished_spans.first + span1 = finished_spans.last _(span1.name).must_equal('TestJobSync publish') - span2 = finished_spans.last + span2 = finished_spans.first _(span2.name).must_equal('TestJobSync process') end @@ -387,7 +455,7 @@ def self.run(first, second); end TestJobSync.enqueue end - attributes = finished_spans.last.attributes + attributes = finished_spans.first.attributes _(attributes['messaging.system']).must_equal('que') _(attributes['messaging.destination']).must_equal('default') _(attributes['messaging.destination_kind']).must_equal('queue') From ea603ac60f2a5001d9c05cbd1939ef5d58fedc17 Mon Sep 17 00:00:00 2001 From: Hannah Ramadan <76922290+hannahramadan@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:48:35 -0700 Subject: [PATCH 2/4] chore: Update CONTRIBUTING instructions for running tests using Appraisal (#1167) * chore: Update contributing instructions for running tests * Update CONTRIBUTING.md Co-authored-by: Kayla Reopelle <87386821+kaylareopelle@users.noreply.github.com> --------- Co-authored-by: Kayla Reopelle <87386821+kaylareopelle@users.noreply.github.com> --- CONTRIBUTING.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 58466f717..dc7c281c7 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -80,6 +80,12 @@ For example, to test `opentelemetry-instrumentation-action_pack` you would: 2. Install the bundle with `bundle install` 3. Run the tests with `bundle exec rake` +Note: Some test suites make use of [Appraisal](https://github.com/thoughtbot/appraisal), a library for testing against different versions of dependencies. To run tests in suites that use Appraisal: + + 1. Change directory to the instrumentation you'd like to test, ex: `instrumentation/action_pack` + 2. Install the bundle with `bundle exec appraisal install` + 3. Run the tests with `bundle exec appraisal rake test` + ### Docker setup We use Docker Compose to configure and build services used in development and From 7b349b177d0fce296efdb92e5f4453c457f142b4 Mon Sep 17 00:00:00 2001 From: Sam <370182+plantfansam@users.noreply.github.com> Date: Tue, 1 Oct 2024 13:14:05 -0600 Subject: [PATCH 3/4] chore: update env var in vitess docs (#1176) Co-authored-by: Kayla Reopelle <87386821+kaylareopelle@users.noreply.github.com> --- propagator/vitess/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/propagator/vitess/README.md b/propagator/vitess/README.md index 47b88eda3..cb5c96ef9 100644 --- a/propagator/vitess/README.md +++ b/propagator/vitess/README.md @@ -40,7 +40,7 @@ Or, if you use [bundler][bundler-home], include `opentelemetry-propagator-vitess Configure your application to use this propagator with the Trilogy client instrumentation by setting the following [environment variable][envars]: ```console -OTEL_RUBY_INSTRUMENTATION_TRILOGY_PROPAGATOR=vitess +OTEL_RUBY_INSTRUMENTATION_TRILOGY_CONFIG_OPTS=propagator=vitess ``` ## How can I get involved? From 0a7f9e7950a37292d0092da7433dce821a9f524f Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Tue, 1 Oct 2024 12:37:26 -0700 Subject: [PATCH 4/4] fix: update references to logging exporter (#1178) * examples: update references to logging exporter This exporter has been replaced by the debug exporter and will be removed soon Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> * update example to use v0.109.0 of the collector Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --------- Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> Co-authored-by: Kayla Reopelle <87386821+kaylareopelle@users.noreply.github.com> --- docker-compose.yml | 6 +++--- otelcol-config.yml | 10 ++++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index af815eabe..b77ad0e9d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -217,7 +217,7 @@ services: image: memcached:alpine command: memcached -m 64 ports: - - "11211:11211" + - "11211:11211" zookeeper: image: confluentinc/cp-zookeeper:latest @@ -248,8 +248,8 @@ services: - "16686:16686" otelcol: - image: otel/opentelemetry-collector:0.54.0 - command: [ "--config=/etc/otelcol-config.yml" ] + image: otel/opentelemetry-collector:0.109.0 + command: ["--config=/etc/otelcol-config.yml"] volumes: - ./otelcol-config.yml:/etc/otelcol-config.yml ports: diff --git a/otelcol-config.yml b/otelcol-config.yml index e653551b5..c406f0517 100644 --- a/otelcol-config.yml +++ b/otelcol-config.yml @@ -2,13 +2,15 @@ receivers: otlp: protocols: grpc: + endpoint: 0.0.0.0:4317 http: + endpoint: 0.0.0.0:4318 exporters: - jaeger: - endpoint: "jaeger:14250" + otlp: + endpoint: "jaeger:4317" tls: insecure: true - logging: + debug: processors: batch: @@ -18,4 +20,4 @@ service: traces: receivers: [otlp] processors: [batch] - exporters: [logging, jaeger] + exporters: [debug, otlp]