From e6876fd2add6f9b6e3e4b142de45a96e68dc6dc8 Mon Sep 17 00:00:00 2001 From: Marc Klingen Date: Mon, 4 Mar 2024 01:04:12 +0100 Subject: [PATCH] docs: add docstrings to core client and pdoc for sdk reference generation (#370) --------- Co-authored-by: ChrisTho23 <110739558+ChrisTho23@users.noreply.github.com> --- .github/workflows/ci.yml | 2 + .gitignore | 9 +- .vscode/extensions.json | 6 + README.md | 16 + ci.ruff.toml | 4 + langfuse/client.py | 964 +++++++++++++++++++++++++++++++++++++-- poetry.lock | 91 +++- pyproject.toml | 6 +- ruff.toml | 15 + 9 files changed, 1068 insertions(+), 45 deletions(-) create mode 100644 .vscode/extensions.json create mode 100644 ci.ruff.toml create mode 100644 ruff.toml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3b3ed2405..7005f5fa3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,6 +17,8 @@ jobs: steps: - uses: actions/checkout@v3 - uses: chartboost/ruff-action@v1 + with: + args: --config ci.ruff.toml ci: runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index 7205ba8f4..a065025d5 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,8 @@ dist/ *.egg-info/ .pytest_cache/ +playground + # pyenv .python-version @@ -24,5 +26,8 @@ dmypy.json .DS_Store -playground -tests/mocks/llama-index-storage \ No newline at end of file +# Docs +docs + +# Llamaindex +tests/mocks/llama-index-storage diff --git a/.vscode/extensions.json b/.vscode/extensions.json new file mode 100644 index 000000000..bcb67206b --- /dev/null +++ b/.vscode/extensions.json @@ -0,0 +1,6 @@ +{ + "recommendations": [ + "njpwerner.autodocstring", + "charliermarsh.ruff" + ] +} diff --git a/README.md b/README.md index 88050c6fc..a7c8b196a 100644 --- a/README.md +++ b/README.md @@ -79,3 +79,19 @@ poetry run pre-commit install - Create PyPi API token: https://pypi.org/manage/account/token/ - Setup: `poetry config pypi-token.pypi your-api-token` 9. Create a release on GitHub with the changelog + +### SDK Reference + +Note: The generated SDK reference is currently work in progress. + +The SDK reference is generated via pdoc. To update the reference, run the following command: + +```sh +poetry run pdoc -o docs/ --docformat google langfuse +``` + +You need to have all extra dependencies installed to generate the reference. + +```sh +poetry install --all-extras +``` diff --git a/ci.ruff.toml b/ci.ruff.toml new file mode 100644 index 000000000..fe975a8f1 --- /dev/null +++ b/ci.ruff.toml @@ -0,0 +1,4 @@ +# This is the Ruff config used in CI. +# In development, ruff.toml is used instead. + +target-version = 'py38' \ No newline at end of file diff --git a/langfuse/client.py b/langfuse/client.py index 37b0f3246..8346ea475 100644 --- a/langfuse/client.py +++ b/langfuse/client.py @@ -53,7 +53,37 @@ class Langfuse(object): + """Langfuse Python client. + + Attributes: + log (logging.Logger): Logger for the Langfuse client. + base_url (str): Base URL of the Langfuse API, serving as the root address for API endpoint construction. + httpx_client (httpx.Client): HTTPX client utilized for executing requests to the Langfuse API. + client (FernLangfuse): Core interface for Langfuse API interaction. + task_manager (TaskManager): Task Manager dedicated to handling asynchronous tasks. + release (str): Identifies the release number or hash of the application. + prompt_cache (PromptCache): A cache for efficiently storing and retrieving PromptClient instances. + + Example: + Initiating the Langfuse client should always be first step to use Langfuse. 
+        ```python
+        import os
+        from langfuse import Langfuse
+
+        # Set the public and secret keys as environment variables
+        os.environ['LANGFUSE_PUBLIC_KEY'] = public_key
+        os.environ['LANGFUSE_SECRET_KEY'] = secret_key
+
+        # Initialize the Langfuse client using the credentials
+        langfuse = Langfuse()
+        ```
+    """
+
     log = logging.getLogger("langfuse")
+    """Logger for the Langfuse client."""
+
+    host: str
+    """Host of Langfuse API."""
 
     def __init__(
         self,
@@ -65,11 +95,44 @@ def __init__(
         threads: int = 1,
         flush_at: int = 15,
         flush_interval: float = 0.5,
-        max_retries=3,
-        timeout=10,  # seconds
+        max_retries: int = 3,
+        timeout: int = 10,  # seconds
         sdk_integration: str = "default",
         httpx_client: Optional[httpx.Client] = None,
     ):
+        """Initialize the Langfuse client.
+
+        Args:
+            public_key: Public API key of Langfuse project. Can be set via `LANGFUSE_PUBLIC_KEY` environment variable.
+            secret_key: Secret API key of Langfuse project. Can be set via `LANGFUSE_SECRET_KEY` environment variable.
+            host: Host of Langfuse API. Can be set via `LANGFUSE_HOST` environment variable. Defaults to `https://cloud.langfuse.com`.
+            release: Release number/hash of the application to provide analytics grouped by release. Can be set via `LANGFUSE_RELEASE` environment variable.
+            debug: Enables debug mode for more verbose logging. Can be set via `LANGFUSE_DEBUG` environment variable.
+            threads: Number of consumer threads to execute network requests. Helps scale the SDK for high load. Only increase this if you run into scaling issues.
+            flush_at: Max batch size that's sent to the API.
+            flush_interval: Max delay until a new batch is sent to the API.
+            max_retries: Max number of retries in case of API/network errors.
+            timeout: Timeout of API requests in seconds.
+            httpx_client: Pass your own httpx client for more customizability of requests.
+            sdk_integration: Used by integrations that wrap the Langfuse SDK to add context for debugging and support. Not to be used directly.
+
+        Raises:
+            ValueError: If public_key or secret_key are not set and not found in environment variables.
+
+        Example:
+            Initiating the Langfuse client should always be the first step to use Langfuse.
+ ```python + import os + from langfuse import Langfuse + + # Set the public and secret keys as environment variables + os.environ['LANGFUSE_PUBLIC_KEY'] = public_key + os.environ['LANGFUSE_SECRET_KEY'] = secret_key + + # Initialize the Langfuse client using the credentials + langfuse = Langfuse() + ``` + """ set_debug = debug if debug else (os.getenv("LANGFUSE_DEBUG", "False") == "True") if set_debug is True: @@ -95,7 +158,7 @@ def __init__( if not public_key: self.log.warning("public_key is not set.") raise ValueError( - "public_key is required, set as parameter or environment variable 'LANGFUSE_PUBLIC_KEY'" + "public_key is required, set as a parameter or environment variable 'LANGFUSE_PUBLIC_KEY'" ) if not secret_key: @@ -141,11 +204,11 @@ def __init__( self.trace_id = None - self.release = self.get_release_value(release) + self.release = self._get_release_value(release) self.prompt_cache = PromptCache() - def get_release_value(self, release: Optional[str] = None) -> Optional[str]: + def _get_release_value(self, release: Optional[str] = None) -> Optional[str]: if release: return release elif "LANGFUSE_RELEASE" in os.environ: @@ -153,13 +216,23 @@ def get_release_value(self, release: Optional[str] = None) -> Optional[str]: else: return get_common_release_envs() - def get_trace_id(self): + def get_trace_id(self) -> str: + """Get the current trace id.""" return self.trace_id - def get_trace_url(self): + def get_trace_url(self) -> str: + """Get the URL of the current trace to view it in the Langfuse UI.""" return f"{self.base_url}/trace/{self.trace_id}" def get_dataset(self, name: str) -> "DatasetClient": + """Fetch a dataset by its name. + + Args: + name (str): The name of the dataset to fetch. + + Returns: + DatasetClient: The dataset with the given name. + """ try: self.log.debug(f"Getting datasets {name}") dataset = self.client.datasets.get(dataset_name=name) @@ -172,6 +245,7 @@ def get_dataset(self, name: str) -> "DatasetClient": raise e def get_dataset_item(self, id: str) -> "DatasetItemClient": + """Get the dataset item with the given id.""" try: self.log.debug(f"Getting dataset item {id}") dataset_item = self.client.dataset_items.get(id=id) @@ -181,6 +255,14 @@ def get_dataset_item(self, id: str) -> "DatasetItemClient": raise e def auth_check(self) -> bool: + """Check if the provided credentials (public and secret key) are valid. + + Raises: + Exception: If no projects were found for the provided credentials. + + Note: + This method is blocking. It is discouraged to use it in production code. + """ try: projects = self.client.projects.get() self.log.debug( @@ -201,6 +283,15 @@ def get_dataset_run( dataset_name: str, dataset_run_name: str, ) -> DatasetRun: + """Get a dataset run. + + Args: + dataset_name: Name of the dataset. + dataset_run_name: Name of the dataset run. + + Returns: + DatasetRun: The dataset run. + """ try: self.log.debug( f"Getting dataset runs for dataset {dataset_name} and run {dataset_run_name}" @@ -213,6 +304,14 @@ def get_dataset_run( raise e def create_dataset(self, name: str) -> Dataset: + """Create a dataset with the given name on Langfuse. + + Args: + name: Name of the dataset to create. + + Returns: + Dataset: The created dataset as returned by the Langfuse API. + """ try: body = CreateDatasetRequest(name=name) self.log.debug(f"Creating datasets {body}") @@ -228,8 +327,32 @@ def create_dataset_item( expected_output: Optional[Any] = None, id: Optional[str] = None, ) -> DatasetItem: - """ - Creates a dataset item. Upserts if an item with id already exists. 
+ """Create a dataset item. + + Upserts if an item with id already exists. + + Args: + dataset_name: Name of the dataset in which the dataset item should be created. + input: Input data. Can contain any dict, list or scalar. + expected_output: Expected output data. Defaults to None. Can contain any dict, list or scalar. + id: Id of the dataset item. Defaults to None. + + Returns: + DatasetItem: The created dataset item as returned by the Langfuse API. + + Example: + ```python + from langfuse import Langfuse + + langfuse = Langfuse() + + # Uploading items to the Langfuse dataset named "capital_cities" + langfuse.create_dataset_item( + dataset_name="capital_cities", + input={"input": {"country": "Italy"}}, + expected_output={"expected_output": "Rome"} + ) + ``` """ try: body = CreateDatasetItemRequest( @@ -248,6 +371,17 @@ def get_trace( self, id: str, ) -> TraceWithFullDetails: + """Get a trace via the Langfuse API by its id. + + Args: + id: The id of the trace to fetch. + + Returns: + TraceWithFullDetails: The trace with full details as returned by the Langfuse API. + + Raises: + Exception: If the trace with the given id could not be found within the authenticated project or if an error occurred during the request. + """ try: self.log.debug(f"Getting trace {id}") return self.client.trace.get(id) @@ -266,6 +400,23 @@ def get_observations( parent_observation_id: typing.Optional[str] = None, type: typing.Optional[str] = None, ) -> ObservationsViews: + """Get a list of observations in the current project matching the given parameters. + + Args: + page (Optional[int]): Page number of the observations to return. Defaults to None. + limit (Optional[int]): Maximum number of observations to return. Defaults to None. + name (Optional[str]): Name of the observations to return. Defaults to None. + user_id (Optional[str]): User identifier. Defaults to None. + trace_id (Optional[str]): Trace identifier. Defaults to None. + parent_observation_id (Optional[str]): Parent observation identifier. Defaults to None. + type (Optional[str]): Type of the observation. Defaults to None. + + Returns: + List of ObservationsViews: List of observations in the project matching the given parameters. + + Raises: + Exception: If an error occurred during the request. + """ try: self.log.debug( f"Getting observations... {page}, {limit}, {name}, {user_id}, {trace_id}, {parent_observation_id}, {type}" @@ -293,6 +444,22 @@ def get_generations( trace_id: typing.Optional[str] = None, parent_observation_id: typing.Optional[str] = None, ) -> ObservationsViews: + """Get a list of generations in the current project matching the given parameters. + + Args: + page (Optional[int]): Page number of the generations to return. Defaults to None. + limit (Optional[int]): Maximum number of generations to return. Defaults to None. + name (Optional[str]): Name of the generations to return. Defaults to None. + user_id (Optional[str]): User identifier of the generations to return. Defaults to None. + trace_id (Optional[str]): Trace identifier of the generations to return. Defaults to None. + parent_observation_id (Optional[str]): Parent observation identifier of the generations to return. Defaults to None. + + Returns: + List of ObservationsViews: List of generations in the project matching the given parameters. + + Raises: + Exception: If an error occurred during the request. 
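+
+        Example:
+            A minimal sketch; the trace id used here is a hypothetical placeholder.
+            ```python
+            from langfuse import Langfuse
+
+            langfuse = Langfuse()
+
+            # Fetch up to 50 generations that belong to one trace
+            generations = langfuse.get_generations(limit=50, trace_id="some-trace-id")
+            ```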
+ """ return self.get_observations( page=page, limit=limit, @@ -307,6 +474,14 @@ def get_observation( self, id: str, ) -> Observation: + """Get an observation in the current project with the given identifier. + + Args: + id: The identifier of the observation to fetch. + + Raises: + Exception: If the observation with the given id could not be found within the authenticated project or if an error occurred during the request. + """ try: self.log.debug(f"Getting observation {id}") return self.client.observations.get(id) @@ -321,28 +496,26 @@ def get_prompt( *, cache_ttl_seconds: Optional[int] = None, ) -> PromptClient: - """ - Retrieves a prompt by its name and optionally its version, with support for additional options. + """Get a prompt. This method attempts to fetch the requested prompt from the local cache. If the prompt is not found in the cache or if the cached prompt has expired, it will try to fetch the prompt from the server again and update the cache. If fetching the new prompt fails, and there is an expired prompt in the cache, it will return the expired prompt as a fallback. - Parameters: - - name (str): The name of the prompt to retrieve. - - version (Optional[int]): The version of the prompt. If not specified, the latest version is assumed. - - cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a - keyword argument. If 'cache_ttl_seconds' is not specified, a default TTL of 60 seconds is used. + Args: + name (str): The name of the prompt to retrieve. + version (Optional[int]): The version of the prompt. If not specified, the `active` version is returned. + cache_ttl_seconds: Optional[int]: Time-to-live in seconds for caching the prompt. Must be specified as a + keyword argument. If not set, defaults to 60 seconds. Returns: - - PromptClient: The prompt object retrieved from the cache or directly fetched if not cached or expired. + PromptClient: The prompt object retrieved from the cache or directly fetched if not cached or expired. Raises: - - Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an - expired prompt in the cache, in which case it logs a warning and returns the expired prompt. + Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an + expired prompt in the cache, in which case it logs a warning and returns the expired prompt. """ - self.log.debug(f"Getting prompt {name}, version {version or 'latest'}") if not name: @@ -397,6 +570,17 @@ def _fetch_prompt_and_update_cache( def create_prompt( self, *, name: str, prompt: str, is_active: bool, config: Optional[Any] = None ) -> PromptClient: + """Create a new prompt in Langfuse. + + Args: + name : The name of the prompt to be created. + prompt : The content of the prompt to be created. + is_active : A flag indicating whether the prompt is active or not. + config: Additional structured data to be saved with the prompt. Defaults to None. + + Returns: + PromptClient: The prompt. + """ try: self.log.debug(f"Creating prompt {name}, version {version}") @@ -431,6 +615,38 @@ def trace( public: typing.Optional[bool] = None, **kwargs, ) -> "StatefulTraceClient": + """Create a trace. + + Args: + id: The id of the trace can be set, defaults to a random id. Set it to link traces to external systems or when creating a distributed trace. Traces are upserted on id. + name: Identifier of the trace. Useful for sorting/filtering in the UI. + input: The input of the trace. Can be any JSON object. 
+            output: The output of the trace. Can be any JSON object.
+            metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API.
+            user_id: The id of the user that triggered the execution. Used to provide user-level analytics.
+            session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier.
+            version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging.
+            release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
+            tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.
+            timestamp: The timestamp of the trace. Defaults to the current time if not provided.
+            public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
+            **kwargs: Additional keyword arguments that can be included in the trace.
+
+        Returns:
+            StatefulTraceClient: The created trace.
+
+        Example:
+            ```python
+            from langfuse import Langfuse
+
+            langfuse = Langfuse()
+
+            trace = langfuse.trace(
+                name="example-application",
+                user_id="user-1234"
+            )
+            ```
+        """
         new_id = id or str(uuid.uuid4())
         self.trace_id = new_id
         try:
@@ -483,6 +699,41 @@ def score(
         observation_id: typing.Optional[str] = None,
         **kwargs,
     ) -> "StatefulClient":
+        """Create a score attached to a trace (and optionally an observation).
+
+        Args:
+            name (str): Identifier of the score.
+            value (float): The value of the score. Can be any number, often standardized to 0..1.
+            trace_id (str): The id of the trace to which the score should be attached.
+            comment (Optional[str]): Additional context/explanation of the score.
+            observation_id (Optional[str]): The id of the observation to which the score should be attached.
+            id (Optional[str]): The id of the score. If not provided, a new UUID is generated.
+            **kwargs: Additional keyword arguments to include in the score.
+
+        Returns:
+            StatefulClient: Either the associated observation (if observation_id is provided) or the trace (if observation_id is not provided).
+ + Example: + ```python + from langfuse import Langfuse + + langfuse = Langfuse() + + # Create a trace + trace = langfuse.trace(name="example-application") + + # Get id of created trace + trace_id = trace.id + + # Add score to the trace + trace = langfuse.score( + trace_id=trace_id, + name="user-explicit-feedback", + value=1, + comment="I like how personalized the response is" + ) + ``` + """ trace_id = trace_id or self.trace_id or str(uuid.uuid4()) new_id = id or str(uuid.uuid4()) try: @@ -527,18 +778,58 @@ def span( *, id: typing.Optional[str] = None, trace_id: typing.Optional[str] = None, + parent_observation_id: typing.Optional[str] = None, name: typing.Optional[str] = None, start_time: typing.Optional[dt.datetime] = None, end_time: typing.Optional[dt.datetime] = None, metadata: typing.Optional[typing.Any] = None, - input: typing.Optional[typing.Any] = None, - output: typing.Optional[typing.Any] = None, level: typing.Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]] = None, status_message: typing.Optional[str] = None, - parent_observation_id: typing.Optional[str] = None, + input: typing.Optional[typing.Any] = None, + output: typing.Optional[typing.Any] = None, version: typing.Optional[str] = None, **kwargs, ) -> "StatefulSpanClient": + """Create a span. + + A span represents durations of units of work in a trace. + Usually, you want to add a span nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id. + + If no trace_id is provided, a new trace is created just for this span. + + Args: + id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id. + trace_id (Optional[str]): The trace ID associated with this span. If not provided, a new UUID is generated. + parent_observation_id (Optional[str]): The ID of the parent observation, if applicable. + name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI. + start_time (Optional[datetime]): The time at which the span started, defaults to the current time. + end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`. + metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API. + level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. + status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event. + input (Optional[dict]): The input to the span. Can be any JSON object. + output (Optional[dict]): The output to the span. Can be any JSON object. + version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging. + **kwargs: Additional keyword arguments to include in the span. + + Returns: + StatefulSpanClient: The created span. 
+
+        Example:
+            ```python
+            from langfuse import Langfuse
+
+            langfuse = Langfuse()
+
+            trace = langfuse.trace(name = "llm-feature")
+
+            # Create a span
+            retrieval = langfuse.span(name = "retrieval", trace_id = trace.id)
+
+            # Create a nested span
+            nested_span = langfuse.span(name = "nested-retrieval", trace_id = trace.id, parent_observation_id = retrieval.id)
+            ```
+        """
         new_span_id = id or str(uuid.uuid4())
         new_trace_id = trace_id or str(uuid.uuid4())
         self.trace_id = new_trace_id
@@ -592,6 +883,7 @@ def event(
         *,
         id: typing.Optional[str] = None,
         trace_id: typing.Optional[str] = None,
+        parent_observation_id: typing.Optional[str] = None,
         name: typing.Optional[str] = None,
         start_time: typing.Optional[dt.datetime] = None,
         metadata: typing.Optional[typing.Any] = None,
@@ -599,10 +891,45 @@ def event(
         output: typing.Optional[typing.Any] = None,
         level: typing.Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]] = None,
         status_message: typing.Optional[str] = None,
-        parent_observation_id: typing.Optional[str] = None,
         version: typing.Optional[str] = None,
         **kwargs,
     ) -> "StatefulSpanClient":
+        """Create an event.
+
+        An event represents a discrete event in a trace.
+        Usually, you want to add an event nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id.
+
+        If no trace_id is provided, a new trace is created just for this event.
+
+        Args:
+            id (Optional[str]): The id of the event can be set, otherwise a random id is generated.
+            trace_id (Optional[str]): The trace ID associated with this event. If not provided, a new trace is created just for this event.
+            parent_observation_id (Optional[str]): The ID of the parent observation, if applicable.
+            name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI.
+            start_time (Optional[datetime]): The time at which the event started, defaults to the current time.
+            metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API.
+            input (Optional[Any]): The input to the event. Can be any JSON object.
+            output (Optional[Any]): The output to the event. Can be any JSON object.
+            level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI.
+            status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event.
+            version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging.
+            **kwargs: Additional keyword arguments to include in the event.
+
+        Returns:
+            StatefulSpanClient: The created event.
+ + Example: + ```python + from langfuse import Langfuse + + langfuse = Langfuse() + + trace = langfuse.trace(name = "llm-feature") + + # Create an event + retrieval = langfuse.event(name = "retrieval", trace_id = trace.id) + ``` + """ event_id = id or str(uuid.uuid4()) new_trace_id = trace_id or str(uuid.uuid4()) self.trace_id = new_trace_id @@ -653,15 +980,15 @@ def generation( *, id: typing.Optional[str] = None, trace_id: typing.Optional[str] = None, + parent_observation_id: typing.Optional[str] = None, name: typing.Optional[str] = None, start_time: typing.Optional[dt.datetime] = None, end_time: typing.Optional[dt.datetime] = None, + completion_start_time: typing.Optional[dt.datetime] = None, metadata: typing.Optional[typing.Any] = None, level: typing.Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]] = None, status_message: typing.Optional[str] = None, - parent_observation_id: typing.Optional[str] = None, version: typing.Optional[str] = None, - completion_start_time: typing.Optional[dt.datetime] = None, model: typing.Optional[str] = None, model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None, input: typing.Optional[typing.Any] = None, @@ -670,6 +997,54 @@ def generation( prompt: typing.Optional[PromptClient] = None, **kwargs, ) -> "StatefulGenerationClient": + """Create a generation. + + A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI. + + Usually, you want to add a generation nested within a trace. Optionally you can nest it within another observation by providing a parent_observation_id. + + If no trace_id is provided, a new trace is created just for this generation. + + Args: + id (Optional[str]): The id of the generation can be set, defaults to random id. + trace_id (Optional[str]): The trace ID associated with this generation. If not provided, a new trace is created + parent_observation_id (Optional[str]): The ID of the parent observation, if applicable. + name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI. + start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time. + end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`. + completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration. + metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API. + level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. + status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event. + version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging. + model (Optional[str]): The name of the model used for the generation. + model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs. + input (Optional[dict]): The prompt used for the generation. 
Can be any string or JSON object.
+            output (Optional[dict]): The completion generated by the model. Can be any string or JSON object.
+            usage (Optional[dict]): The usage object supports the OpenAI structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse.
+            prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation.
+            **kwargs: Additional keyword arguments to include in the generation.
+
+        Returns:
+            StatefulGenerationClient: The created generation.
+
+        Example:
+            ```python
+            from langfuse import Langfuse
+
+            langfuse = Langfuse()
+
+            # Create a generation in Langfuse
+            generation = langfuse.generation(
+                name="summary-generation",
+                model="gpt-3.5-turbo",
+                model_parameters={"maxTokens": "1000", "temperature": "0.9"},
+                input=[{"role": "system", "content": "You are a helpful assistant."},
+                       {"role": "user", "content": "Please generate a summary of the following documents ..."}],
+                metadata={"interface": "whatsapp"}
+            )
+            ```
+        """
         new_trace_id = trace_id or str(uuid.uuid4())
         new_generation_id = id or str(uuid.uuid4())
         self.trace_id = new_trace_id
@@ -756,24 +1131,44 @@ def _generate_trace(self, trace_id: str, name: str):
         self.log.debug(f"Creating trace {event}...")
         self.task_manager.add_task(event)
 
-    # On program exit, allow the consumer thread to exit cleanly.
-    # This prevents exceptions and a messy shutdown when the
-    # interpreter is destroyed before the daemon thread finishes
-    # execution. However, it is *not* the same as flushing the queue!
-    # To guarantee all messages have been delivered, you'll still need to call flush().
     def join(self):
+        """End the consumer threads once the queue is empty, blocking execution until finished.
+
+        On program exit, this allows the consumer thread to exit cleanly.
+
+        This prevents exceptions and a messy shutdown when the interpreter is destroyed before the daemon thread finishes execution. However, it is *not* the same as flushing the queue!
+
+        To guarantee all messages have been delivered, you'll still need to call flush().
+        """
         try:
             return self.task_manager.join()
         except Exception as e:
             self.log.exception(e)
 
     def flush(self):
+        """Force a flush from the internal queue to the server.
+
+        This method should be used when exiting the program to ensure all queued events are delivered. It blocks until the queue is empty. The method logs the approximate number of items flushed; the exact count can vary slightly due to threading.
+
+        Example:
+            ```python
+            from langfuse import Langfuse
+
+            langfuse = Langfuse()
+
+            # Some operations with Langfuse
+
+            # Flushing all events to end Langfuse cleanly
+            langfuse.flush()
+            ```
+        """
         try:
            return self.task_manager.flush()
        except Exception as e:
            self.log.exception(e)
 
     def shutdown(self):
+        """Initiate a graceful shutdown of the task manager, ensuring all tasks are flushed and processed."""
        try:
            return self.task_manager.shutdown()
        except Exception as e:
@@ -781,11 +1176,31 @@ def shutdown(self):
 
 
 class StateType(Enum):
+    """Enum to distinguish observation and trace states.
+
+    Attributes:
+        OBSERVATION (int): Observation state.
+        TRACE (int): Trace state.
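+
+    Example:
+        A minimal sketch (with hypothetical variable names) of how stateful clients use this enum when building event bodies:
+        ```python
+        # Nested observations attach to an observation or directly to a trace
+        if state_type == StateType.OBSERVATION:
+            body["parent_observation_id"] = parent_id
+        else:
+            body["trace_id"] = parent_id
+        ```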
+ """ + OBSERVATION = 1 TRACE = 0 class StatefulClient(object): + """Base class for handling stateful operations in the Langfuse system. + + This client is capable of creating different nested Langfuse objects like spans, generations, scores, and events, + associating them with either an observation or a trace based on the specified state type. + + Attributes: + client (FernLangfuse): Core interface for Langfuse API interactions. + id (str): Unique identifier of the stateful client (either observation or trace). + state_type (StateType): Enum indicating whether the client is an observation or a trace. + trace_id (str): Id of the trace associated with the stateful client. + task_manager (TaskManager): Manager handling asynchronous tasks for the client. + """ + log = logging.getLogger("langfuse") def __init__( @@ -796,6 +1211,15 @@ def __init__( trace_id: str, task_manager: TaskManager, ): + """Initialize the StatefulClient. + + Args: + client (FernLangfuse): Core interface for Langfuse API interactions. + id (str): Unique identifier of the stateful client (either observation or trace). + state_type (StateType): Enum indicating whether the client is an observation or a trace. + trace_id (str): Id of the trace associated with the stateful client. + task_manager (TaskManager): Manager handling asynchronous tasks for the client. + """ self.client = client self.trace_id = trace_id self.id = id @@ -835,6 +1259,51 @@ def generation( prompt: typing.Optional[PromptClient] = None, **kwargs, ) -> "StatefulGenerationClient": + """Create a generation nested within the current observation or trace. + + A generation is a span that is used to log generations of AI models. They contain additional metadata about the model, the prompt/completion, the cost of executing the model and are specifically rendered in the langfuse UI. + + Args: + id (Optional[str]): The id of the generation can be set, defaults to random id. + name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI. + start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time. + end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`. + completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration. + metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API. + level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. + status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event. + version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging. + model (Optional[str]): The name of the model used for the generation. + model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs. + input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object. + output (Optional[dict]): The completion generated by the model. Can be any string or JSON object. 
+ usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse. + prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation. + **kwargs: Additional keyword arguments to include in the generation. + + Returns: + StatefulGenerationClient: The created generation. Use this client to update the generation or create additional nested observations. + + Example: + ```python + from langfuse import Langfuse + + langfuse = Langfuse() + + # Create a trace + trace = langfuse.trace(name = "llm-feature") + + # Create a nested generation in Langfuse + generation = trace.generation( + name="summary-generation", + model="gpt-3.5-turbo", + model_parameters={"maxTokens": "1000", "temperature": "0.9"}, + input=[{"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Please generate a summary of the following documents ..."}], + metadata={"interface": "whatsapp"} + ) + ``` + """ generation_id = id or str(uuid.uuid4()) try: generation_body = { @@ -896,6 +1365,39 @@ def span( version: typing.Optional[str] = None, **kwargs, ) -> "StatefulSpanClient": + """Create a span nested within the current observation or trace. + + A span represents durations of units of work in a trace. + + Args: + id (Optional[str]): The id of the span can be set, otherwise a random id is generated. Spans are upserted on id. + name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI. + start_time (Optional[datetime]): The time at which the span started, defaults to the current time. + end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`. + metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API. + level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. + status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event. + input (Optional[dict]): The input to the span. Can be any JSON object. + output (Optional[dict]): The output to the span. Can be any JSON object. + version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging. + **kwargs: Additional keyword arguments to include in the span. + + Returns: + StatefulSpanClient: The created span. Use this client to update the span or create additional nested observations. + + Example: + ```python + from langfuse import Langfuse + + langfuse = Langfuse() + + # Create a trace + trace = langfuse.trace(name = "llm-feature") + + # Create a span + retrieval = langfuse.span(name = "retrieval") + ``` + """ span_id = id or str(uuid.uuid4()) try: span_body = { @@ -946,6 +1448,35 @@ def score( comment: typing.Optional[str] = None, **kwargs, ) -> "StatefulClient": + """Create a score attached for the current observation or trace. 
+ + Args: + name (str): Identifier of the score. + value (float): The value of the score. Can be any number, often standardized to 0..1 + comment (Optional[str]): Additional context/explanation of the score. + id (Optional[str]): The id of the score. If not provided, a new UUID is generated. + **kwargs: Additional keyword arguments to include in the score. + + Returns: + StatefulClient: The current observation or trace for which the score was created. Passthrough for chaining. + + Example: + ```python + from langfuse import Langfuse + + langfuse = Langfuse() + + # Create a trace + trace = langfuse.trace(name="example-application") + + # Add score to the trace + trace = trace.score( + name="user-explicit-feedback", + value=1, + comment="I like how personalized the response is" + ) + ``` + """ score_id = id or str(uuid.uuid4()) try: new_score = { @@ -999,6 +1530,38 @@ def event( version: typing.Optional[str] = None, **kwargs, ) -> "StatefulClient": + """Create an event nested within the current observation or trace. + + An event represents a discrete event in a trace. + + Args: + id (Optional[str]): The id of the event can be set, otherwise a random id is generated. + name (Optional[str]): Identifier of the event. Useful for sorting/filtering in the UI. + start_time (Optional[datetime]): The time at which the event started, defaults to the current time. + metadata (Optional[Any]): Additional metadata of the event. Can be any JSON object. Metadata is merged when being updated via the API. + input (Optional[Any]): The input to the event. Can be any JSON object. + output (Optional[Any]): The output to the event. Can be any JSON object. + level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the event. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. + status_message (Optional[str]): The status message of the event. Additional field for context of the event. E.g. the error message of an error event. + version (Optional[str]): The version of the event type. Used to understand how changes to the event type affect metrics. Useful in debugging. + **kwargs: Additional keyword arguments to include in the event. + + Returns: + StatefulSpanClient: The created event. Use this client to update the event or create additional nested observations. + + Example: + ```python + from langfuse import Langfuse + + langfuse = Langfuse() + + # Create a trace + trace = langfuse.trace(name = "llm-feature") + + # Create an event + retrieval = trace.event(name = "retrieval") + ``` + """ event_id = id or str(uuid.uuid4()) try: event_body = { @@ -1036,10 +1599,24 @@ def event( ) def get_trace_url(self): + """Get the URL to see the current trace in the Langfuse UI.""" return f"{self.client._client_wrapper._base_url}/trace/{self.trace_id}" class StatefulGenerationClient(StatefulClient): + """Class for handling stateful operations of generations in the Langfuse system. Inherits from StatefulClient. + + This client extends the capabilities of the StatefulClient to specifically handle generation, + allowing for the creation, update, and termination of generation processes in Langfuse. + + Attributes: + client (FernLangfuse): Core interface for Langfuse API interaction. + id (str): Unique identifier of the generation. + state_type (StateType): Type of the stateful entity (observation or trace). + trace_id (str): Id of trace associated with the generation. 
+ task_manager (TaskManager): Manager for handling asynchronous tasks. + """ + log = logging.getLogger("langfuse") def __init__( @@ -1050,6 +1627,7 @@ def __init__( trace_id: str, task_manager: TaskManager, ): + """Initialize the StatefulGenerationClient.""" super().__init__(client, id, state_type, trace_id, task_manager) # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY @@ -1059,11 +1637,11 @@ def update( name: typing.Optional[str] = None, start_time: typing.Optional[dt.datetime] = None, end_time: typing.Optional[dt.datetime] = None, + completion_start_time: typing.Optional[dt.datetime] = None, metadata: typing.Optional[typing.Any] = None, level: typing.Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]] = None, status_message: typing.Optional[str] = None, version: typing.Optional[str] = None, - completion_start_time: typing.Optional[dt.datetime] = None, model: typing.Optional[str] = None, model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None, input: typing.Optional[typing.Any] = None, @@ -1072,6 +1650,44 @@ def update( prompt: typing.Optional[PromptClient] = None, **kwargs, ) -> "StatefulGenerationClient": + """Update the generation. + + Args: + name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI. + start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time. + end_time (Optional[datetime.datetime]): The time at which the generation ended. Automatically set by `generation.end()`. + completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration. + metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API. + level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. + status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event. + version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging. + model (Optional[str]): The name of the model used for the generation. + model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs. + input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object. + output (Optional[dict]): The completion generated by the model. Can be any string or JSON object. + usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse. + prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation. + **kwargs: Additional keyword arguments to include in the generation. + + Returns: + StatefulGenerationClient: The updated generation. Passthrough for chaining. 
+ + Example: + ```python + from langfuse import Langfuse + + langfuse = Langfuse() + + # Create a trace + trace = langfuse.trace(name = "llm-feature") + + # Create a nested generation in Langfuse + generation = trace.generation(name="summary-generation") + + # Update the generation + generation = generation.update(metadata={"interface": "whatsapp"}) + ``` + """ try: generation_body = { "id": self.id, @@ -1122,11 +1738,11 @@ def end( name: typing.Optional[str] = None, start_time: typing.Optional[dt.datetime] = None, end_time: typing.Optional[dt.datetime] = None, + completion_start_time: typing.Optional[dt.datetime] = None, metadata: typing.Optional[typing.Any] = None, level: typing.Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]] = None, status_message: typing.Optional[str] = None, version: typing.Optional[str] = None, - completion_start_time: typing.Optional[dt.datetime] = None, model: typing.Optional[str] = None, model_parameters: typing.Optional[typing.Dict[str, MapValue]] = None, input: typing.Optional[typing.Any] = None, @@ -1135,6 +1751,44 @@ def end( prompt: typing.Optional[PromptClient] = None, **kwargs, ) -> "StatefulGenerationClient": + """End the generation, optionally updating its properties. + + Args: + name (Optional[str]): Identifier of the generation. Useful for sorting/filtering in the UI. + start_time (Optional[datetime.datetime]): The time at which the generation started, defaults to the current time. + end_time (Optional[datetime.datetime]): Automatically set to the current time. Can be overridden to set a custom end time. + completion_start_time (Optional[datetime.datetime]): The time at which the completion started (streaming). Set it to get latency analytics broken down into time until completion started and completion duration. + metadata (Optional[dict]): Additional metadata of the generation. Can be any JSON object. Metadata is merged when being updated via the API. + level (Optional[str]): The level of the generation. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. + status_message (Optional[str]): The status message of the generation. Additional field for context of the event. E.g. the error message of an error event. + version (Optional[str]): The version of the generation type. Used to understand how changes to the span type affect metrics. Useful in debugging. + model (Optional[str]): The name of the model used for the generation. + model_parameters (Optional[dict]): The parameters of the model used for the generation; can be any key-value pairs. + input (Optional[dict]): The prompt used for the generation. Can be any string or JSON object. + output (Optional[dict]): The completion generated by the model. Can be any string or JSON object. + usage (Optional[dict]): The usage object supports the OpenAi structure with {`promptTokens`, `completionTokens`, `totalTokens`} and a more generic version {`input`, `output`, `total`, `unit`, `inputCost`, `outputCost`, `totalCost`} where unit can be of value `"TOKENS"`, `"CHARACTERS"`, `"MILLISECONDS"`, `"SECONDS"`, or `"IMAGES"`. Refer to the docs on how to [automatically infer](https://langfuse.com/docs/model-usage-and-cost) token usage and costs in Langfuse. + prompt (Optional[PromptClient]): The Langfuse prompt object used for the generation. + **kwargs: Additional keyword arguments to include in the generation. + + Returns: + StatefulGenerationClient: The ended generation. Passthrough for chaining. 
+ + Example: + ```python + from langfuse import Langfuse + + langfuse = Langfuse() + + # Create a trace + trace = langfuse.trace(name = "llm-feature") + + # Create a nested generation in Langfuse + generation = trace.generation(name="summary-generation") + + # End the generation and update its properties + generation = generation.end(metadata={"interface": "whatsapp"}) + ``` + """ return self.update( name=name, start_time=start_time, @@ -1155,6 +1809,16 @@ def end( class StatefulSpanClient(StatefulClient): + """Class for handling stateful operations of spans in the Langfuse system. Inherits from StatefulClient. + + Attributes: + client (FernLangfuse): Core interface for Langfuse API interaction. + id (str): Unique identifier of the span. + state_type (StateType): Type of the stateful entity (observation or trace). + trace_id (str): Id of trace associated with the span. + task_manager (TaskManager): Manager for handling asynchronous tasks. + """ + log = logging.getLogger("langfuse") def __init__( @@ -1165,6 +1829,7 @@ def __init__( trace_id: str, task_manager: TaskManager, ): + """Initialize the StatefulSpanClient.""" super().__init__(client, id, state_type, trace_id, task_manager) # WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY @@ -1182,6 +1847,39 @@ def update( version: typing.Optional[str] = None, **kwargs, ) -> "StatefulSpanClient": + """Update the span. + + Args: + name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI. + start_time (Optional[datetime]): The time at which the span started, defaults to the current time. + end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`. + metadata (Optional[dict]): Additional metadata of the span. Can be any JSON object. Metadata is merged when being updated via the API. + level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. + status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event. + input (Optional[dict]): The input to the span. Can be any JSON object. + output (Optional[dict]): The output to the span. Can be any JSON object. + version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging. + **kwargs: Additional keyword arguments to include in the span. + + Returns: + StatefulSpanClient: The updated span. Passthrough for chaining. + + Example: + ```python + from langfuse import Langfuse + + langfuse = Langfuse() + + # Create a trace + trace = langfuse.trace(name = "llm-feature") + + # Create a nested span in Langfuse + span = trace.span(name="retrieval") + + # Update the span + span = span.update(metadata={"interface": "whatsapp"}) + ``` + """ try: span_body = { "id": self.id, @@ -1232,6 +1930,39 @@ def end( version: typing.Optional[str] = None, **kwargs, ) -> "StatefulSpanClient": + """End the span, optionally updating its properties. + + Args: + name (Optional[str]): Identifier of the span. Useful for sorting/filtering in the UI. + start_time (Optional[datetime]): The time at which the span started, defaults to the current time. + end_time (Optional[datetime]): The time at which the span ended. Automatically set by `span.end()`. + metadata (Optional[dict]): Additional metadata of the span. 
Can be any JSON object. Metadata is merged when being updated via the API. + level (Optional[Literal["DEBUG", "DEFAULT", "WARNING", "ERROR"]]): The level of the span. Can be `DEBUG`, `DEFAULT`, `WARNING` or `ERROR`. Used for sorting/filtering of traces with elevated error levels and for highlighting in the UI. + status_message (Optional[str]): The status message of the span. Additional field for context of the event. E.g. the error message of an error event. + input (Optional[dict]): The input to the span. Can be any JSON object. + output (Optional[dict]): The output to the span. Can be any JSON object. + version (Optional[str]): The version of the span type. Used to understand how changes to the span type affect metrics. Useful in debugging. + **kwargs: Additional keyword arguments to include in the span. + + Returns: + StatefulSpanClient: The updated span. Passthrough for chaining. + + Example: + ```python + from langfuse import Langfuse + + langfuse = Langfuse() + + # Create a trace + trace = langfuse.trace(name = "llm-feature") + + # Create a nested span in Langfuse + span = trace.span(name="retrieval") + + # End the span and update its properties + span = span.end(metadata={"interface": "whatsapp"}) + ``` + """ try: span_body = { "name": name, @@ -1259,6 +1990,11 @@ def end( ) def get_langchain_handler(self): + """Get langchain callback handler associated with the current span. + + Returns: + CallbackHandler: An instance of CallbackHandler linked to this StatefulSpanClient. + """ from langfuse.callback import CallbackHandler return CallbackHandler(stateful_client=self) @@ -1270,6 +2006,16 @@ def get_llama_index_handler(self): class StatefulTraceClient(StatefulClient): + """Class for handling stateful operations of traces in the Langfuse system. Inherits from StatefulClient. + + Attributes: + client (FernLangfuse): Core interface for Langfuse API interaction. + id (str): Unique identifier of the trace. + state_type (StateType): Type of the stateful entity (observation or trace). + trace_id (str): The trace ID associated with this client. + task_manager (TaskManager): Manager for handling asynchronous tasks. + """ + log = logging.getLogger("langfuse") def __init__( @@ -1280,6 +2026,7 @@ def __init__( trace_id: str, task_manager: TaskManager, ): + """Initialize the StatefulTraceClient.""" super().__init__(client, id, state_type, trace_id, task_manager) self.task_manager = task_manager @@ -1290,6 +2037,7 @@ def update( user_id: typing.Optional[str] = None, session_id: typing.Optional[str] = None, version: typing.Optional[str] = None, + release: typing.Optional[str] = None, input: typing.Optional[typing.Any] = None, output: typing.Optional[typing.Any] = None, metadata: typing.Optional[typing.Any] = None, @@ -1297,6 +2045,43 @@ def update( public: typing.Optional[bool] = None, **kwargs, ) -> "StatefulTraceClient": + """Update the trace. + + Args: + name: Identifier of the trace. Useful for sorting/filtering in the UI. + input: The input of the trace. Can be any JSON object. + output: The output of the trace. Can be any JSON object. + metadata: Additional metadata of the trace. Can be any JSON object. Metadata is merged when being updated via the API. + user_id: The id of the user that triggered the execution. Used to provide user-level analytics. + session_id: Used to group multiple traces into a session in Langfuse. Use your own session/thread identifier. + version: The version of the trace type. Used to understand how changes to the trace type affect metrics. Useful in debugging. 
+            release: The release identifier of the current deployment. Used to understand how changes of different deployments affect metrics. Useful in debugging.
+            tags: Tags are used to categorize or label traces. Traces can be filtered by tags in the UI and GET API. Tags can also be changed in the UI. Tags are merged and never deleted via the API.
+            public: You can make a trace `public` to share it via a public link. This allows others to view the trace without needing to log in or be members of your Langfuse project.
+            **kwargs: Additional keyword arguments that can be included in the trace.
+
+        Returns:
+            StatefulTraceClient: The updated trace. Passthrough for chaining.
+
+        Example:
+            ```python
+            from langfuse import Langfuse
+
+            langfuse = Langfuse()
+
+            # Create a trace
+            trace = langfuse.trace(
+                name="example-application",
+                user_id="user-1234"
+            )
+
+            # Update the trace
+            trace = trace.update(
+                output={"result": "success"},
+                metadata={"interface": "whatsapp"}
+            )
+            ```
+        """
         try:
             trace_body = {
                 "id": self.id,
                 "name": name,
                 "userId": user_id,
                 "sessionId": session_id
                 or kwargs.get("sessionId", None),  # backward compatibility
                 "version": version,
+                "release": release,
                 "input": input,
                 "output": output,
                 "metadata": metadata,
@@ -1336,6 +2122,30 @@ def update(
         )
 
     def get_langchain_handler(self):
+        """Get langchain callback handler associated with the current trace.
+
+        This method creates and returns a CallbackHandler instance, linking it with the current
+        trace. Use this if you want to group multiple Langchain runs within a single trace.
+
+        Raises:
+            ImportError: If the 'langchain' module is not installed, indicating missing functionality.
+
+        Returns:
+            CallbackHandler: Langchain callback handler linked to the current trace.
+
+        Example:
+            ```python
+            from langfuse import Langfuse
+
+            langfuse = Langfuse()
+
+            # Create a trace
+            trace = langfuse.trace(name = "llm-feature")
+
+            # Get a langchain callback handler
+            handler = trace.get_langchain_handler()
+            ```
+        """
         try:
             from langfuse.callback import CallbackHandler
 
@@ -1348,10 +2158,44 @@ def getNewHandler(self):
             self.log.exception(e)
 
     def getNewHandler(self):
+        """Alias for the `get_langchain_handler` method. Retrieves a callback handler for the trace. Deprecated."""
         return self.get_langchain_handler()
 
 
 class DatasetItemClient:
+    """Class for managing dataset items in Langfuse.
+
+    Attributes:
+        id (str): Unique identifier of the dataset item.
+        status (DatasetStatus): The status of the dataset item. Can be either 'ACTIVE' or 'ARCHIVED'.
+        input (Any): Input data of the dataset item.
+        expected_output (Optional[Any]): Expected output of the dataset item.
+        source_observation_id (Optional[str]): Identifier of the source observation.
+        dataset_id (str): Identifier of the dataset to which this item belongs.
+        created_at (datetime): Timestamp of dataset item creation.
+        updated_at (datetime): Timestamp of the last update to the dataset item.
+        langfuse (Langfuse): Instance of Langfuse client for API interactions.
+
+    Example:
+        ```python
+        from langfuse import Langfuse
+
+        langfuse = Langfuse()
+
+        dataset = langfuse.get_dataset("<dataset_name>")
+
+        for item in dataset.items:
+            # Generate a completion using the input of every item
+            completion, generation = llm_app.run(item.input)
+
+            # Evaluate the completion
+            generation.score(
+                name="example-score",
+                value=1
+            )
+        ```
+    """
+
     id: str
     status: DatasetStatus
     input: typing.Any
@@ -1364,6 +2208,7 @@ class DatasetItemClient:
     langfuse: Langfuse
 
     def __init__(self, dataset_item: DatasetItem, langfuse: Langfuse):
+        """Initialize the DatasetItemClient."""
         self.id = dataset_item.id
         self.status = dataset_item.status
         self.input = dataset_item.input
@@ -1376,11 +2221,23 @@ def __init__(self, dataset_item: DatasetItem, langfuse: Langfuse):
         self.langfuse = langfuse
 
     def flush(self, observation: StatefulClient, run_name: str):
-        # flush the queue before creating the dataset run item
-        # to ensure that all events are persistet.
+        """Flushes the observation's task manager queue.
+
+        Used before creating a dataset run item to ensure all events are persisted.
+
+        Args:
+            observation (StatefulClient): The observation client associated with the dataset item.
+            run_name (str): The name of the dataset run.
+        """
         observation.task_manager.flush()
 
     def link(self, observation: typing.Union[StatefulClient, str], run_name: str):
+        """Link the dataset item to an observation within a specific dataset run. Creates a dataset run item.
+
+        Args:
+            observation (Union[StatefulClient, str]): The observation to link, either as a client or as an ID.
+            run_name (str): The name of the dataset run.
+        """
         observation_id = None
 
         if isinstance(observation, StatefulClient):
@@ -1406,6 +2263,16 @@ def link(self, observation: typing.Union[StatefulClient, str], run_name: str):
         )
 
     def get_langchain_handler(self, *, run_name: str):
+        """Create and get a Langchain callback handler linked to this dataset item.
+
+        Creates a trace and a span linked to that trace, and returns a Langchain CallbackHandler bound to the span.
+
+        Args:
+            run_name (str): The name of the dataset run to be used in the callback handler.
+
+        Returns:
+            CallbackHandler: An instance of CallbackHandler linked to the created span.
+        """
         from langfuse.callback import CallbackHandler
 
         metadata = {
@@ -1424,6 +2291,32 @@ def get_langchain_handler(self, *, run_name: str):
 
 
 class DatasetClient:
+    """Class for managing datasets in Langfuse.
+
+    Attributes:
+        id (str): Unique identifier of the dataset.
+        name (str): Name of the dataset.
+        project_id (str): Identifier of the project to which the dataset belongs.
+        dataset_name (str): Name of the dataset.
+        created_at (datetime): Timestamp of dataset creation.
+        updated_at (datetime): Timestamp of the last update to the dataset.
+        items (List[DatasetItemClient]): List of dataset items associated with the dataset.
+        runs (List[str]): List of dataset runs associated with the dataset.
+
+    Example:
+        Print the input of each dataset item in a dataset.
+        ```python
+        from langfuse import Langfuse
+
+        langfuse = Langfuse()
+
+        dataset = langfuse.get_dataset("<dataset_name>")
+
+        for item in dataset.items:
+            print(item.input)
+        ```
+    """
+
     id: str
     name: str
     project_id: str
@@ -1434,6 +2327,7 @@ class DatasetClient:
     runs: typing.List[str]
 
     def __init__(self, dataset: Dataset, items: typing.List[DatasetItemClient]):
+        """Initialize the DatasetClient."""
         self.id = dataset.id
         self.name = dataset.name
         self.project_id = dataset.project_id
diff --git a/poetry.lock b/poetry.lock
index d60a1b23b..82ee6d5a4 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -186,6 +186,21 @@ typing-extensions = {version = ">=4", markers = "python_version < \"3.11\""}
 [package.extras]
 tests = ["mypy (>=0.800)", "pytest", "pytest-asyncio"]
 
+[[package]]
+name = "astunparse"
+version = "1.6.3"
+description = "An AST unparser for Python"
+optional = false
+python-versions = "*"
+files = [
+    {file = "astunparse-1.6.3-py2.py3-none-any.whl", hash = "sha256:c2652417f2c8b5bb325c885ae329bdf3f86424075c4fd1a128674bc6fba4b8e8"},
+    {file = "astunparse-1.6.3.tar.gz", hash = "sha256:5ad93a8456f0d084c3456d059fd9a92cce667963232cbf763eac3bc5b7940872"},
+]
+
+[package.dependencies]
+six = ">=1.6.1,<2.0"
+wheel = ">=0.23.0,<1.0"
+
 [[package]]
 name = "async-timeout"
 version = "4.0.3"
@@ -322,8 +337,8 @@ files = [
 jmespath = ">=0.7.1,<2.0.0"
 python-dateutil = ">=2.1,<3.0.0"
 urllib3 = [
-    {version = ">=1.25.4,<1.27", markers = "python_version < \"3.10\""},
     {version = ">=1.25.4,<2.1", markers = "python_version >= \"3.10\""},
+    {version = ">=1.25.4,<1.27", markers = "python_version < \"3.10\""},
 ]
 
 [package.extras]
@@ -1022,12 +1037,12 @@ files = [
 google-auth = ">=2.14.1,<3.0.dev0"
 googleapis-common-protos = ">=1.56.2,<2.0.dev0"
 grpcio = [
-    {version = ">=1.33.2,<2.0dev", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""},
     {version = ">=1.49.1,<2.0dev", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
+    {version = ">=1.33.2,<2.0dev", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""},
 ]
 grpcio-status = [
-    {version = ">=1.33.2,<2.0.dev0", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""},
     {version = ">=1.49.1,<2.0.dev0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
+    {version = ">=1.33.2,<2.0.dev0", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""},
 ]
 protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<5.0.0.dev0"
 requests = ">=2.18.0,<3.0.0.dev0"
@@ -1720,6 +1735,23 @@ files = [
     {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"},
 ]
 
+[[package]]
+name = "jinja2"
+version = "3.1.3"
+description = "A very fast and expressive template engine."
+optional = false +python-versions = ">=3.7" +files = [ + {file = "Jinja2-3.1.3-py3-none-any.whl", hash = "sha256:7d6d50dd97d52cbc355597bd845fabfbac3f551e1f99619e39a35ce8c370b5fa"}, + {file = "Jinja2-3.1.3.tar.gz", hash = "sha256:ac8bd6544d4bb2c9792bf3a159e80bba8fda7f07e81bc3aed565432d5925ba90"}, +] + +[package.dependencies] +MarkupSafe = ">=2.0" + +[package.extras] +i18n = ["Babel (>=2.7)"] + [[package]] name = "jmespath" version = "1.0.1" @@ -3094,8 +3126,8 @@ files = [ [package.dependencies] numpy = [ - {version = ">=1.20.3", markers = "python_version < \"3.10\""}, {version = ">=1.23.2", markers = "python_version >= \"3.11\""}, + {version = ">=1.20.3", markers = "python_version < \"3.10\""}, {version = ">=1.21.0", markers = "python_version >= \"3.10\" and python_version < \"3.11\""}, ] python-dateutil = ">=2.8.2" @@ -3125,6 +3157,26 @@ sql-other = ["SQLAlchemy (>=1.4.16)"] test = ["hypothesis (>=6.34.2)", "pytest (>=7.3.2)", "pytest-asyncio (>=0.17.0)", "pytest-xdist (>=2.2.0)"] xml = ["lxml (>=4.6.3)"] +[[package]] +name = "pdoc" +version = "14.4.0" +description = "API Documentation for Python Projects" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pdoc-14.4.0-py3-none-any.whl", hash = "sha256:6ea4fe07620b1f7601e2708a307a257636ec206e20b5611640b30f2e3cab47d6"}, + {file = "pdoc-14.4.0.tar.gz", hash = "sha256:c92edc425429ccbe287ace2a027953c24f13de53eab484c1a6d31ca72dd2fda9"}, +] + +[package.dependencies] +astunparse = {version = "*", markers = "python_version < \"3.9\""} +Jinja2 = ">=2.11.0" +MarkupSafe = "*" +pygments = ">=2.12.0" + +[package.extras] +dev = ["hypothesis", "mypy", "pdoc-pyo3-sample-library (==1.0.11)", "pygments (>=2.14.0)", "pytest", "pytest-cov", "pytest-timeout", "ruff", "tox", "types-pygments"] + [[package]] name = "pillow" version = "10.2.0" @@ -3500,6 +3552,21 @@ files = [ [package.dependencies] typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0" +[[package]] +name = "pygments" +version = "2.17.2" +description = "Pygments is a syntax highlighting package written in Python." 
+optional = false +python-versions = ">=3.7" +files = [ + {file = "pygments-2.17.2-py3-none-any.whl", hash = "sha256:b27c2826c47d0f3219f29554824c30c5e8945175d888647acd804ddd04af846c"}, + {file = "pygments-2.17.2.tar.gz", hash = "sha256:da46cec9fd2de5be3a8a784f434e4c4ab670b4ff54d605c4c2717e9d49c4c367"}, +] + +[package.extras] +plugins = ["importlib-metadata"] +windows-terminal = ["colorama (>=0.4.6)"] + [[package]] name = "pymongo" version = "4.6.2" @@ -4997,6 +5064,20 @@ MarkupSafe = ">=2.1.1" [package.extras] watchdog = ["watchdog (>=2.3)"] +[[package]] +name = "wheel" +version = "0.42.0" +description = "A built-package format for Python" +optional = false +python-versions = ">=3.7" +files = [ + {file = "wheel-0.42.0-py3-none-any.whl", hash = "sha256:177f9c9b0d45c47873b619f5b650346d632cdc35fb5e4d25058e09c9e581433d"}, + {file = "wheel-0.42.0.tar.gz", hash = "sha256:c45be39f7882c9d34243236f2d63cbd58039e360f85d0913425fbd7ceea617a8"}, +] + +[package.extras] +test = ["pytest (>=6.0.0)", "setuptools (>=65)"] + [[package]] name = "wrapt" version = "1.14.0" @@ -5195,4 +5276,4 @@ llama-index = ["llama-index"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<4.0" -content-hash = "b3d0df9e70b3374cea0a4de6e3e802933048db0aa5ff8e8178fca3c8177baeb1" +content-hash = "b3127d21ba989a55851a4ba1caaeb312883e86371096e27db46a9b12aca4aba6" diff --git a/pyproject.toml b/pyproject.toml index e69925f03..597129d3e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,9 @@ pymongo = "^4.6.1" llama-index-llms-anthropic = "^0.1.1" bson = "^0.5.10" +[tool.poetry.group.docs.dependencies] +pdoc = "^14.4.0" + [tool.poetry.extras] langchain = ["langchain"] llama-index = ["llama-index"] @@ -53,9 +56,6 @@ llama-index = ["llama-index"] requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" -[tool.ruff] -target-version = 'py38' - [tool.pytest.ini_options] log_cli = true diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 000000000..1a09b7ffd --- /dev/null +++ b/ruff.toml @@ -0,0 +1,15 @@ +# This is the Ruff config used locally. +# In CI, ci.ruff.toml is used instead. + +target-version = 'py38' + +[lint] +select = [ + # Enforce the use of Google-style docstrings. + "D", + # Augment the convention by requiring an imperative mood for all docstrings. + "D401", +] + +[lint.pydocstyle] +convention = "google" \ No newline at end of file
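
Note on the lint configuration above: `select = ["D", "D401"]` together with `convention = "google"` enforces Google-style docstring sections plus an imperative-mood summary line (pydocstyle rule D401). As a rough illustration only, here is a minimal, hypothetical helper (not part of the SDK) whose docstring passes this configuration:

```python
def fetch_trace(trace_id: str) -> dict:
    """Fetch a trace by its identifier.

    The summary line above is imperative ("Fetch", not "Fetches"), which
    satisfies D401; the sections below follow the Google convention
    configured under [lint.pydocstyle].

    Args:
        trace_id: Unique identifier of the trace to fetch.

    Returns:
        dict: The trace payload.
    """
    raise NotImplementedError  # hypothetical example, for illustration only
```

Locally, `ruff check .` picks up this `ruff.toml`; in CI, the workflow passes `--config ci.ruff.toml`, which only pins the target version and therefore skips the docstring rules.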