Skip to content

Commit

Permalink
Prototype OTEL exporter (#1694)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-gtokernliang authored Dec 21, 2024
1 parent 226446d commit fc9f402
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 33 deletions.
42 changes: 9 additions & 33 deletions examples/experimental/otel_exporter.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,6 @@
" sys.path.append(str(base_dir))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from opentelemetry import trace\n",
"from opentelemetry.sdk.resources import Resource\n",
"from opentelemetry.sdk.trace import TracerProvider\n",
"from opentelemetry.sdk.trace.export import BatchSpanProcessor\n",
"from opentelemetry.sdk.trace.export import ConsoleSpanExporter\n",
"\n",
"SERVICE_NAME = \"trulens\"\n",
"\n",
"\n",
"def init():\n",
" # Use Resource.create() instead of constructor directly\n",
" resource = Resource.create({\"service.name\": SERVICE_NAME})\n",
"\n",
" trace.set_tracer_provider(TracerProvider(resource=resource))\n",
" trace.get_tracer_provider().add_span_processor(\n",
" BatchSpanProcessor(ConsoleSpanExporter())\n",
" )\n",
"\n",
"\n",
"init()"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -62,13 +34,13 @@
"source": [
"from typing import Callable\n",
"\n",
"from opentelemetry import trace\n",
"from trulens.apps.custom import instrument\n",
"\n",
"SERVICE_NAME = \"trulens\"\n",
"from trulens.experimental.otel_tracing.core.init import TRULENS_SERVICE_NAME\n",
"\n",
"\n",
"def decorator(func: Callable):\n",
" tracer = trace.get_tracer(SERVICE_NAME)\n",
" tracer = trace.get_tracer(TRULENS_SERVICE_NAME)\n",
"\n",
" def wrapper(*args, **kwargs):\n",
" print(\"start wrap\")\n",
Expand Down Expand Up @@ -116,8 +88,10 @@
"outputs": [],
"source": [
"from trulens.core.session import TruSession\n",
"from trulens.experimental.otel_tracing.core.init import init\n",
"\n",
"SERVICE_NAME = \"trulens\""
"session = TruSession()\n",
"init(session)"
]
},
{
Expand All @@ -126,7 +100,9 @@
"metadata": {},
"outputs": [],
"source": [
"session = TruSession()"
"test_app = TestApp()\n",
"\n",
"test_app.respond_to_query(\"test\")"
]
}
],
Expand Down
13 changes: 13 additions & 0 deletions src/core/trulens/core/database/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pandas as pd
from trulens.core.schema import app as app_schema
from trulens.core.schema import dataset as dataset_schema
from trulens.core.schema import event as event_schema
from trulens.core.schema import feedback as feedback_schema
from trulens.core.schema import groundtruth as groundtruth_schema
from trulens.core.schema import record as record_schema
Expand Down Expand Up @@ -446,3 +447,15 @@ def get_datasets(self) -> pd.DataFrame:
A dataframe with the datasets.
"""
raise NotImplementedError()

@abc.abstractmethod
def insert_event(self, event: event_schema.Event) -> types_schema.EventID:
"""Insert an event into the database.
Args:
event: The event to insert.
Returns:
The id of the given event.
"""
raise NotImplementedError()
21 changes: 21 additions & 0 deletions src/core/trulens/core/database/connector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from trulens.core._utils.pycompat import Future # code style exception
from trulens.core.database import base as core_db
from trulens.core.schema import app as app_schema
from trulens.core.schema import event as event_schema
from trulens.core.schema import feedback as feedback_schema
from trulens.core.schema import record as record_schema
from trulens.core.schema import types as types_schema
Expand Down Expand Up @@ -260,6 +261,7 @@ def add_feedbacks(
],
) -> List[types_schema.FeedbackResultID]:
"""Add multiple feedback results to the database and return their unique ids.
# TODO: This is slow and should be batched or otherwise optimized in the future.
Args:
feedback_results: An iterable with each iteration being a [FeedbackResult][trulens.core.schema.feedback.FeedbackResult] or
Expand Down Expand Up @@ -408,3 +410,22 @@ def get_leaderboard(
.mean()
.sort_values(by=feedback_cols, ascending=False)
)

def add_event(self, event: event_schema.Event):
"""
Add an event to the database.
Args:
event: The event to add to the database.
"""
return self.db.insert_event(event=event)

def add_events(self, events: List[event_schema.Event]):
"""
Add multiple events to the database.
# TODO: This is slow and should be batched or otherwise optimized in the future.
Args:
events: A list of events to add to the database.
"""
return [self.add_event(event=event) for event in events]
11 changes: 11 additions & 0 deletions src/core/trulens/core/database/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from trulens.core.schema import groundtruth as groundtruth_schema
from trulens.core.schema import record as record_schema
from trulens.core.schema import types as types_schema
from trulens.core.schema.event import Event
from trulens.core.utils import pyschema as pyschema_utils
from trulens.core.utils import python as python_utils
from trulens.core.utils import serial as serial_utils
Expand Down Expand Up @@ -1041,6 +1042,16 @@ def get_datasets(self) -> pd.DataFrame:
columns=["dataset_id", "name", "meta"],
)

def insert_event(self, event: Event) -> types_schema.EventID:
"""See [DB.insert_event][trulens.core.database.base.DB.insert_event]."""
with self.session.begin() as session:
_event = self.orm.Event.parse(event, redact_keys=self.redact_keys)
session.add(_event)
logger.info(
f"{text_utils.UNICODE_CHECK} added event {_event.event_id}"
)
return _event.event_id


# Use this Perf for missing Perfs.
# TODO: Migrate the database instead.
Expand Down
70 changes: 70 additions & 0 deletions src/core/trulens/experimental/otel_tracing/core/exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from datetime import datetime
import logging
from typing import Optional, Sequence

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter
from opentelemetry.sdk.trace.export import SpanExportResult
from opentelemetry.trace import StatusCode
from trulens.core.database import connector as core_connector
from trulens.core.schema import event as event_schema

logger = logging.getLogger(__name__)


def to_timestamp(timestamp: Optional[int]) -> datetime:
if timestamp:
return datetime.fromtimestamp(timestamp * 1e-9)

return datetime.now()


class TruLensDBSpanExporter(SpanExporter):
"""
Implementation of `SpanExporter` that flushes the spans to the database in the TruLens session.
"""

connector: core_connector.DBConnector

def __init__(self, connector: core_connector.DBConnector):
self.connector = connector

def _construct_event(self, span: ReadableSpan) -> event_schema.Event:
context = span.get_span_context()
parent = span.parent

if context is None:
raise ValueError("Span context is None")

return event_schema.Event(
event_id=str(context.span_id),
record={
"name": span.name,
"kind": "SPAN_KIND_TRULENS",
"parent_span_id": str(parent.span_id if parent else ""),
"status": "STATUS_CODE_ERROR"
if span.status.status_code == StatusCode.ERROR
else "STATUS_CODE_UNSET",
},
record_attributes=span.attributes,
record_type=event_schema.EventRecordType.SPAN,
resource_attributes=span.resource.attributes,
start_timestamp=to_timestamp(span.start_time),
timestamp=to_timestamp(span.end_time),
trace={
"span_id": str(context.span_id),
"trace_id": str(context.trace_id),
"parent_id": str(parent.span_id if parent else ""),
},
)

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
try:
events = list(map(self._construct_event, spans))
self.connector.add_events(events)

except Exception as e:
logger.error("Error exporting spans to the database: %s", e)
return SpanExportResult.FAILURE

return SpanExportResult.SUCCESS
54 changes: 54 additions & 0 deletions src/core/trulens/experimental/otel_tracing/core/init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import logging

from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from trulens.core.session import TruSession
from trulens.experimental.otel_tracing.core.exporter import (
TruLensDBSpanExporter,
)

TRULENS_SERVICE_NAME = "trulens"


logger = logging.getLogger(__name__)


def init(session: TruSession, debug: bool = False):
"""Initialize the OpenTelemetry SDK with TruLens configuration."""
resource = Resource.create({"service.name": TRULENS_SERVICE_NAME})
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)

if debug:
logging.debug(
"Initializing OpenTelemetry with TruLens configuration for console debugging"
)
# Add a console exporter for debugging purposes
console_exporter = ConsoleSpanExporter()
console_processor = SimpleSpanProcessor(console_exporter)
provider.add_span_processor(console_processor)

if session.connector:
logging.debug("Exporting traces to the TruLens database")

# Check the database revision
try:
db_revision = session.connector.db.get_db_revision()
if db_revision is None:
raise ValueError(
"Database revision is not set. Please run the migrations."
)
if int(db_revision) < 10:
raise ValueError(
"Database revision is too low. Please run the migrations."
)
except Exception:
raise ValueError("Error checking the database revision.")

# Add the TruLens database exporter
db_exporter = TruLensDBSpanExporter(session.connector)
db_processor = SimpleSpanProcessor(db_exporter)
provider.add_span_processor(db_processor)
3 changes: 3 additions & 0 deletions tests/unit/static/golden/api.trulens.3.11.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ trulens.core.database.base.DB:
get_virtual_ground_truth: builtins.function
insert_app: builtins.function
insert_dataset: builtins.function
insert_event: builtins.function
insert_feedback: builtins.function
insert_feedback_definition: builtins.function
insert_ground_truth: builtins.function
Expand Down Expand Up @@ -270,6 +271,8 @@ trulens.core.database.connector.base.DBConnector:
attributes:
RECORDS_BATCH_TIMEOUT_IN_SEC: builtins.int
add_app: builtins.function
add_event: builtins.function
add_events: builtins.function
add_feedback: builtins.function
add_feedback_definition: builtins.function
add_feedbacks: builtins.function
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/static/golden/api.trulens_eval.3.11.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1824,6 +1824,7 @@ trulens_eval.database.base.DB:
get_virtual_ground_truth: builtins.function
insert_app: builtins.function
insert_dataset: builtins.function
insert_event: builtins.function
insert_feedback: builtins.function
insert_feedback_definition: builtins.function
insert_ground_truth: builtins.function
Expand Down Expand Up @@ -1982,6 +1983,7 @@ trulens_eval.database.sqlalchemy.SQLAlchemyDB:
get_virtual_ground_truth: builtins.function
insert_app: builtins.function
insert_dataset: builtins.function
insert_event: builtins.function
insert_feedback: builtins.function
insert_feedback_definition: builtins.function
insert_ground_truth: builtins.function
Expand Down

0 comments on commit fc9f402

Please sign in to comment.