Skip to content

Commit

Permalink
NH-64718: wip otlp processor
Browse files Browse the repository at this point in the history
  • Loading branch information
xuan-cao-swi committed Oct 25, 2023
1 parent d7ff935 commit 640ab89
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 5 deletions.
3 changes: 2 additions & 1 deletion lib/solarwinds_apm/opentelemetry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@
require_relative './opentelemetry/solarwinds_processor'
require_relative './opentelemetry/solarwinds_sampler'
require_relative './opentelemetry/solarwinds_exporter'
require_relative './opentelemetry/solarwinds_response_propagator'
require_relative './opentelemetry/solarwinds_response_propagator'
require_relative './opentelemetry/otlp_processor'
190 changes: 190 additions & 0 deletions lib/solarwinds_apm/opentelemetry/otlp_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# © 2023 SolarWinds Worldwide, LLC. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at:http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

# require 'opentelemetry/sdk'
# require 'opentelemetry-metrics-sdk'
# require 'opentelemetry/exporter/otlp'

# OpenTelemetry::SDK.configure do |c|
# c.service_name = 'sinatra-sample-2'
# end

# otlp_metric_exporter = OpenTelemetry::SDK::Metrics::Export::OTLPMetricExporter.new # this is experimental exporter that sends metrics to otlp collector through http
# OpenTelemetry.meter_provider.add_metric_reader(otlp_metric_exporter)

# meter = OpenTelemetry.meter_provider.meter('test')

# histogram = meter.create_histogram('histogram', unit: 'smidgen', description: 'a small amount of something')
# histogram.record(5, attributes: { 'foo' => 'bar' })

# otlp_metric_exporter.pull


Check notice

Code scanning / Rubocop

Don't use several empty lines in a row. Note

Layout/EmptyLines: Extra blank line detected.
module SolarWindsAPM
module OpenTelemetry
# reference: OpenTelemetry::SDK::Trace::SpanProcessor
class OTLPProcessor
HTTP_METHOD = "http.method".freeze
HTTP_ROUTE = "http.route".freeze
HTTP_STATUS_CODE = "http.status_code".freeze
HTTP_URL = "http.url".freeze
LIBOBOE_HTTP_SPAN_STATUS_UNAVAILABLE = 0

attr_reader :txn_manager
attr_accessor :description

# @param [Meter] meter the meteer created by ::OpenTelemetry.meter_provider.meter('meter_name')
# @param [TxnNameManager] txn_manager storage for transaction name
# @exporter [SolarWindsExporter] exporter SolarWindsExporter::OpenTelemetry::SolarWindsExporter
def initialize(meter, txn_manager, exporter)
@meter = meter
@txn_manager = txn_manager
@exporter = exporter
@histogram = nil
@description = nil
end

# Called when a {Span} is started, if the {Span#recording?}
# returns true.
#
# This method is called synchronously on the execution thread, should
# not throw or block the execution thread.
#
# @param [Span] span the {Span} that just started.
# @param [Context] parent_context the
# started span.
def on_start(span, parent_context)
SolarWindsAPM.logger.debug {"[#{self.class}/#{__method__}] processor on_start span: #{span.inspect}, parent_context: #{parent_context.inspect}"}

parent_span = ::OpenTelemetry::Trace.current_span(parent_context)
return if parent_span && parent_span.context != ::OpenTelemetry::Trace::SpanContext::INVALID && parent_span.context.remote? == false

trace_flags = span.context.trace_flags.sampled? ? '01' : '00'
@txn_manager.set_root_context_h(span.context.hex_trace_id,"#{span.context.hex_span_id}-#{trace_flags}")

@histogram = @meter.create_histogram('histogram', unit: 'smidgen', description: @description || '') if @histogram.nil?
rescue StandardError => e
SolarWindsAPM.logger.info {"[#{self.class}/#{__method__}] processor on_start error: #{e.message}"}
end

# Called when a {Span} is ended, if the {Span#recording?}
# returns true.
#
# This method is called synchronously on the execution thread, should
# not throw or block the execution thread.
# Only calculate inbound metrics for service root spans
#
# @param [Span] span the {Span} that just ended.
def on_finish(span)
SolarWindsAPM.logger.debug {"[#{self.class}/#{__method__}] processor on_finish span: #{span.inspect}"}

if span.parent_span_id != ::OpenTelemetry::Trace::INVALID_SPAN_ID
@exporter&.export([span.to_span_data]) if span.context.trace_flags.sampled?
return
end

meter_attrs = {'sw.service_name' => ENV['OTEL_SERVICE_NAME'], 'sw.nonce' => rand(2**64) >> 1}

span_time = calculate_span_time(start_time: span.start_timestamp, end_time: span.end_timestamp)
domain = nil

Check warning

Code scanning / Rubocop

Checks for useless assignment to a local variable. Warning

Lint/UselessAssignment: Useless assignment to variable - domain.
has_error = error?(span)
meter_attrs['sw.is_error'] = has_error ? 'true' : 'false'

trans_name = calculate_transaction_names(span)

if span_http?(span)
status_code = get_http_status_code(span)
request_method = span.attributes[HTTP_METHOD]
url_tran = span.attributes[HTTP_URL]

Check warning

Code scanning / Rubocop

Checks for useless assignment to a local variable. Warning

Lint/UselessAssignment: Useless assignment to variable - url\_tran.

meter_attrs.merge!({'http.status_code' => status_code, 'http.method' => request_method, 'sw.transaction' => trans_name})
else
meter_attrs.merge!({'sw.transaction' => trans_name})
end

@histogram.record(span_time, attributes: meter_attrs)

SolarWindsAPM.logger.debug {"[#{self.class}/#{__method__}] trans_name: #{trans_name}"}
@txn_manager["#{span.context.hex_trace_id}-#{span.context.hex_span_id}"] = trans_name if span.context.trace_flags.sampled?
@txn_manager.delete_root_context_h(span.context.hex_trace_id)
@exporter&.export([span.to_span_data]) if span.context.trace_flags.sampled?

::OpenTelemetry.meter_provider.metric_readers.each do |metric_reader|

Check notice

Code scanning / Rubocop

Use symbols as procs instead of blocks when possible. Note

Style/SymbolProc: Pass &:pull as an argument to each instead of a block.
metric_reader.pull # same as export
end
rescue StandardError => e
SolarWindsAPM.logger.info {"[#{self.class}/#{__method__}] can't flush span to exporter; processor on_finish error: #{e.message}"}
::OpenTelemetry::SDK::Trace::Export::FAILURE
end

# Export all ended spans to the configured `Exporter` that have not yet
# been exported.
#
# This method should only be called in cases where it is absolutely
# necessary, such as when using some FaaS providers that may suspend
# the process after an invocation, but before the `Processor` exports
# the completed spans.
#
# @param [optional Numeric] timeout An optional timeout in seconds.
# @return [Integer] Export::SUCCESS if no error occurred, Export::FAILURE if
# a non-specific failure occurred, Export::TIMEOUT if a timeout occurred.
def force_flush(timeout: nil)
@exporter&.force_flush(timeout: timeout) || ::OpenTelemetry::SDK::Trace::Export::SUCCESS
end

# Called when {TracerProvider#shutdown} is called.
#
# @param [optional Numeric] timeout An optional timeout in seconds.
# @return [Integer] Export::SUCCESS if no error occurred, Export::FAILURE if
# a non-specific failure occurred, Export::TIMEOUT if a timeout occurred.
def shutdown(timeout: nil)
@exporter&.shutdown(timeout: timeout) || ::OpenTelemetry::SDK::Trace::Export::SUCCESS
end

private

# This span from inbound HTTP request if from a SERVER by some http.method
def span_http?(span)
(span.kind == ::OpenTelemetry::Trace::SpanKind::SERVER && !span.attributes[HTTP_METHOD].nil?)
end

# Calculate if this span instance has_error
# return [Integer]
def error?(span)
span.status.code == ::OpenTelemetry::Trace::Status::ERROR ? 1 : 0
end

# Calculate HTTP status_code from span or default to UNAVAILABLE
# Something went wrong in OTel or instrumented service crashed early
# if no status_code in attributes of HTTP span
def get_http_status_code(span)
span.attributes[HTTP_STATUS_CODE] || LIBOBOE_HTTP_SPAN_STATUS_UNAVAILABLE
end

# Get trans_name and url_tran of this span instance.
def calculate_transaction_names(span)
trace_span_id = "#{span.context.hex_trace_id}-#{span.context.hex_span_id}"
trans_name = @txn_manager.get(trace_span_id)
if trans_name
SolarWindsAPM.logger.debug {"[#{self.class}/#{__method__}] found trans name from txn_manager: #{trans_name} by #{trace_span_id}"}
@txn_manager.del(trace_span_id)
else
trans_name = span.attributes[HTTP_ROUTE] || nil
trans_name = span.name if span.name && (trans_name.nil? || trans_name.empty?)
end
trans_name
end

# Calculate span time in microseconds (us) using start and end time
# in nanoseconds (ns). OTel span start/end_time are optional.
def calculate_span_time(start_time: nil, end_time: nil)
return 0 if start_time.nil? || end_time.nil?

((end_time.to_i - start_time.to_i) / 1e3).round
end
end
end
end
22 changes: 19 additions & 3 deletions lib/solarwinds_apm/otel_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ module OTelConfig

@@agent_enabled = true

def self.disable_agent
def self.disable_agent(reason: nil)
return unless @@agent_enabled # only show the msg once

@@agent_enabled = false
SolarWindsAPM.logger.warn {"[#{name}/#{__method__}] Agent disabled. No Trace exported."}
SolarWindsAPM.logger.warn {"[#{name}/#{__method__}] Agent disabled. No Trace exported. Reason #{reason}"}
end

def self.validate_service_key
Expand Down Expand Up @@ -95,7 +95,20 @@ def self.print_config
def self.resolve_solarwinds_processor
txn_manager = SolarWindsAPM::TxnNameManager.new
exporter = SolarWindsAPM::OpenTelemetry::SolarWindsExporter.new(txn_manager: txn_manager)
@@config[:span_processor] = SolarWindsAPM::OpenTelemetry::SolarWindsProcessor.new(exporter, txn_manager)

if ENV['OTLP_METRICS'] # or SERVERLESS
disable_agent unless defined?(::OpenTelemetry::Exporter::OTLP::MetricsExporter)

otlp_metric_exporter = ::OpenTelemetry::Exporter::OTLP::MetricsExporter.new
@@config[:metrics_exporter] = otlp_metric_exporter

meter_name = ENV['SW_APM_METER_NAME'] || SolarWindsAPM::Config[:meter_name]
meter = ::OpenTelemetry.meter_provider.meter(meter_name)
@@config[:otlp_meter] = meter
@@config[:span_processor] = SolarWindsAPM::OpenTelemetry::OTLPProcessor.new(meter, txn_manager, exporter)
else
@@config[:span_processor] = SolarWindsAPM::OpenTelemetry::SolarWindsProcessor.new(exporter, txn_manager)
end
end

def self.resolve_solarwinds_propagator
Expand Down Expand Up @@ -147,6 +160,9 @@ def self.initialize
# append our propagators
::OpenTelemetry.propagation.instance_variable_get(:@propagators).append(@@config[:propagators])

# add metrics_exporter
::OpenTelemetry.meter_provider.add_metric_reader(@@config[:metrics_exporter]) if ENV['OTLP_METRICS']

# append our processors (with our exporter)
::OpenTelemetry.tracer_provider.add_span_processor(@@config[:span_processor])

Expand Down
2 changes: 1 addition & 1 deletion lib/solarwinds_apm/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Version
MAJOR = 6 # breaking,
MINOR = 0 # feature,
PATCH = 0 # fix => BFF
PRE = 'preV4'.freeze # for pre-releases into packagecloud, set to nil for production releases into rubygems
PRE = 'preV5'.freeze # for pre-releases into packagecloud, set to nil for production releases into rubygems

STRING = [MAJOR, MINOR, PATCH, PRE].compact.join('.')
end
Expand Down

0 comments on commit 640ab89

Please sign in to comment.