Skip to content

Commit

Permalink
Confluent Kafka: Ensure consume span is ended when consumer is closed (
Browse files Browse the repository at this point in the history
  • Loading branch information
joshschltelus authored Jul 2, 2024
1 parent a29242f commit df3415b
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2590](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2590))
- Reference symbols from generated semantic conventions
([#2611](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2611))
- `opentelemetry-instrumentation-confluent-kafka` Confluent Kafka: Ensure consume span is ended when consumer is closed
([#2640](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2640))

## Version 1.25.0/0.46b0 (2024-05-31)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ def consume(
): # pylint: disable=useless-super-delegation
return super().consume(*args, **kwargs)

# This method is deliberately implemented in order to allow wrapt to wrap this function
def close(self): # pylint: disable=useless-super-delegation
return super().close()


class ProxiedProducer(Producer):
def __init__(self, producer: Producer, tracer: Tracer):
Expand Down Expand Up @@ -181,6 +185,11 @@ def __init__(self, consumer: Consumer, tracer: Tracer):
self._current_consume_span = None
self._current_context_token = None

def close(self):
return ConfluentKafkaInstrumentor.wrap_close(
self._consumer.close, self
)

def committed(self, partitions, timeout=-1):
return self._consumer.committed(partitions, timeout)

Expand Down Expand Up @@ -303,6 +312,9 @@ def _inner_wrap_consume(func, instance, args, kwargs):
func, instance, self._tracer, args, kwargs
)

def _inner_wrap_close(func, instance):
return ConfluentKafkaInstrumentor.wrap_close(func, instance)

wrapt.wrap_function_wrapper(
AutoInstrumentedProducer,
"produce",
Expand All @@ -321,6 +333,12 @@ def _inner_wrap_consume(func, instance, args, kwargs):
_inner_wrap_consume,
)

wrapt.wrap_function_wrapper(
AutoInstrumentedConsumer,
"close",
_inner_wrap_close,
)

def _uninstrument(self, **kwargs):
confluent_kafka.Producer = self._original_kafka_producer
confluent_kafka.Consumer = self._original_kafka_consumer
Expand Down Expand Up @@ -403,3 +421,9 @@ def wrap_consume(func, instance, tracer, args, kwargs):
)

return records

@staticmethod
def wrap_close(func, instance):
if instance._current_consume_span:
_end_current_consume_span(instance)
func()
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,44 @@ def test_consume(self) -> None:
span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def test_close(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
mocked_messages = [
MockedMessage("topic-a", 0, 0, []),
]
expected_spans = [
{"name": "recv", "attributes": {}},
{
"name": "topic-a process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_KAFKA_PARTITION: 0,
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-a",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-a.0.0",
},
},
]

consumer = MockConsumer(
mocked_messages,
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
},
)
self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.poll()
consumer.close()

span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def _compare_spans(self, spans, expected_spans):
self.assertEqual(len(spans), len(expected_spans))
for span, expected_span in zip(spans, expected_spans):
self.assertEqual(expected_span["name"], span.name)
for attribute_key, expected_attribute_value in expected_span[
Expand Down

0 comments on commit df3415b

Please sign in to comment.