Skip to content

Commit

Permalink
Prototype @Instrument with OTEL (#1693)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-gtokernliang authored Dec 23, 2024
1 parent fc9f402 commit 706f65b
Show file tree
Hide file tree
Showing 9 changed files with 506 additions and 43 deletions.
11 changes: 6 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ env-%:

env-tests:
poetry run pip install \
pytest \
jsondiff \
nbconvert \
nbformat \
pytest-subtests \
pytest-azurepipelines \
ruff \
opentelemetry-sdk \
pre-commit \
pytest \
pytest-azurepipelines \
pytest-cov \
jsondiff
pytest-subtests \
ruff \

env-tests-required:
poetry install --only required \
Expand Down
93 changes: 59 additions & 34 deletions examples/experimental/otel_exporter.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,18 @@
"metadata": {},
"outputs": [],
"source": [
"from typing import Callable\n",
"import logging\n",
"\n",
"from opentelemetry import trace\n",
"from trulens.apps.custom import instrument\n",
"from trulens.experimental.otel_tracing.core.init import TRULENS_SERVICE_NAME\n",
"\n",
"\n",
"def decorator(func: Callable):\n",
" tracer = trace.get_tracer(TRULENS_SERVICE_NAME)\n",
"\n",
" def wrapper(*args, **kwargs):\n",
" print(\"start wrap\")\n",
"\n",
" with tracer.start_as_current_span(\"custom\"):\n",
" result = func(*args, **kwargs)\n",
" span = trace.get_current_span()\n",
" print(\"---span---\")\n",
" print(span.get_span_context())\n",
" span.set_attribute(\"result\", result)\n",
" span.set_status(trace.Status(trace.StatusCode.OK))\n",
" return result\n",
"\n",
" return wrapper"
"root = logging.getLogger()\n",
"root.setLevel(logging.DEBUG)\n",
"handler = logging.StreamHandler(sys.stdout)\n",
"handler.setLevel(logging.DEBUG)\n",
"handler.addFilter(logging.Filter(\"trulens\"))\n",
"formatter = logging.Formatter(\n",
" \"%(asctime)s - %(name)s - %(levelname)s - %(message)s\"\n",
")\n",
"handler.setFormatter(formatter)\n",
"root.addHandler(handler)"
]
},
{
Expand All @@ -63,22 +52,46 @@
"metadata": {},
"outputs": [],
"source": [
"from examples.dev.dummy_app.dummy import Dummy\n",
"\n",
"from trulens.experimental.otel_tracing.core.instrument import instrument\n",
"\n",
"class TestApp(Dummy):\n",
" def __init__(self):\n",
" super().__init__()\n",
"\n",
" @decorator\n",
" @instrument\n",
"class TestApp:\n",
" @instrument()\n",
" def respond_to_query(self, query: str) -> str:\n",
" return f\"answer: {self.nested(query)}\"\n",
"\n",
" @decorator\n",
" @instrument\n",
" @instrument(attributes={\"nested_attr1\": \"value1\"})\n",
" def nested(self, query: str) -> str:\n",
" return f\"nested: {query}\""
" return f\"nested: {self.nested2(query)}\"\n",
"\n",
" @instrument(\n",
" attributes=lambda ret, exception, *args, **kwargs: {\n",
" \"nested2_ret\": ret,\n",
" \"nested2_args[0]\": args[0],\n",
" }\n",
" )\n",
" def nested2(self, query: str) -> str:\n",
" nested_result = \"\"\n",
"\n",
" try:\n",
" nested_result = self.nested3(query)\n",
" except Exception:\n",
" pass\n",
"\n",
" return f\"nested2: {nested_result}\"\n",
"\n",
" @instrument(\n",
" attributes=lambda ret, exception, *args, **kwargs: {\n",
" \"nested3_ex\": exception.args if exception else None,\n",
" \"nested3_ret\": ret,\n",
" \"selector_name\": \"special\",\n",
" \"cows\": \"moo\",\n",
" }\n",
" )\n",
" def nested3(self, query: str) -> str:\n",
" if query == \"throw\":\n",
" raise ValueError(\"nested3 exception\")\n",
" return \"nested3\""
]
},
{
Expand All @@ -87,11 +100,16 @@
"metadata": {},
"outputs": [],
"source": [
"import dotenv\n",
"from trulens.core.session import TruSession\n",
"from trulens.experimental.otel_tracing.core.init import init\n",
"\n",
"dotenv.load_dotenv()\n",
"\n",
"session = TruSession()\n",
"init(session)"
"session.experimental_enable_feature(\"otel_tracing\")\n",
"session.reset_database()\n",
"init(session, debug=True)"
]
},
{
Expand All @@ -100,9 +118,16 @@
"metadata": {},
"outputs": [],
"source": [
"from trulens.apps.custom import TruCustomApp\n",
"\n",
"test_app = TestApp()\n",
"custom_app = TruCustomApp(test_app)\n",
"\n",
"with custom_app as recording:\n",
" test_app.respond_to_query(\"test\")\n",
"\n",
"test_app.respond_to_query(\"test\")"
"with custom_app as recording:\n",
" test_app.respond_to_query(\"throw\")"
]
}
],
Expand Down
1 change: 1 addition & 0 deletions src/core/trulens/core/database/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,7 @@ def get_datasets(self) -> pd.DataFrame:

def insert_event(self, event: Event) -> types_schema.EventID:
"""See [DB.insert_event][trulens.core.database.base.DB.insert_event]."""

with self.session.begin() as session:
_event = self.orm.Event.parse(event, redact_keys=self.redact_keys)
session.add(_event)
Expand Down
139 changes: 139 additions & 0 deletions src/core/trulens/experimental/otel_tracing/core/instrument.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from functools import wraps
import logging
from typing import Any, Callable, Dict, Optional, Union

from opentelemetry import trace
from trulens.experimental.otel_tracing.core.init import TRULENS_SERVICE_NAME
from trulens.otel.semconv.trace import SpanAttributes

logger = logging.getLogger(__name__)


def instrument(
*,
attributes: Optional[
Union[
Dict[str, Any],
Callable[
[Optional[Any], Optional[Exception], Any, Any], Dict[str, Any]
],
]
] = {},
):
"""
Decorator for marking functions to be instrumented in custom classes that are
wrapped by TruCustomApp, with OpenTelemetry tracing.
"""

def _validate_selector_name(attributes: Dict[str, Any]) -> Dict[str, Any]:
result = attributes.copy()

if (
SpanAttributes.SELECTOR_NAME_KEY in result
and SpanAttributes.SELECTOR_NAME in result
):
raise ValueError(
f"Both {SpanAttributes.SELECTOR_NAME_KEY} and {SpanAttributes.SELECTOR_NAME} cannot be set."
)

if SpanAttributes.SELECTOR_NAME in result:
# Transfer the trulens namespaced to the non-trulens namespaced key.
result[SpanAttributes.SELECTOR_NAME_KEY] = result[
SpanAttributes.SELECTOR_NAME
]
del result[SpanAttributes.SELECTOR_NAME]

if SpanAttributes.SELECTOR_NAME_KEY in result:
selector_name = result[SpanAttributes.SELECTOR_NAME_KEY]
if not isinstance(selector_name, str):
raise ValueError(
f"Selector name must be a string, not {type(selector_name)}"
)

return result

def _validate_attributes(attributes: Dict[str, Any]) -> Dict[str, Any]:
if not isinstance(attributes, dict) or any([
not isinstance(key, str) for key in attributes.keys()
]):
raise ValueError(
"Attributes must be a dictionary with string keys."
)
return _validate_selector_name(attributes)
# TODO: validate OTEL attributes.
# TODO: validate span type attributes.

def inner_decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
with (
trace.get_tracer_provider()
.get_tracer(TRULENS_SERVICE_NAME)
.start_as_current_span(
name=func.__name__,
)
) as span:
ret = None
func_exception: Optional[Exception] = None
attributes_exception: Optional[Exception] = None

try:
ret = func(*args, **kwargs)
except Exception as e:
# We want to get into the next clause to allow the users to still add attributes.
# It's on the user to deal with None as a return value.
func_exception = e

try:
attributes_to_add = {}

# Set the user provider attributes.
if attributes:
if callable(attributes):
attributes_to_add = attributes(
ret, func_exception, *args, **kwargs
)
else:
attributes_to_add = attributes

logger.info(f"Attributes to add: {attributes_to_add}")

final_attributes = _validate_attributes(attributes_to_add)

prefix = "trulens."
if (
SpanAttributes.SPAN_TYPE in final_attributes
and final_attributes[SpanAttributes.SPAN_TYPE]
!= SpanAttributes.SpanType.UNKNOWN
):
prefix += (
final_attributes[SpanAttributes.SPAN_TYPE] + "."
)

for key, value in final_attributes.items():
span.set_attribute(prefix + key, value)

if (
key != SpanAttributes.SELECTOR_NAME_KEY
and SpanAttributes.SELECTOR_NAME_KEY
in final_attributes
):
span.set_attribute(
f"trulens.{final_attributes[SpanAttributes.SELECTOR_NAME_KEY]}.{key}",
value,
)

except Exception as e:
attributes_exception = e
logger.error(f"Error setting attributes: {e}")

if func_exception:
raise func_exception
if attributes_exception:
raise attributes_exception

return ret

return wrapper

return inner_decorator
22 changes: 21 additions & 1 deletion src/otel/semconv/trulens/otel/semconv/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,27 @@ class SpanAttributes:
In some cases below, we also include span name or span name prefix.
"""

SPAN_TYPES = "trulens.span_types"
BASE = "trulens."
"""
Base prefix for the other keys.
"""

SPAN_TYPE = BASE + "span_type"
"""
Span type attribute.
"""

SELECTOR_NAME_KEY = "selector_name"
"""
Key for the user-defined selector name for the current span.
Here to help us check both trulens.selector_name and selector_name
to verify the user attributes and make corrections if necessary.
"""

SELECTOR_NAME = BASE + SELECTOR_NAME_KEY
"""
User-defined selector name for the current span.
"""

class SpanType(str, Enum):
"""Span type attribute values.
Expand Down
17 changes: 14 additions & 3 deletions tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import unittest
from unittest import TestCase

import pandas as pd
import pydantic
from pydantic import BaseModel
from trulens.core._utils.pycompat import ReferenceType
Expand Down Expand Up @@ -225,21 +226,25 @@ class WithJSONTestCase(TestCase):
"""TestCase mixin class that adds JSON comparisons and golden expectation
handling."""

def load_golden(self, golden_path: Union[str, Path]) -> serial_utils.JSON:
def load_golden(
self,
golden_path: Union[str, Path],
) -> Union[serial_utils.JSON, pd.DataFrame]:
"""Load the golden file `path` and return its contents.
Args:
golden_path: The name of the golden file to load. The file must
have an extension of either `.json` or `.yaml`. The extension
determines the input format.
"""
golden_path = Path(golden_path)

if ".json" in golden_path.suffixes:
loader = functools.partial(json.load)
elif ".yaml" in golden_path.suffixes or ".yml" in golden_path.suffixes:
loader = functools.partial(yaml.load, Loader=yaml.FullLoader)
elif ".csv" in golden_path.suffixes:
loader = functools.partial(pd.read_csv, index_col=0)
else:
raise ValueError(f"Unknown file extension {golden_path}.")

Expand All @@ -250,7 +255,9 @@ def load_golden(self, golden_path: Union[str, Path]) -> serial_utils.JSON:
return loader(f)

def write_golden(
self, golden_path: Union[str, Path], data: serial_utils.JSON
self,
golden_path: Union[str, Path],
data: Union[serial_utils.JSON, pd.DataFrame],
) -> None:
"""If writing golden file is enabled, write the golden file `path` with
`data` and raise exception indicating so.
Expand All @@ -272,6 +279,10 @@ def write_golden(
writer = functools.partial(json.dump, indent=2, sort_keys=True)
elif golden_path.suffix == ".yaml":
writer = functools.partial(yaml.dump, sort_keys=True)
elif golden_path.suffix == ".csv":
writer = lambda data, f: data.to_csv(f)
elif golden_path.suffix == ".parquet":
writer = lambda data, f: data.to_parquet(f)
else:
raise ValueError(f"Unknown file extension {golden_path.suffix}.")

Expand Down
Loading

0 comments on commit 706f65b

Please sign in to comment.