From fc9f402be61dc663643cf37ecacae88f6910e957 Mon Sep 17 00:00:00 2001 From: Garett Tok Ern Liang Date: Sat, 21 Dec 2024 14:03:13 -0500 Subject: [PATCH] Prototype OTEL exporter (#1694) --- examples/experimental/otel_exporter.ipynb | 42 +++-------- src/core/trulens/core/database/base.py | 13 ++++ .../trulens/core/database/connector/base.py | 21 ++++++ src/core/trulens/core/database/sqlalchemy.py | 11 +++ .../otel_tracing/core/exporter.py | 70 +++++++++++++++++++ .../experimental/otel_tracing/core/init.py | 54 ++++++++++++++ .../unit/static/golden/api.trulens.3.11.yaml | 3 + .../static/golden/api.trulens_eval.3.11.yaml | 2 + 8 files changed, 183 insertions(+), 33 deletions(-) create mode 100644 src/core/trulens/experimental/otel_tracing/core/exporter.py create mode 100644 src/core/trulens/experimental/otel_tracing/core/init.py diff --git a/examples/experimental/otel_exporter.ipynb b/examples/experimental/otel_exporter.ipynb index 0eddcb41c..94b2b0a0a 100644 --- a/examples/experimental/otel_exporter.ipynb +++ b/examples/experimental/otel_exporter.ipynb @@ -26,34 +26,6 @@ " sys.path.append(str(base_dir))" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from opentelemetry import trace\n", - "from opentelemetry.sdk.resources import Resource\n", - "from opentelemetry.sdk.trace import TracerProvider\n", - "from opentelemetry.sdk.trace.export import BatchSpanProcessor\n", - "from opentelemetry.sdk.trace.export import ConsoleSpanExporter\n", - "\n", - "SERVICE_NAME = \"trulens\"\n", - "\n", - "\n", - "def init():\n", - " # Use Resource.create() instead of constructor directly\n", - " resource = Resource.create({\"service.name\": SERVICE_NAME})\n", - "\n", - " trace.set_tracer_provider(TracerProvider(resource=resource))\n", - " trace.get_tracer_provider().add_span_processor(\n", - " BatchSpanProcessor(ConsoleSpanExporter())\n", - " )\n", - "\n", - "\n", - "init()" - ] - }, { "cell_type": "code", "execution_count": null, @@ -62,13 +34,13 @@ "source": [ "from typing import Callable\n", "\n", + "from opentelemetry import trace\n", "from trulens.apps.custom import instrument\n", - "\n", - "SERVICE_NAME = \"trulens\"\n", + "from trulens.experimental.otel_tracing.core.init import TRULENS_SERVICE_NAME\n", "\n", "\n", "def decorator(func: Callable):\n", - " tracer = trace.get_tracer(SERVICE_NAME)\n", + " tracer = trace.get_tracer(TRULENS_SERVICE_NAME)\n", "\n", " def wrapper(*args, **kwargs):\n", " print(\"start wrap\")\n", @@ -116,8 +88,10 @@ "outputs": [], "source": [ "from trulens.core.session import TruSession\n", + "from trulens.experimental.otel_tracing.core.init import init\n", "\n", - "SERVICE_NAME = \"trulens\"" + "session = TruSession()\n", + "init(session)" ] }, { @@ -126,7 +100,9 @@ "metadata": {}, "outputs": [], "source": [ - "session = TruSession()" + "test_app = TestApp()\n", + "\n", + "test_app.respond_to_query(\"test\")" ] } ], diff --git a/src/core/trulens/core/database/base.py b/src/core/trulens/core/database/base.py index 0c2cbc6c5..8501fa652 100644 --- a/src/core/trulens/core/database/base.py +++ b/src/core/trulens/core/database/base.py @@ -6,6 +6,7 @@ import pandas as pd from trulens.core.schema import app as app_schema from trulens.core.schema import dataset as dataset_schema +from trulens.core.schema import event as event_schema from trulens.core.schema import feedback as feedback_schema from trulens.core.schema import groundtruth as groundtruth_schema from trulens.core.schema import record as record_schema @@ -446,3 +447,15 @@ def get_datasets(self) -> pd.DataFrame: A dataframe with the datasets. """ raise NotImplementedError() + + @abc.abstractmethod + def insert_event(self, event: event_schema.Event) -> types_schema.EventID: + """Insert an event into the database. + + Args: + event: The event to insert. + + Returns: + The id of the given event. + """ + raise NotImplementedError() diff --git a/src/core/trulens/core/database/connector/base.py b/src/core/trulens/core/database/connector/base.py index f6cd14644..9f32cb283 100644 --- a/src/core/trulens/core/database/connector/base.py +++ b/src/core/trulens/core/database/connector/base.py @@ -20,6 +20,7 @@ from trulens.core._utils.pycompat import Future # code style exception from trulens.core.database import base as core_db from trulens.core.schema import app as app_schema +from trulens.core.schema import event as event_schema from trulens.core.schema import feedback as feedback_schema from trulens.core.schema import record as record_schema from trulens.core.schema import types as types_schema @@ -260,6 +261,7 @@ def add_feedbacks( ], ) -> List[types_schema.FeedbackResultID]: """Add multiple feedback results to the database and return their unique ids. + # TODO: This is slow and should be batched or otherwise optimized in the future. Args: feedback_results: An iterable with each iteration being a [FeedbackResult][trulens.core.schema.feedback.FeedbackResult] or @@ -408,3 +410,22 @@ def get_leaderboard( .mean() .sort_values(by=feedback_cols, ascending=False) ) + + def add_event(self, event: event_schema.Event): + """ + Add an event to the database. + + Args: + event: The event to add to the database. + """ + return self.db.insert_event(event=event) + + def add_events(self, events: List[event_schema.Event]): + """ + Add multiple events to the database. + # TODO: This is slow and should be batched or otherwise optimized in the future. + + Args: + events: A list of events to add to the database. + """ + return [self.add_event(event=event) for event in events] diff --git a/src/core/trulens/core/database/sqlalchemy.py b/src/core/trulens/core/database/sqlalchemy.py index 80843c88b..4f7811f7b 100644 --- a/src/core/trulens/core/database/sqlalchemy.py +++ b/src/core/trulens/core/database/sqlalchemy.py @@ -45,6 +45,7 @@ from trulens.core.schema import groundtruth as groundtruth_schema from trulens.core.schema import record as record_schema from trulens.core.schema import types as types_schema +from trulens.core.schema.event import Event from trulens.core.utils import pyschema as pyschema_utils from trulens.core.utils import python as python_utils from trulens.core.utils import serial as serial_utils @@ -1041,6 +1042,16 @@ def get_datasets(self) -> pd.DataFrame: columns=["dataset_id", "name", "meta"], ) + def insert_event(self, event: Event) -> types_schema.EventID: + """See [DB.insert_event][trulens.core.database.base.DB.insert_event].""" + with self.session.begin() as session: + _event = self.orm.Event.parse(event, redact_keys=self.redact_keys) + session.add(_event) + logger.info( + f"{text_utils.UNICODE_CHECK} added event {_event.event_id}" + ) + return _event.event_id + # Use this Perf for missing Perfs. # TODO: Migrate the database instead. diff --git a/src/core/trulens/experimental/otel_tracing/core/exporter.py b/src/core/trulens/experimental/otel_tracing/core/exporter.py new file mode 100644 index 000000000..9c44b9eeb --- /dev/null +++ b/src/core/trulens/experimental/otel_tracing/core/exporter.py @@ -0,0 +1,70 @@ +from datetime import datetime +import logging +from typing import Optional, Sequence + +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SpanExporter +from opentelemetry.sdk.trace.export import SpanExportResult +from opentelemetry.trace import StatusCode +from trulens.core.database import connector as core_connector +from trulens.core.schema import event as event_schema + +logger = logging.getLogger(__name__) + + +def to_timestamp(timestamp: Optional[int]) -> datetime: + if timestamp: + return datetime.fromtimestamp(timestamp * 1e-9) + + return datetime.now() + + +class TruLensDBSpanExporter(SpanExporter): + """ + Implementation of `SpanExporter` that flushes the spans to the database in the TruLens session. + """ + + connector: core_connector.DBConnector + + def __init__(self, connector: core_connector.DBConnector): + self.connector = connector + + def _construct_event(self, span: ReadableSpan) -> event_schema.Event: + context = span.get_span_context() + parent = span.parent + + if context is None: + raise ValueError("Span context is None") + + return event_schema.Event( + event_id=str(context.span_id), + record={ + "name": span.name, + "kind": "SPAN_KIND_TRULENS", + "parent_span_id": str(parent.span_id if parent else ""), + "status": "STATUS_CODE_ERROR" + if span.status.status_code == StatusCode.ERROR + else "STATUS_CODE_UNSET", + }, + record_attributes=span.attributes, + record_type=event_schema.EventRecordType.SPAN, + resource_attributes=span.resource.attributes, + start_timestamp=to_timestamp(span.start_time), + timestamp=to_timestamp(span.end_time), + trace={ + "span_id": str(context.span_id), + "trace_id": str(context.trace_id), + "parent_id": str(parent.span_id if parent else ""), + }, + ) + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + try: + events = list(map(self._construct_event, spans)) + self.connector.add_events(events) + + except Exception as e: + logger.error("Error exporting spans to the database: %s", e) + return SpanExportResult.FAILURE + + return SpanExportResult.SUCCESS diff --git a/src/core/trulens/experimental/otel_tracing/core/init.py b/src/core/trulens/experimental/otel_tracing/core/init.py new file mode 100644 index 000000000..aa6f0f67e --- /dev/null +++ b/src/core/trulens/experimental/otel_tracing/core/init.py @@ -0,0 +1,54 @@ +import logging + +from opentelemetry import trace +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ConsoleSpanExporter +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from trulens.core.session import TruSession +from trulens.experimental.otel_tracing.core.exporter import ( + TruLensDBSpanExporter, +) + +TRULENS_SERVICE_NAME = "trulens" + + +logger = logging.getLogger(__name__) + + +def init(session: TruSession, debug: bool = False): + """Initialize the OpenTelemetry SDK with TruLens configuration.""" + resource = Resource.create({"service.name": TRULENS_SERVICE_NAME}) + provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) + + if debug: + logging.debug( + "Initializing OpenTelemetry with TruLens configuration for console debugging" + ) + # Add a console exporter for debugging purposes + console_exporter = ConsoleSpanExporter() + console_processor = SimpleSpanProcessor(console_exporter) + provider.add_span_processor(console_processor) + + if session.connector: + logging.debug("Exporting traces to the TruLens database") + + # Check the database revision + try: + db_revision = session.connector.db.get_db_revision() + if db_revision is None: + raise ValueError( + "Database revision is not set. Please run the migrations." + ) + if int(db_revision) < 10: + raise ValueError( + "Database revision is too low. Please run the migrations." + ) + except Exception: + raise ValueError("Error checking the database revision.") + + # Add the TruLens database exporter + db_exporter = TruLensDBSpanExporter(session.connector) + db_processor = SimpleSpanProcessor(db_exporter) + provider.add_span_processor(db_processor) diff --git a/tests/unit/static/golden/api.trulens.3.11.yaml b/tests/unit/static/golden/api.trulens.3.11.yaml index ae63b815b..a10e00837 100644 --- a/tests/unit/static/golden/api.trulens.3.11.yaml +++ b/tests/unit/static/golden/api.trulens.3.11.yaml @@ -241,6 +241,7 @@ trulens.core.database.base.DB: get_virtual_ground_truth: builtins.function insert_app: builtins.function insert_dataset: builtins.function + insert_event: builtins.function insert_feedback: builtins.function insert_feedback_definition: builtins.function insert_ground_truth: builtins.function @@ -270,6 +271,8 @@ trulens.core.database.connector.base.DBConnector: attributes: RECORDS_BATCH_TIMEOUT_IN_SEC: builtins.int add_app: builtins.function + add_event: builtins.function + add_events: builtins.function add_feedback: builtins.function add_feedback_definition: builtins.function add_feedbacks: builtins.function diff --git a/tests/unit/static/golden/api.trulens_eval.3.11.yaml b/tests/unit/static/golden/api.trulens_eval.3.11.yaml index 46b095082..8eda75eaf 100644 --- a/tests/unit/static/golden/api.trulens_eval.3.11.yaml +++ b/tests/unit/static/golden/api.trulens_eval.3.11.yaml @@ -1824,6 +1824,7 @@ trulens_eval.database.base.DB: get_virtual_ground_truth: builtins.function insert_app: builtins.function insert_dataset: builtins.function + insert_event: builtins.function insert_feedback: builtins.function insert_feedback_definition: builtins.function insert_ground_truth: builtins.function @@ -1982,6 +1983,7 @@ trulens_eval.database.sqlalchemy.SQLAlchemyDB: get_virtual_ground_truth: builtins.function insert_app: builtins.function insert_dataset: builtins.function + insert_event: builtins.function insert_feedback: builtins.function insert_feedback_definition: builtins.function insert_ground_truth: builtins.function