Skip to content

Commit

Permalink
Extract span attrs from RQ job object & fix tests (#3786)
Browse files Browse the repository at this point in the history
  • Loading branch information
sentrivana authored Nov 21, 2024
1 parent a674788 commit 25d311e
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 34 deletions.
12 changes: 12 additions & 0 deletions MIGRATION_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ Looking to upgrade from Sentry SDK 2.x to 3.x? Here's a comprehensive list of wh
| `client` | `client.address`, `client.port` |
| full URL | `url.full` |

- If you're using the RQ integration, the `sampling_context` argument of `traces_sampler` doesn't contain the `rq_job` object anymore. Instead, the individual properties of the scope, if available, are accessible as follows:

| RQ property | Sampling context key(s) |
| --------------- | ---------------------------- |
| `rq_job.args` | `rq.job.args` |
| `rq_job.kwargs` | `rq.job.kwargs` |
| `rq_job.func` | `rq.job.func` |
| `queue.name` | `messaging.destination.name` |
| `job.id` | `messaging.message.id` |

Note that `rq.job.args`, `rq.job.kwargs`, and `rq.job.func` are serialized and not the actual objects on the job.

### Removed

- Spans no longer have a `description`. Use `name` instead.
Expand Down
4 changes: 2 additions & 2 deletions sentry_sdk/integrations/opentelemetry/potel_span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ def _root_span_to_transaction_event(self, span):

transaction_name, transaction_source = extract_transaction_name_source(span)
span_data = extract_span_data(span)
(_, description, status, http_status, _) = span_data

trace_context = get_trace_context(span, span_data=span_data)
contexts = {"trace": trace_context}

profile_context = get_profile_context(span)
if profile_context:
contexts["profile"] = profile_context

(_, description, _, http_status, _) = span_data

if http_status:
contexts["response"] = {"status_code": http_status}

Expand Down
17 changes: 14 additions & 3 deletions sentry_sdk/integrations/opentelemetry/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ def extract_span_data(span):
description = span.name
status, http_status = extract_span_status(span)
origin = None

if span.attributes is None:
return (op, description, status, http_status, origin)

Expand All @@ -133,11 +132,23 @@ def extract_span_data(span):

rpc_service = span.attributes.get(SpanAttributes.RPC_SERVICE)
if rpc_service:
return ("rpc", description, status, http_status, origin)
return (
span.attributes.get(SentrySpanAttribute.OP) or "rpc",
description,
status,
http_status,
origin,
)

messaging_system = span.attributes.get(SpanAttributes.MESSAGING_SYSTEM)
if messaging_system:
return ("message", description, status, http_status, origin)
return (
span.attributes.get(SentrySpanAttribute.OP) or "message",
description,
status,
http_status,
origin,
)

faas_trigger = span.attributes.get(SpanAttributes.FAAS_TRIGGER)
if faas_trigger:
Expand Down
45 changes: 41 additions & 4 deletions sentry_sdk/integrations/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK
from sentry_sdk.utils import (
_serialize_span_attribute,
capture_internal_exceptions,
ensure_integration_enabled,
event_from_exception,
Expand Down Expand Up @@ -35,6 +36,15 @@
DEFAULT_TRANSACTION_NAME = "unknown RQ task"


JOB_PROPERTY_TO_ATTRIBUTE = {
"id": "messaging.message.id",
}

QUEUE_PROPERTY_TO_ATTRIBUTE = {
"name": "messaging.destination.name",
}


class RqIntegration(Integration):
identifier = "rq"
origin = f"auto.queue.{identifier}"
Expand All @@ -54,8 +64,8 @@ def setup_once():
old_perform_job = Worker.perform_job

@ensure_integration_enabled(RqIntegration, old_perform_job)
def sentry_patched_perform_job(self, job, *args, **kwargs):
# type: (Any, Job, *Queue, **Any) -> bool
def sentry_patched_perform_job(self, job, queue, *args, **kwargs):
# type: (Any, Job, Queue, *Any, **Any) -> bool
with sentry_sdk.new_scope() as scope:
try:
transaction_name = job.func_name or DEFAULT_TRANSACTION_NAME
Expand All @@ -76,9 +86,9 @@ def sentry_patched_perform_job(self, job, *args, **kwargs):
name=transaction_name,
source=TRANSACTION_SOURCE_TASK,
origin=RqIntegration.origin,
custom_sampling_context={"rq_job": job},
attributes=_prepopulate_attributes(job, queue),
):
rv = old_perform_job(self, job, *args, **kwargs)
rv = old_perform_job(self, job, queue, *args, **kwargs)

if self.is_horse:
# We're inside of a forked process and RQ is
Expand Down Expand Up @@ -167,3 +177,30 @@ def _capture_exception(exc_info, **kwargs):
)

sentry_sdk.capture_event(event, hint=hint)


def _prepopulate_attributes(job, queue):
# type: (Job, Queue) -> dict[str, Any]
attributes = {
"messaging.system": "rq",
}

for prop, attr in JOB_PROPERTY_TO_ATTRIBUTE.items():
if getattr(job, prop, None) is not None:
attributes[attr] = getattr(job, prop)

for prop, attr in QUEUE_PROPERTY_TO_ATTRIBUTE.items():
if getattr(queue, prop, None) is not None:
attributes[attr] = getattr(queue, prop)

for key in ("args", "kwargs"):
if getattr(job, key, None):
attributes[f"rq.job.{key}"] = _serialize_span_attribute(getattr(job, key))

func = job.func
if callable(func):
func = func.__name__

attributes["rq.job.func"] = _serialize_span_attribute(func)

return attributes
41 changes: 16 additions & 25 deletions tests/integrations/rq/test_rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ def test_transaction_with_error(
)

assert envelope["type"] == "transaction"
assert envelope["contexts"]["trace"] == error_event["contexts"]["trace"]
assert envelope["contexts"]["trace"] == DictionaryContaining(
error_event["contexts"]["trace"]
)
assert envelope["transaction"] == error_event["transaction"]
assert envelope["extra"]["rq-job"] == DictionaryContaining(
{
Expand Down Expand Up @@ -148,10 +150,7 @@ def test_error_has_trace_context_if_tracing_disabled(
assert error_event["contexts"]["trace"]


def test_tracing_enabled(
sentry_init,
capture_events,
):
def test_tracing_enabled(sentry_init, capture_events, DictionaryContaining):
sentry_init(integrations=[RqIntegration()], traces_sample_rate=1.0)
events = capture_events()

Expand All @@ -165,7 +164,10 @@ def test_tracing_enabled(

assert error_event["transaction"] == "tests.integrations.rq.test_rq.crashing_job"
assert transaction["transaction"] == "tests.integrations.rq.test_rq.crashing_job"
assert transaction["contexts"]["trace"] == error_event["contexts"]["trace"]
assert (
DictionaryContaining(error_event["contexts"]["trace"])
== transaction["contexts"]["trace"]
)


def test_tracing_disabled(
Expand Down Expand Up @@ -218,9 +220,7 @@ def test_transaction_no_error(
)


def test_traces_sampler_gets_correct_values_in_sampling_context(
sentry_init, DictionaryContaining, ObjectDescribedBy # noqa:N803
):
def test_traces_sampler_gets_correct_values_in_sampling_context(sentry_init):
traces_sampler = mock.Mock(return_value=True)
sentry_init(integrations=[RqIntegration()], traces_sampler=traces_sampler)

Expand All @@ -230,22 +230,13 @@ def test_traces_sampler_gets_correct_values_in_sampling_context(
queue.enqueue(do_trick, "Bodhi", trick="roll over")
worker.work(burst=True)

traces_sampler.assert_any_call(
DictionaryContaining(
{
"rq_job": ObjectDescribedBy(
type=rq.job.Job,
attrs={
"description": "tests.integrations.rq.test_rq.do_trick('Bodhi', trick='roll over')",
"result": "Bodhi, can you roll over? Good dog!",
"func_name": "tests.integrations.rq.test_rq.do_trick",
"args": ("Bodhi",),
"kwargs": {"trick": "roll over"},
},
),
}
)
)
sampling_context = traces_sampler.call_args_list[0][0][0]
assert sampling_context["messaging.system"] == "rq"
assert sampling_context["rq.job.args"] == ["Bodhi"]
assert sampling_context["rq.job.kwargs"] == '{"trick": "roll over"}'
assert sampling_context["rq.job.func"] == "do_trick"
assert sampling_context["messaging.message.id"]
assert sampling_context["messaging.destination.name"] == "default"


@pytest.mark.skipif(
Expand Down

0 comments on commit 25d311e

Please sign in to comment.