Skip to content

Commit

Permalink
Introduce Event table for ORM to prepare for OTEL traces (#1692)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-gtokernliang authored Dec 21, 2024
1 parent c6b7f05 commit 226446d
Show file tree
Hide file tree
Showing 8 changed files with 430 additions and 28 deletions.
46 changes: 23 additions & 23 deletions docs/contributing/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ schema.
If upgrading the DB, you must do this step!

1. Make desired changes to SQLAlchemy orm models in `src/core/trulens/core/database/orm.py`.
1. Get a database with the new changes:
1. `rm default.sqlite`
1. Run `TruSession()` to create a fresh database that uses the new ORM.
1. Run automatic alembic revision script generator. This will generate a new python script in `src/core/trulens/core/database/migrations`.
1. `cd src/core/trulens/core/database/migrations`
1. `SQLALCHEMY_URL="sqlite:///../../../../../../default.sqlite" alembic revision --autogenerate -m "<short_description>" --rev-id "<next_integer_version>"`
1. Check over the automatically generated script in `src/core/trulens/core/database/migration/versions` to make sure it looks correct.
1. Get a database with the new changes:
1. `rm default.sqlite`
1. Run `TruSession()` to create a fresh database that uses the new ORM.
1. Add the version to `src/core/trulens/core/database/migrations/data.py` in the variable `sql_alchemy_migration_versions`
1. Make any `sqlalchemy_upgrade_paths` updates in `src/core/trulens/core/database/migrations/data.py` if a backfill is necessary.

Expand All @@ -30,32 +30,32 @@ Note: Some of these instructions may be outdated and are in progress of being updated.
github; which will invalidate it upon commit)
1. cd `tests/docs_notebooks/notebooks_to_test`
1. remove any local dbs
* `rm -rf default.sqlite`
- `rm -rf default.sqlite`
1. run below notebooks (Making sure you also run with the most recent code in
trulens) TODO: Move these to a script
* all_tools.ipynb # `cp ../../../generated_files/all_tools.ipynb ./`
* llama_index_quickstart.ipynb # `cp
../../../examples/quickstart/llama_index_quickstart.ipynb ./`
* langchain-retrieval-augmentation-with-trulens.ipynb # `cp
../../../examples/vector-dbs/pinecone/langchain-retrieval-augmentation-with-trulens.ipynb
./`
* Add any other notebooks you think may have possible breaking changes
- all_tools.ipynb # `cp ../../../generated_files/all_tools.ipynb ./`
- llama_index_quickstart.ipynb # `cp
../../../examples/quickstart/llama_index_quickstart.ipynb ./`
- langchain-retrieval-augmentation-with-trulens.ipynb # `cp
../../../examples/vector-dbs/pinecone/langchain-retrieval-augmentation-with-trulens.ipynb
./`
- Add any other notebooks you think may have possible breaking changes
1. replace the last compatible db with this new db file
* Use the version you chose for --rev-id
* `mkdir release_dbs/sql_alchemy_<NEW_VERSION>/`
* `cp default.sqlite
release_dbs/sql_alchemy_<NEW_VERSION>/`
- Use the version you chose for --rev-id
- `mkdir release_dbs/sql_alchemy_<NEW_VERSION>/`
- `cp default.sqlite
release_dbs/sql_alchemy_<NEW_VERSION>/`
1. `git add release_dbs`

## Testing the DB

Run the tests with the requisite env vars.

```bash
HUGGINGFACE_API_KEY="<to_fill_out>" \
OPENAI_API_KEY="<to_fill_out>" \
PINECONE_API_KEY="<to_fill_out>" \
PINECONE_ENV="<to_fill_out>" \
HUGGINGFACEHUB_API_TOKEN="<to_fill_out>" \
python -m pytest tests/docs_notebooks -k backwards_compat
```
```bash
HUGGINGFACE_API_KEY="<to_fill_out>" \
OPENAI_API_KEY="<to_fill_out>" \
PINECONE_API_KEY="<to_fill_out>" \
PINECONE_ENV="<to_fill_out>" \
HUGGINGFACEHUB_API_TOKEN="<to_fill_out>" \
python -m pytest tests/docs_notebooks -k backwards_compat
```
153 changes: 153 additions & 0 deletions examples/experimental/otel_exporter.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# !pip install opentelemetry-api\n",
"# !pip install opentelemetry-sdk"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"import sys\n",
"\n",
"# Add base dir to path to be able to access test folder.\n",
"base_dir = Path().cwd().parent.parent.resolve()\n",
"if str(base_dir) not in sys.path:\n",
" print(f\"Adding {base_dir} to sys.path\")\n",
" sys.path.append(str(base_dir))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from opentelemetry import trace\n",
"from opentelemetry.sdk.resources import Resource\n",
"from opentelemetry.sdk.trace import TracerProvider\n",
"from opentelemetry.sdk.trace.export import BatchSpanProcessor\n",
"from opentelemetry.sdk.trace.export import ConsoleSpanExporter\n",
"\n",
"SERVICE_NAME = \"trulens\"\n",
"\n",
"\n",
"def init():\n",
" # Use Resource.create() instead of constructor directly\n",
" resource = Resource.create({\"service.name\": SERVICE_NAME})\n",
"\n",
" trace.set_tracer_provider(TracerProvider(resource=resource))\n",
" trace.get_tracer_provider().add_span_processor(\n",
" BatchSpanProcessor(ConsoleSpanExporter())\n",
" )\n",
"\n",
"\n",
"init()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from typing import Callable\n",
"\n",
"from trulens.apps.custom import instrument\n",
"\n",
"SERVICE_NAME = \"trulens\"\n",
"\n",
"\n",
"def decorator(func: Callable):\n",
" tracer = trace.get_tracer(SERVICE_NAME)\n",
"\n",
" def wrapper(*args, **kwargs):\n",
" print(\"start wrap\")\n",
"\n",
" with tracer.start_as_current_span(\"custom\"):\n",
" result = func(*args, **kwargs)\n",
" span = trace.get_current_span()\n",
" print(\"---span---\")\n",
" print(span.get_span_context())\n",
" span.set_attribute(\"result\", result)\n",
" span.set_status(trace.Status(trace.StatusCode.OK))\n",
" return result\n",
"\n",
" return wrapper"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from examples.dev.dummy_app.dummy import Dummy\n",
"\n",
"\n",
"class TestApp(Dummy):\n",
" def __init__(self):\n",
" super().__init__()\n",
"\n",
" @decorator\n",
" @instrument\n",
" def respond_to_query(self, query: str) -> str:\n",
" return f\"answer: {self.nested(query)}\"\n",
"\n",
" @decorator\n",
" @instrument\n",
" def nested(self, query: str) -> str:\n",
" return f\"nested: {query}\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from trulens.core.session import TruSession\n",
"\n",
"SERVICE_NAME = \"trulens\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"session = TruSession()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "trulens",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
2 changes: 1 addition & 1 deletion src/core/trulens/core/database/migrations/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from trulens.core.schema import record as record_schema
from trulens.core.utils import pyschema as pyschema_utils

sql_alchemy_migration_versions: List[int] = [1, 2, 3]
# All known schema revisions, in order; must contain every alembic rev-id.
sql_alchemy_migration_versions: List[int] = list(range(1, 11))
"""DB versions."""

sqlalchemy_upgrade_paths: Dict[int, Tuple[int, Callable[[DB]]]] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""create event table
Revision ID: 10
Revises: 9
Create Date: 2024-12-11 09:32:48.976169
"""

from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "10"
down_revision = "9"
branch_labels = None
depends_on = None


def upgrade(config) -> None:
    """Create the ``<prefix>events`` table used to store OTEL trace events.

    The column layout mirrors the Snowflake event-table schema linked in the
    ORM, with ``event_id`` as a synthetic primary key.

    Raises:
        RuntimeError: If ``trulens.table_prefix`` is not configured.
    """
    table_prefix = config.get_main_option("trulens.table_prefix")
    if table_prefix is None:
        raise RuntimeError("trulens.table_prefix is not set")

    op.create_table(
        f"{table_prefix}events",
        # Synthetic primary key; not part of the upstream event-table schema.
        sa.Column("event_id", sa.VARCHAR(length=256), nullable=False),
        sa.Column("record", sa.JSON(), nullable=False),
        sa.Column("record_attributes", sa.JSON(), nullable=False),
        sa.Column("record_type", sa.VARCHAR(length=256), nullable=False),
        sa.Column("resource_attributes", sa.JSON(), nullable=False),
        sa.Column("start_timestamp", sa.TIMESTAMP(), nullable=False),
        sa.Column("timestamp", sa.TIMESTAMP(), nullable=False),
        sa.Column("trace", sa.JSON(), nullable=False),
        sa.PrimaryKeyConstraint("event_id"),
    )


def downgrade(config) -> None:
    """Drop the ``<prefix>events`` table created by this revision.

    Raises:
        RuntimeError: If ``trulens.table_prefix`` is not configured.
    """
    table_prefix = config.get_main_option("trulens.table_prefix")
    if table_prefix is None:
        raise RuntimeError("trulens.table_prefix is not set")

    op.drop_table(f"{table_prefix}events")
80 changes: 80 additions & 0 deletions src/core/trulens/core/database/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
from sqlite3 import Connection as SQLite3Connection
from typing import ClassVar, Dict, Generic, Type, TypeVar

from sqlalchemy import JSON
from sqlalchemy import TIMESTAMP
from sqlalchemy import VARCHAR
from sqlalchemy import Column
from sqlalchemy import Engine
from sqlalchemy import Enum
from sqlalchemy import Float
from sqlalchemy import ForeignKey
from sqlalchemy import Text
Expand All @@ -22,6 +25,7 @@
from trulens.core.database import base as core_db
from trulens.core.schema import app as app_schema
from trulens.core.schema import dataset as dataset_schema
from trulens.core.schema import event as event_schema
from trulens.core.schema import feedback as feedback_schema
from trulens.core.schema import groundtruth as groundtruth_schema
from trulens.core.schema import record as record_schema
Expand Down Expand Up @@ -116,6 +120,7 @@ class ORM(abc.ABC, Generic[T]):
FeedbackResult: Type[T]
GroundTruth: Type[T]
Dataset: Type[T]
Event: Type[T]


def new_orm(base: Type[T], prefix: str = "trulens_") -> Type[ORM[T]]:
Expand Down Expand Up @@ -401,6 +406,81 @@ def parse(
dataset_json=obj.model_dump_json(redact_keys=redact_keys),
)

class Event(base):
    """
    ORM class for OTEL traces/spans. This should be kept in line with the event table
    https://docs.snowflake.com/en/developer-guide/logging-tracing/event-table-columns#data-for-trace-events
    https://docs.snowflake.com/en/developer-guide/logging-tracing/event-table-columns#label-event-table-record-column-span
    """

    _table_base_name = "events"

    record = Column(JSON, nullable=False)
    """
    For a span, this is an object that includes:
    - name: the function/procedure that emitted the data
    - kind: SPAN_KIND_TRULENS
    - parent_span_id: the unique identifier for the parent span
    - status: STATUS_CODE_ERROR when the span corresponds to an unhandled exception. Otherwise, STATUS_CODE_UNSET.
    """

    event_id = Column(TYPE_ID, nullable=False, primary_key=True)
    """
    Used as primary key for the schema. Not technically present in the event table schema,
    but having it here makes it easier to work with the ORM.
    """

    record_attributes = Column(JSON, nullable=False)
    """
    Attributes of the record that can either come from the user, or based on the TruLens semantic conventions.
    """

    record_type = Column(
        Enum(event_schema.EventRecordType), nullable=False
    )
    """
    Specifies the kind of record specified by this row. This will always be "SPAN" for TruLens.
    """

    resource_attributes = Column(JSON, nullable=False)
    """
    Reserved.
    """

    start_timestamp = Column(TIMESTAMP, nullable=False)
    """
    The timestamp when the span started. This is a UNIX timestamp in milliseconds.
    Note: The Snowflake event table uses the TIMESTAMP_NTZ data type for this column.
    """

    timestamp = Column(TIMESTAMP, nullable=False)
    """
    The timestamp when the span concluded. This is a UNIX timestamp in milliseconds.
    Note: The Snowflake event table uses the TIMESTAMP_NTZ data type for this column.
    """

    trace = Column(JSON, nullable=False)
    """
    Contains the span context, including the trace_id, parent_id, and span_id for the span.
    """

    @classmethod
    def parse(
        cls,
        obj: event_schema.Event,
        redact_keys: bool = False,
    ) -> "ORM.Event":
        # FIX: the annotation previously referenced `ORM.EventTable`, an
        # attribute the ORM container does not declare (it declares
        # `Event: Type[T]`). The corrected name is quoted so it is not
        # evaluated at class-definition time.
        #
        # Note: `redact_keys` is unused here; it is kept for signature
        # consistency with the sibling `parse` classmethods (e.g. Dataset).
        return cls(
            event_id=obj.event_id,
            record=obj.record,
            record_attributes=obj.record_attributes,
            record_type=obj.record_type,
            resource_attributes=obj.resource_attributes,
            start_timestamp=obj.start_timestamp,
            timestamp=obj.timestamp,
            trace=obj.trace,
        )

configure_mappers() # IMPORTANT
# Without the above, orm class attributes which are defined using backref
# will not be visible, i.e. orm.AppDefinition.records.
Expand Down
Loading

0 comments on commit 226446d

Please sign in to comment.