diff --git a/CHANGELOG.md b/CHANGELOG.md index f0fd2243dd..f597d64da9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- `opentelemetry-instrumentation-asyncio` instrumented `asyncio.wait_for` properly raises `asyncio.TimeoutError` as expected + ([#2637](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2637)) - `opentelemetry-instrumentation-aws-lambda` Bugfix: AWS Lambda event source key incorrect for SNS in instrumentation library. ([#2612](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2612)) - `opentelemetry-instrumentation-system-metrics` Permit to use psutil 6.0+. @@ -38,6 +40,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2590](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2590)) - Reference symbols from generated semantic conventions ([#2611](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2611)) +- `opentelemetry-instrumentation-psycopg` Bugfix: Handle empty statement. + ([#2644](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2644)) +- `opentelemetry-instrumentation-confluent-kafka` Confluent Kafka: Ensure consume span is ended when consumer is closed + ([#2640](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2640)) ## Version 1.25.0/0.46b0 (2024-05-31) diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py index fc1b535270..f5f0d34c4f 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/src/opentelemetry/instrumentation/asyncio/__init__.py @@ -280,8 +280,11 @@ async def trace_coroutine(self, coro): # CancelledError is raised when a coroutine is cancelled # before it has a chance to run. We don't want to record # this as an error. + # Still it needs to be raised in order for `asyncio.wait_for` + # to properly work with timeout and raise accordingly `asyncio.TimeoutError` except asyncio.CancelledError: attr["state"] = "cancelled" + raise except Exception as exc: exception = exc state = determine_state(exception) diff --git a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_wait.py b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_wait.py index 77064aeafa..312f035d36 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_wait.py +++ b/instrumentation/opentelemetry-instrumentation-asyncio/tests/test_asyncio_wait.py @@ -68,6 +68,19 @@ async def main(): spans = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans), 2) + def test_asyncio_wait_for_with_timeout(self): + expected_timeout_error = None + + async def main(): + nonlocal expected_timeout_error + try: + await asyncio.wait_for(async_func(), 0.01) + except asyncio.TimeoutError as timeout_error: + expected_timeout_error = timeout_error + + asyncio.run(main()) + self.assertNotEqual(expected_timeout_error, None) + def test_asyncio_as_completed(self): async def main(): if sys.version_info >= (3, 11): diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 45d45ccb63..3d1cc79c93 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -144,6 +144,10 @@ def consume( ): # pylint: disable=useless-super-delegation return super().consume(*args, **kwargs) + # This method is deliberately implemented in order to allow wrapt to wrap this function + def close(self): # pylint: disable=useless-super-delegation + return super().close() + class ProxiedProducer(Producer): def __init__(self, producer: Producer, tracer: Tracer): @@ -181,6 +185,11 @@ def __init__(self, consumer: Consumer, tracer: Tracer): self._current_consume_span = None self._current_context_token = None + def close(self): + return ConfluentKafkaInstrumentor.wrap_close( + self._consumer.close, self + ) + def committed(self, partitions, timeout=-1): return self._consumer.committed(partitions, timeout) @@ -303,6 +312,9 @@ def _inner_wrap_consume(func, instance, args, kwargs): func, instance, self._tracer, args, kwargs ) + def _inner_wrap_close(func, instance): + return ConfluentKafkaInstrumentor.wrap_close(func, instance) + wrapt.wrap_function_wrapper( AutoInstrumentedProducer, "produce", @@ -321,6 +333,12 @@ def _inner_wrap_consume(func, instance, args, kwargs): _inner_wrap_consume, ) + wrapt.wrap_function_wrapper( + AutoInstrumentedConsumer, + "close", + _inner_wrap_close, + ) + def _uninstrument(self, **kwargs): confluent_kafka.Producer = self._original_kafka_producer confluent_kafka.Consumer = self._original_kafka_consumer @@ -403,3 +421,9 @@ def wrap_consume(func, instance, tracer, args, kwargs): ) return records + + @staticmethod + def wrap_close(func, instance): + if instance._current_consume_span: + _end_current_consume_span(instance) + func() diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 205de27733..27653d6777 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -237,7 +237,44 @@ def test_consume(self) -> None: span_list = self.memory_exporter.get_finished_spans() self._compare_spans(span_list, expected_spans) + def test_close(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + mocked_messages = [ + MockedMessage("topic-a", 0, 0, []), + ] + expected_spans = [ + {"name": "recv", "attributes": {}}, + { + "name": "topic-a process", + "attributes": { + SpanAttributes.MESSAGING_OPERATION: "process", + SpanAttributes.MESSAGING_KAFKA_PARTITION: 0, + SpanAttributes.MESSAGING_SYSTEM: "kafka", + SpanAttributes.MESSAGING_DESTINATION: "topic-a", + SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value, + SpanAttributes.MESSAGING_MESSAGE_ID: "topic-a.0.0", + }, + }, + ] + + consumer = MockConsumer( + mocked_messages, + { + "bootstrap.servers": "localhost:29092", + "group.id": "mygroup", + "auto.offset.reset": "earliest", + }, + ) + self.memory_exporter.clear() + consumer = instrumentation.instrument_consumer(consumer) + consumer.poll() + consumer.close() + + span_list = self.memory_exporter.get_finished_spans() + self._compare_spans(span_list, expected_spans) + def _compare_spans(self, spans, expected_spans): + self.assertEqual(len(spans), len(expected_spans)) for span, expected_span in zip(spans, expected_spans): self.assertEqual(expected_span["name"], span.name) for attribute_key, expected_attribute_value in expected_span[ diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py index 5d7054151a..4f61713b29 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py @@ -269,7 +269,8 @@ def get_operation_name(self, cursor, args): if isinstance(statement, Composed): statement = statement.as_string(cursor) - if isinstance(statement, str): + # `statement` can be empty string. See #2643 + if statement and isinstance(statement, str): # Strip leading comments so we get the operation name. return self._leading_comment_remover.sub("", statement).split()[0] diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py index 5a5b39d80b..dc9969ba8c 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/tests/test_psycopg_integration.py @@ -245,14 +245,18 @@ def test_span_name(self): cursor.execute("/* leading comment */ query") cursor.execute("/* leading comment */ query /* trailing comment */") cursor.execute("query /* trailing comment */") + cursor.execute("") + cursor.execute("--") spans_list = self.memory_exporter.get_finished_spans() - self.assertEqual(len(spans_list), 6) + self.assertEqual(len(spans_list), 8) self.assertEqual(spans_list[0].name, "Test") self.assertEqual(spans_list[1].name, "multi") self.assertEqual(spans_list[2].name, "tab") self.assertEqual(spans_list[3].name, "query") self.assertEqual(spans_list[4].name, "query") self.assertEqual(spans_list[5].name, "query") + self.assertEqual(spans_list[6].name, "postgresql") + self.assertEqual(spans_list[7].name, "--") # pylint: disable=unused-argument def test_not_recording(self):