Enable exporting spans to snowflake stage if a TruLensSnowflakeSpanExporter is provided #1708

Merged
merged 74 commits into from
Jan 17, 2025
Changes from 2 commits
Commits
929187f
move things out of app.py
sfc-gh-gtokernliang Jan 3, 2025
b4d2c84
add tests
sfc-gh-gtokernliang Jan 3, 2025
f112f38
update golden
sfc-gh-gtokernliang Jan 3, 2025
197cd0a
update app.py
sfc-gh-gtokernliang Jan 3, 2025
062d197
update golden
sfc-gh-gtokernliang Jan 3, 2025
1cc2a95
update golden
sfc-gh-gtokernliang Jan 3, 2025
dfc3fa5
move main output
sfc-gh-gtokernliang Jan 4, 2025
95f250c
PR feedback
sfc-gh-gtokernliang Jan 8, 2025
788f945
not sure why this is needed
sfc-gh-gtokernliang Jan 9, 2025
ef17076
add tests
sfc-gh-gtokernliang Jan 3, 2025
2c9d9bb
update golden
sfc-gh-gtokernliang Jan 3, 2025
85cbdc6
update test
sfc-gh-gtokernliang Jan 9, 2025
65544c9
update golden
sfc-gh-gtokernliang Jan 9, 2025
9b9644b
save
sfc-gh-gtokernliang Dec 16, 2024
02855f9
draft
sfc-gh-gtokernliang Dec 19, 2024
d9a4694
save
sfc-gh-gtokernliang Dec 20, 2024
5c939ac
add back debugger
sfc-gh-gtokernliang Dec 20, 2024
d025a07
update notebook
sfc-gh-gtokernliang Dec 20, 2024
bcd677f
fix
sfc-gh-gtokernliang Dec 21, 2024
8456019
update semcov
sfc-gh-gtokernliang Dec 21, 2024
d2d2361
remove artifacts
sfc-gh-gtokernliang Dec 24, 2024
b3ff929
remove span_types from SpanAttributes
sfc-gh-gtokernliang Dec 24, 2024
d4603b1
modified it to accept multiple tokens
sfc-gh-gtokernliang Dec 24, 2024
cbeb5a4
fix bug with multiple func calls
sfc-gh-gtokernliang Dec 24, 2024
1846dba
PR feedback
sfc-gh-gtokernliang Dec 24, 2024
eab220e
add tests
sfc-gh-gtokernliang Jan 2, 2025
093f8ad
add tests
sfc-gh-gtokernliang Jan 2, 2025
0236798
nits
sfc-gh-gtokernliang Jan 2, 2025
7d0e800
pr feedback
sfc-gh-gtokernliang Jan 3, 2025
a8e9084
update golden
sfc-gh-gtokernliang Jan 3, 2025
5d35d11
update test file
sfc-gh-gtokernliang Jan 3, 2025
af18dde
update golden
sfc-gh-gtokernliang Jan 6, 2025
f3c9e22
PR feedback
sfc-gh-gtokernliang Jan 9, 2025
e94147f
draft
sfc-gh-gtokernliang Jan 4, 2025
2a74796
remove file
sfc-gh-gtokernliang Jan 4, 2025
fd645d1
don't compress
sfc-gh-gtokernliang Jan 4, 2025
258abd2
remove print
sfc-gh-gtokernliang Jan 4, 2025
28cee08
update golden
sfc-gh-gtokernliang Jan 6, 2025
7dc4a1c
added comments
sfc-gh-gtokernliang Jan 6, 2025
d8422ad
gzip
sfc-gh-gtokernliang Jan 6, 2025
09f69ca
update
sfc-gh-gtokernliang Jan 9, 2025
ab0bb6a
update protobuf
sfc-gh-gtokernliang Jan 9, 2025
9e50f2c
Merge branch 'main' of github.com:truera/trulens into garett/SNOW-187…
sfc-gh-gtokernliang Jan 9, 2025
21f9b72
pr feedback
sfc-gh-gtokernliang Jan 9, 2025
37b6dcc
is this where I add it?
sfc-gh-gtokernliang Jan 9, 2025
e23b2fc
maybe here?
sfc-gh-gtokernliang Jan 10, 2025
af90428
undo migration
sfc-gh-gtokernliang Jan 10, 2025
0fce605
update
sfc-gh-gtokernliang Jan 10, 2025
0422373
save
sfc-gh-gtokernliang Jan 11, 2025
3083dd1
improvements
sfc-gh-gtokernliang Jan 11, 2025
18d1094
feedback
sfc-gh-gtokernliang Jan 11, 2025
910265b
remove app.py
sfc-gh-gtokernliang Jan 11, 2025
9280281
remove instruments.py
sfc-gh-gtokernliang Jan 11, 2025
6ca1f3c
remove unused
sfc-gh-gtokernliang Jan 11, 2025
a91b0eb
update
sfc-gh-gtokernliang Jan 11, 2025
b3c5eca
Merge branch 'main' of github.com:truera/trulens into garett/SNOW-187…
sfc-gh-gtokernliang Jan 11, 2025
bd3fb50
update
sfc-gh-gtokernliang Jan 11, 2025
881c68d
update
sfc-gh-gtokernliang Jan 11, 2025
5fff6fc
extract
sfc-gh-gtokernliang Jan 11, 2025
2d7b6a0
Merge branch 'garett/add-utils' of github.com:truera/trulens into gar…
sfc-gh-gtokernliang Jan 11, 2025
8bae5fb
nits
sfc-gh-gtokernliang Jan 11, 2025
c11b1c9
update golden
sfc-gh-gtokernliang Jan 12, 2025
5a55584
Merge branch 'garett/add-utils' of github.com:truera/trulens into gar…
sfc-gh-gtokernliang Jan 12, 2025
61d87a8
pr feedback
sfc-gh-gtokernliang Jan 14, 2025
8824794
Merge branch 'garett/add-utils' of github.com:truera/trulens into gar…
sfc-gh-gtokernliang Jan 14, 2025
a9f0537
pr feedback
sfc-gh-gtokernliang Jan 14, 2025
9f4c8a3
Merge branch 'main' of github.com:truera/trulens into garett/SNOW-187…
sfc-gh-gtokernliang Jan 16, 2025
f46af3c
save
sfc-gh-gtokernliang Jan 16, 2025
2903b52
draft
sfc-gh-gtokernliang Jan 17, 2025
1a5d455
comments
sfc-gh-gtokernliang Jan 17, 2025
0353258
PR feedback
sfc-gh-gtokernliang Jan 17, 2025
c28e10d
pr feedback
sfc-gh-gtokernliang Jan 17, 2025
1c1d718
update golden
sfc-gh-gtokernliang Jan 17, 2025
4e27038
PR feedback
sfc-gh-gtokernliang Jan 17, 2025
4 changes: 2 additions & 2 deletions examples/experimental/otel_exporter.ipynb
@@ -118,10 +118,10 @@
     "    session=session,\n",
     ")\n",
     "\n",
-    "with custom_app() as recording:\n",
+    "with custom_app(run_name=\"test run\", input_id=\"456\") as recording:\n",
     "    test_app.respond_to_query(\"test\")\n",
     "\n",
-    "with custom_app(run_name=\"test run\", input_id=\"456\") as recording:\n",
+    "with custom_app() as recording:\n",
     "    test_app.respond_to_query(\"throw\")"
    ]
   },
@@ -0,0 +1,44 @@
+import logging
+from typing import Sequence
+
+from opentelemetry.sdk.trace import ReadableSpan
+from opentelemetry.sdk.trace.export import SpanExporter
+from opentelemetry.sdk.trace.export import SpanExportResult
+from trulens.core.database import connector as core_connector
+from trulens.experimental.otel_tracing.core.exporter.utils import (
+    check_if_trulens_span,
+)
+from trulens.experimental.otel_tracing.core.exporter.utils import (
+    construct_event,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class TruLensOTELSpanExporter(SpanExporter):
+    """
+    Implementation of `SpanExporter` that flushes the spans in the TruLens session to the connector.
+    """
+
+    connector: core_connector.DBConnector
+    """
+    The database connector used to export the spans.
+    """
+
+    def __init__(self, connector: core_connector.DBConnector):
+        self.connector = connector
+
+    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
+        trulens_spans = list(filter(check_if_trulens_span, spans))
+
+        try:
+            events = list(map(construct_event, trulens_spans))
+            self.connector.add_events(events)
+
+        except Exception as e:
+            logger.error(
+                f"Error exporting spans to the database: {e}",
+            )
+            return SpanExportResult.FAILURE
+
+        return SpanExportResult.SUCCESS
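The export contract in this file (keep only TruLens spans, convert them, hand them to the connector, and report FAILURE on any exception) can be exercised without a database. The following is a minimal sketch using only the standard library. `ExportResult`, `StubConnector`, `is_trulens_span`, and `export` are hypothetical stand-ins for `SpanExportResult`, a `DBConnector`, `check_if_trulens_span`, and `TruLensOTELSpanExporter.export`; spans are plain dicts, and the attribute-prefix check is an assumption rather than the real filter.

```python
import enum
import logging
from typing import Dict, List, Sequence

logger = logging.getLogger(__name__)


class ExportResult(enum.Enum):
    # Stand-in for OpenTelemetry's SpanExportResult.
    SUCCESS = 0
    FAILURE = 1


class StubConnector:
    """Hypothetical connector that records events or fails on demand."""

    def __init__(self, fail: bool = False):
        self.fail = fail
        self.events: List[Dict] = []

    def add_events(self, events: List[Dict]) -> None:
        if self.fail:
            raise RuntimeError("database unavailable")
        self.events.extend(events)


def is_trulens_span(span: Dict) -> bool:
    # Assumption: TruLens spans are recognisable by an attribute prefix.
    return any(key.startswith("trulens.") for key in span.get("attributes", {}))


def export(connector: StubConnector, spans: Sequence[Dict]) -> ExportResult:
    # Same shape as the exporter: filter, convert, write, map errors to FAILURE.
    trulens_spans = [span for span in spans if is_trulens_span(span)]
    try:
        events = [{"name": span["name"]} for span in trulens_spans]
        connector.add_events(events)
    except Exception as e:
        logger.error("Error exporting spans to the database: %s", e)
        return ExportResult.FAILURE
    return ExportResult.SUCCESS


spans = [
    {"name": "root", "attributes": {"trulens.record_id": "1"}},
    {"name": "http", "attributes": {"http.method": "GET"}},
]
ok = StubConnector()
result_ok = export(ok, spans)
result_bad = export(StubConnector(fail=True), spans)
```

Only the span carrying a TruLens attribute reaches the stub connector, and a connector error surfaces as a FAILURE result instead of an exception.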
107 changes: 107 additions & 0 deletions src/core/trulens/experimental/otel_tracing/core/exporter/snowflake.py
@@ -0,0 +1,107 @@
+import logging
+import os
+import tempfile
+from typing import Sequence
+
+from opentelemetry.sdk.trace import ReadableSpan
+from opentelemetry.sdk.trace.export import SpanExporter
+from opentelemetry.sdk.trace.export import SpanExportResult
+from trulens.connectors.snowflake import SnowflakeConnector
+from trulens.core.database import connector as core_connector
+from trulens.experimental.otel_tracing.core.exporter.utils import (
+    check_if_trulens_span,
+)
+from trulens.experimental.otel_tracing.core.exporter.utils import (
+    convert_readable_span_to_proto,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class TruLensSnowflakeSpanExporter(SpanExporter):
+    """
+    Implementation of `SpanExporter` that flushes the spans in the TruLens session to a Snowflake Stage.
+    """
+
+    connector: core_connector.DBConnector
+    """
+    The database connector used to export the spans.
+    """
+
+    def __init__(self, connector: core_connector.DBConnector):
+        self.connector = connector
+
+    def _export_to_snowflake_stage(
+        self, spans: Sequence[ReadableSpan]
+    ) -> SpanExportResult:
+        """
+        Exports a list of spans to a Snowflake stage as a protobuf file.
+        This function performs the following steps:
+        1. Writes the provided spans to a temporary protobuf file.
+        2. Creates a Snowflake stage if it does not already exist.
+        3. Uploads the temporary protobuf file to the Snowflake stage.
+        4. Removes the temporary protobuf file.
+        Args:
+            spans (Sequence[ReadableSpan]): A sequence of spans to be exported.
+        Returns:
+            SpanExportResult: The result of the export operation, either SUCCESS or FAILURE.
+        """
+        if not isinstance(self.connector, SnowflakeConnector):
+            return SpanExportResult.FAILURE
+
+        # Avoid uploading empty files to the stage
+        if not spans:
+            return SpanExportResult.SUCCESS
+
+        snowpark_session = self.connector.snowpark_session
+        tmp_file_path = ""
+
+        try:
+            with tempfile.NamedTemporaryFile(
+                delete=False, suffix=".pb", mode="wb"
+            ) as tmp_file:
+                tmp_file_path = tmp_file.name
+                logger.debug(
+                    f"Writing spans to the protobuf file: {tmp_file_path}"
+                )
+
+                for span in spans:
+                    span_proto = convert_readable_span_to_proto(span)
+                    tmp_file.write(span_proto.SerializeToString())
+                logger.debug(
+                    f"Spans written to the protobuf file: {tmp_file_path}"
+                )
+        except Exception as e:
+            logger.error(f"Error writing spans to the protobuf file: {e}")
+            return SpanExportResult.FAILURE
+
+        try:
+            logger.debug("Uploading file to Snowflake stage")
+
+            logger.debug("Creating Snowflake stage if it does not exist")
+            snowpark_session.sql(
+                "CREATE TEMP STAGE IF NOT EXISTS trulens_spans"
+            ).collect()
+
+            logger.debug("Uploading the protobuf file to the stage")
+            snowpark_session.sql(
+                f"PUT file://{tmp_file_path} @trulens_spans"
+            ).collect()
+
+        except Exception as e:
+            logger.error(f"Error uploading the protobuf file to the stage: {e}")
+            return SpanExportResult.FAILURE
+
+        try:
+            logger.debug("Removing the temporary protobuf file")
+            os.remove(tmp_file_path)
+        except Exception as e:
+            # Not returning failure here since the export was technically a success
+            logger.error(f"Error removing the temporary protobuf file: {e}")
+
+        return SpanExportResult.SUCCESS
+
+    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
+        trulens_spans = list(filter(check_if_trulens_span, spans))
+
+        return self._export_to_snowflake_stage(trulens_spans)
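Two details of the write-then-upload flow in this file are worth a closer look. Serialized protobuf messages are not self-delimiting, so messages written back to back need some framing before a reader can split them apart again. And because the temporary file is created with `delete=False`, it stays on disk if the upload step returns early with a failure. The sketch below shows one possible way to handle both with only the standard library. It assumes a 4-byte big-endian length prefix (which is not the format this PR writes) and a hypothetical `upload` callable standing in for the `PUT` statement; payloads are plain bytes, not real span protos.

```python
import os
import struct
import tempfile
from typing import Callable, Iterable, List


def write_framed(payloads: Iterable[bytes], upload: Callable[[str], None]) -> bool:
    """Write length-prefixed payloads to a temp file, upload it, always clean up."""
    tmp_file_path = ""
    try:
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=".pb", mode="wb"
        ) as tmp_file:
            tmp_file_path = tmp_file.name
            for payload in payloads:
                # 4-byte big-endian length, then the message bytes.
                tmp_file.write(struct.pack(">I", len(payload)))
                tmp_file.write(payload)
        upload(tmp_file_path)
        return True
    except Exception:
        return False
    finally:
        # Runs on success and on failure, so the file does not linger.
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.remove(tmp_file_path)


def read_framed(data: bytes) -> List[bytes]:
    """Split a framed byte string back into the original payloads."""
    payloads, offset = [], 0
    while offset < len(data):
        (length,) = struct.unpack_from(">I", data, offset)
        offset += 4
        payloads.append(data[offset : offset + length])
        offset += length
    return payloads


seen = {}


def fake_upload(path: str) -> None:
    # Hypothetical uploader: remember the path and the bytes it would send.
    seen["path"] = path
    with open(path, "rb") as f:
        seen["data"] = f.read()


def failing_upload(path: str) -> None:
    # Hypothetical uploader that simulates an unavailable stage.
    seen["failed_path"] = path
    raise ConnectionError("stage unavailable")


uploaded = write_framed([b"span-1", b"", b"span-three"], fake_upload)
failed = write_framed([b"span-1"], failing_upload)
```

Putting the cleanup in `finally` keeps it independent of the upload result, and the length prefix lets a reader recover each message boundary, including empty messages.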
@@ -1,20 +1,14 @@
 from datetime import datetime
 import logging
-import os
-import tempfile
-from typing import Any, Optional, Sequence
+from typing import Any, Optional

 from opentelemetry.proto.common.v1.common_pb2 import AnyValue
 from opentelemetry.proto.common.v1.common_pb2 import ArrayValue
 from opentelemetry.proto.common.v1.common_pb2 import KeyValue
 from opentelemetry.proto.common.v1.common_pb2 import KeyValueList
 from opentelemetry.proto.trace.v1.trace_pb2 import Span as SpanProto
 from opentelemetry.sdk.trace import ReadableSpan
-from opentelemetry.sdk.trace.export import SpanExporter
-from opentelemetry.sdk.trace.export import SpanExportResult
 from opentelemetry.trace import StatusCode
-from trulens.connectors.snowflake import SnowflakeConnector
-from trulens.core.database import connector as core_connector
 from trulens.core.schema import event as event_schema
 from trulens.otel.semconv.trace import SpanAttributes

@@ -147,121 +141,3 @@ def construct_event(span: ReadableSpan) -> event_schema.Event:
             "parent_id": str(parent.span_id if parent else ""),
         },
     )
-
-
-class TruLensSnowflakeSpanExporter(SpanExporter):
-    """
-    Implementation of `SpanExporter` that flushes the spans in the TruLens session to a Snowflake Stage.
-    """
-
-    connector: core_connector.DBConnector
-    """
-    The database connector used to export the spans.
-    """
-
-    def __init__(self, connector: core_connector.DBConnector):
-        self.connector = connector
-
-    def _export_to_snowflake_stage(
-        self, spans: Sequence[ReadableSpan]
-    ) -> SpanExportResult:
-        """
-        Exports a list of spans to a Snowflake stage as a protobuf file.
-        This function performs the following steps:
-        1. Writes the provided spans to a temporary protobuf file.
-        2. Creates a Snowflake stage if it does not already exist.
-        3. Uploads the temporary protobuf file to the Snowflake stage.
-        4. Removes the temporary protobuf file.
-        Args:
-            spans (Sequence[ReadableSpan]): A sequence of spans to be exported.
-        Returns:
-            SpanExportResult: The result of the export operation, either SUCCESS or FAILURE.
-        """
-        if not isinstance(self.connector, SnowflakeConnector):
-            return SpanExportResult.FAILURE
-
-        # Avoid uploading empty files to the stage
-        if not spans:
-            return SpanExportResult.SUCCESS
-
-        snowpark_session = self.connector.snowpark_session
-        tmp_file_path = ""
-
-        try:
-            with tempfile.NamedTemporaryFile(
-                delete=False, suffix=".pb", mode="wb"
-            ) as tmp_file:
-                tmp_file_path = tmp_file.name
-                logger.debug(
-                    f"Writing spans to the protobuf file: {tmp_file_path}"
-                )
-
-                for span in spans:
-                    span_proto = convert_readable_span_to_proto(span)
-                    tmp_file.write(span_proto.SerializeToString())
-                logger.debug(
-                    f"Spans written to the protobuf file: {tmp_file_path}"
-                )
-        except Exception as e:
-            logger.error(f"Error writing spans to the protobuf file: {e}")
-            return SpanExportResult.FAILURE
-
-        try:
-            logger.debug("Uploading file to Snowflake stage")
-
-            logger.debug("Creating Snowflake stage if it does not exist")
-            snowpark_session.sql(
-                "CREATE TEMP STAGE IF NOT EXISTS trulens_spans"
-            ).collect()
-
-            logger.debug("Uploading the protobuf file to the stage")
-            snowpark_session.sql(
-                f"PUT file://{tmp_file_path} @trulens_spans"
-            ).collect()
-
-        except Exception as e:
-            logger.error(f"Error uploading the protobuf file to the stage: {e}")
-            return SpanExportResult.FAILURE
-
-        try:
-            logger.debug("Removing the temporary protobuf file")
-            os.remove(tmp_file_path)
-        except Exception as e:
-            # Not returning failure here since the export was technically a success
-            logger.error(f"Error removing the temporary protobuf file: {e}")
-
-        return SpanExportResult.SUCCESS
-
-    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
-        trulens_spans = list(filter(check_if_trulens_span, spans))
-
-        return self._export_to_snowflake_stage(trulens_spans)
-
-
-class TruLensOTELSpanExporter(SpanExporter):
-    """
-    Implementation of `SpanExporter` that flushes the spans in the TruLens session to the connector.
-    """
-
-    connector: core_connector.DBConnector
-    """
-    The database connector used to export the spans.
-    """
-
-    def __init__(self, connector: core_connector.DBConnector):
-        self.connector = connector
-
-    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
-        trulens_spans = list(filter(check_if_trulens_span, spans))
-
-        try:
-            events = list(map(construct_event, trulens_spans))
-            self.connector.add_events(events)
-
-        except Exception as e:
-            logger.error(
-                f"Error exporting spans to the database: {e}",
-            )
-            return SpanExportResult.FAILURE
-
-        return SpanExportResult.SUCCESS
25 changes: 16 additions & 9 deletions src/core/trulens/experimental/otel_tracing/core/instrument.py
@@ -4,6 +4,7 @@
 import uuid

 from opentelemetry import trace
+from opentelemetry.baggage import get_baggage
 from opentelemetry.baggage import remove_baggage
 from opentelemetry.baggage import set_baggage
 import opentelemetry.context as context_api
@@ -110,17 +111,24 @@ class OTELRecordingContext:
     context api keeps track of what is changed in the context, and used to undo the changes.
     """

+    context_keys_added: List[str] = []
+    """
+    Keys added to the OTEL context.
+    """
+
     def __init__(self, *, app: core_app.App, run_name: str, input_id: str):
         self.app = app
         self.run_name = run_name
         self.input_id = input_id
         self.tokens = []
+        self.context_keys_added = []
         self.span_context = None

     # Calling set_baggage does not actually add the baggage to the current context, but returns a new one
     # To avoid issues with remembering to add/remove the baggage, we attach it to the runtime context.
     def attach_to_context(self, key: str, value: object):
         self.tokens.append(context_api.attach(set_baggage(key, value)))
+        self.context_keys_added.append(key)

     # For use as a context manager.
     def __enter__(self):
@@ -130,7 +138,10 @@ def __enter__(self):

         tracer = trace.get_tracer_provider().get_tracer(TRULENS_SERVICE_NAME)

-        self.attach_to_context(SpanAttributes.RECORD_ID, otel_record_id)
+        # There might be nested records - in those scenarios use the outer one.
+        if not get_baggage(SpanAttributes.RECORD_ID):
+            self.attach_to_context(SpanAttributes.RECORD_ID, otel_record_id)
+
         self.attach_to_context(SpanAttributes.APP_NAME, self.app.app_name)
Contributor:

Can you do the same as for RECORD_ID for the other attributes? I guess APP_NAME and the others aren't thread safe either: if you call the child LLM app and then continue working, the baggage will get changed on you.

Contributor Author:

Changed attach_to_context to check for existence and to ensure that the value is non-null.

         self.attach_to_context(SpanAttributes.APP_VERSION, self.app.app_version)

@@ -171,16 +182,12 @@ def __exit__(self, exc_type, exc_value, exc_tb):
             # TODO[SNOW-1854360]: Add in feature function spans.
             self.span_context.__exit__(exc_type, exc_value, exc_tb)

-        remove_baggage(SpanAttributes.RECORD_ID)
-        remove_baggage(SpanAttributes.APP_NAME)
-        remove_baggage(SpanAttributes.APP_VERSION)
-
-        # Safe to remove baggage keys even if we did not set them
-        remove_baggage(SpanAttributes.RUN_NAME)
-        remove_baggage(SpanAttributes.INPUT_ID)
-
         logger.debug("Exiting the OTEL app context.")

+        # Clearing the context / baggage added.
+        while self.context_keys_added:
+            remove_baggage(self.context_keys_added.pop())
+
         while self.tokens:
             # Clearing the context once we're done with this root span.
             # See https://github.com/open-telemetry/opentelemetry-python/issues/2432#issuecomment-1593458684
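The nested-record handling discussed in this file comes down to two rules: an inner recording should not overwrite a value that an outer recording already set, and on exit a recording should undo only what it attached itself. The following is a minimal sketch of that pattern using the standard library's `contextvars` in place of the OpenTelemetry context API. `_baggage`, `RecordingContext`, and `attach_if_absent` are hypothetical names for illustration, not TruLens or OpenTelemetry code.

```python
import contextvars
from typing import Dict, List, Optional

# Stand-in for OTEL baggage: a mapping held in a ContextVar and never mutated in place.
_baggage = contextvars.ContextVar("baggage", default={})


def get_baggage(key: str) -> Optional[str]:
    return _baggage.get().get(key)


class RecordingContext:
    def __init__(self, record_id: str, app_name: str):
        self.values: Dict[str, str] = {"record_id": record_id, "app_name": app_name}
        self.tokens: List[contextvars.Token] = []

    def attach_if_absent(self, key: str, value: Optional[str]) -> None:
        # Skip empty values and keys an outer recording has already set.
        if value is None or get_baggage(key) is not None:
            return
        updated = {**_baggage.get(), key: value}
        # set() returns a token that restores the previous mapping on reset().
        self.tokens.append(_baggage.set(updated))

    def __enter__(self) -> "RecordingContext":
        for key, value in self.values.items():
            self.attach_if_absent(key, value)
        return self

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        # Undo only our own changes, most recent first.
        while self.tokens:
            _baggage.reset(self.tokens.pop())


observed = []
with RecordingContext("outer-record", "parent_app"):
    with RecordingContext("inner-record", "child_app"):
        observed.append((get_baggage("record_id"), get_baggage("app_name")))
    # The inner exit must leave the outer values in place.
    observed.append((get_baggage("record_id"), get_baggage("app_name")))
observed.append((get_baggage("record_id"), get_baggage("app_name")))
```

In the OpenTelemetry Python API, `set_baggage` and `remove_baggage` both return a new `Context` and leave the current one unchanged, so detaching the stored tokens is the step that restores the earlier state.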
2 changes: 1 addition & 1 deletion src/core/trulens/experimental/otel_tracing/core/session.py
@@ -10,7 +10,7 @@
 from trulens.core.database.connector import DBConnector
 from trulens.core.utils import python as python_utils
 from trulens.core.utils import text as text_utils
-from trulens.experimental.otel_tracing.core.exporter import (
+from trulens.experimental.otel_tracing.core.exporter.connector import (
     TruLensOTELSpanExporter,
 )
