From e0bc9f5f35c7fda5e982065297017c91fc8a725c Mon Sep 17 00:00:00 2001 From: elijahbenizzy Date: Tue, 13 Aug 2024 14:54:23 -0700 Subject: [PATCH] Adds attribute logging to s3 tracker We just had to wire it through -- this also adds app_id + partition_key to the parameters. --- burr/integrations/opentelemetry.py | 1 + burr/lifecycle/base.py | 4 ++++ burr/tracking/common/models.py | 7 +++++++ burr/tracking/s3client.py | 16 +++++++++++++--- burr/visibility/tracing.py | 2 ++ 5 files changed, 27 insertions(+), 3 deletions(-) diff --git a/burr/integrations/opentelemetry.py b/burr/integrations/opentelemetry.py index f5804722..fb9fb59c 100644 --- a/burr/integrations/opentelemetry.py +++ b/burr/integrations/opentelemetry.py @@ -427,6 +427,7 @@ def on_end(self, span: "Span") -> None: span=cached_span.action_span, tags={}, # TODO -- log app_id=cached_span.app_id, + partition_key=cached_span.partition_key, ) diff --git a/burr/lifecycle/base.py b/burr/lifecycle/base.py index cca891ba..d256cf88 100644 --- a/burr/lifecycle/base.py +++ b/burr/lifecycle/base.py @@ -206,6 +206,8 @@ def do_log_attributes( action_sequence_id: int, span: Optional["ActionSpan"], tags: dict, + app_id: str, + partition_key: Optional[str], **future_kwargs: Any, ): pass @@ -225,6 +227,8 @@ async def do_log_attributes( action_sequence_id: int, span: "ActionSpan", tags: dict, + app_id: str, + partition_key: Optional[str], **future_kwargs: Any, ): pass diff --git a/burr/tracking/common/models.py b/burr/tracking/common/models.py index 1f4622ab..357ef0d2 100644 --- a/burr/tracking/common/models.py +++ b/burr/tracking/common/models.py @@ -189,6 +189,8 @@ class EndSpanModel(IdentifyingModel): @property def sequence_id(self) -> int: + # so we have full backwards compatibility + # the server likes them all to be called sequence_id return self.action_sequence_id @@ -203,3 +205,8 @@ class AttributeModel(IdentifyingModel): value: Union[dict, str, int, float, bool, list, None] tags: Dict[str, str] type: str = "attribute" + + @property + def sequence_id(self) -> int: + # Ditto with the above + return self.action_sequence_id diff --git a/burr/tracking/s3client.py b/burr/tracking/s3client.py index dfa86d86..79906745 100644 --- a/burr/tracking/s3client.py +++ b/burr/tracking/s3client.py @@ -19,6 +19,7 @@ from burr.tracking.common.models import ( ApplicationMetadataModel, ApplicationModel, + AttributeModel, BeginEntryModel, BeginSpanModel, EndEntryModel, @@ -84,7 +85,7 @@ def _allowed_project_name(project_name: str, on_windows: bool) -> bool: return bool(re.match(pattern, project_name)) -EventType = Union[BeginEntryModel, EndEntryModel, BeginSpanModel, EndSpanModel] +EventType = Union[BeginEntryModel, EndEntryModel, BeginSpanModel, EndSpanModel, AttributeModel] def unique_ordered_prefix() -> str: @@ -122,12 +123,22 @@ def do_log_attributes( action: str, action_sequence_id: int, span: Optional["ActionSpan"], + app_id: str, + partition_key: Optional[str], tags: dict, **future_kwargs: Any, ): # TODO -- log attributes to s3 as well # Coming up shortly - pass + for attribute_name, attribute in attributes.items(): + attribute_model = AttributeModel( + key=attribute_name, + action_sequence_id=action_sequence_id, + span_id=span.uid if span is not None else None, + value=serde.serialize(attribute, **self.serde_kwargs), + tags=tags, + ) + self.submit_log_event(attribute_model, app_id=app_id, partition_key=partition_key) def __init__( self, @@ -305,7 +316,6 @@ def post_application_create( else "None", }, ) - # TODO -- log parent relationship def pre_run_step( self, diff --git a/burr/visibility/tracing.py b/burr/visibility/tracing.py index 439a78a2..b0c72989 100644 --- a/burr/visibility/tracing.py +++ b/burr/visibility/tracing.py @@ -232,6 +232,8 @@ def log_attributes(self, **attributes): action=self.action, action_sequence_id=self.action_sequence_id, span=self.context_var.get(), + app_id=self.app_id, + partition_key=self.partition_key, tags={}, # TODO -- add tags )