From 04264a04b18759c71bb106fc15a8f81e78122e68 Mon Sep 17 00:00:00 2001 From: Dustin Ngo Date: Wed, 22 Jan 2025 10:17:56 +0900 Subject: [PATCH] feat: Explicit OTEL protocol override (#6067) * Explicitly allow a "protocol" override * Wire up protocol args * Wire up override processors * Use documented OTLP transport protocol names * Add type check to `_missing_` method * Update README --- packages/phoenix-otel/README.md | 10 ++ .../phoenix-otel/src/phoenix/otel/otel.py | 91 ++++++++++++++++--- 2 files changed, 86 insertions(+), 15 deletions(-) diff --git a/packages/phoenix-otel/README.md b/packages/phoenix-otel/README.md index 8ff5d4eba3..e0af0cd4e2 100644 --- a/packages/phoenix-otel/README.md +++ b/packages/phoenix-otel/README.md @@ -61,6 +61,16 @@ from phoenix.otel import register tracer_provider = register(endpoint="http://localhost:6006/v1/traces") ``` +Additionally, the `protocol` argument can be used to enforce the OTLP transport protocol +regardless of the endpoint specified. This might be useful in cases such as when the gRPC +endpoint is bound to a different port than the default (4317). The valid protocols are: +"http/protobuf" and "grpc".
+ +``` +from phoenix.otel import register +tracer_provider = register(endpoint="http://localhost:9999", protocol="grpc") +``` + ### Additional configuration `register` can be configured with different keyword arguments: diff --git a/packages/phoenix-otel/src/phoenix/otel/otel.py b/packages/phoenix-otel/src/phoenix/otel/otel.py index c794e5121f..7a4407d7ab 100644 --- a/packages/phoenix-otel/src/phoenix/otel/otel.py +++ b/packages/phoenix-otel/src/phoenix/otel/otel.py @@ -2,7 +2,8 @@ import os import sys import warnings -from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast +from enum import Enum +from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union, cast from urllib.parse import ParseResult, urlparse from openinference.semconv.resource import ResourceAttributes as _ResourceAttributes @@ -31,6 +32,31 @@ PROJECT_NAME = _ResourceAttributes.PROJECT_NAME +class OTLPTransportProtocol(str, Enum): + HTTP_PROTOBUF = "http/protobuf" + GRPC = "grpc" + + @classmethod + def _missing_(cls, value: object) -> "OTLPTransportProtocol": + if not isinstance(value, str): + raise ValueError(f"Invalid protocol: {value}. Must be a string.") + if "http" in value: + raise ValueError( + ( + f"Invalid protocol: {value}. Must be one of {cls._valid_protocols_str()}. " + "Did you mean 'http/protobuf'?" + ) + ) + else: + raise ValueError( + (f"Invalid protocol: {value}. Must be one of {cls._valid_protocols_str()}.") + ) + + @classmethod + def _valid_protocols_str(cls) -> str: + return "[" + ", ".join([f"'{protocol.value}'" for protocol in cls]) + "]" + + def register( *, endpoint: Optional[str] = None, @@ -38,6 +64,7 @@ def register( batch: bool = False, set_global_tracer_provider: bool = True, headers: Optional[Dict[str, str]] = None, + protocol: Optional[Literal["http/protobuf", "grpc"]] = None, verbose: bool = True, ) -> _TracerProvider: """ @@ -60,17 +87,19 @@ def register( global tracer provider. Defaults to True.
headers (dict, optional): Optional headers to include in the request to the collector. If not provided, the `PHOENIX_CLIENT_HEADERS` environment variable will be used. + protocol (str, optional): The protocol to use for the collector endpoint. Must be either + "http/protobuf" or "grpc". If not provided, the protocol will be inferred. verbose (bool): If True, configuration details will be printed to stdout. """ project_name = project_name or get_env_project_name() resource = Resource.create({PROJECT_NAME: project_name}) - tracer_provider = TracerProvider(resource=resource, verbose=False) + tracer_provider = TracerProvider(resource=resource, verbose=False, protocol=protocol) span_processor: SpanProcessor if batch: - span_processor = BatchSpanProcessor(endpoint=endpoint, headers=headers) + span_processor = BatchSpanProcessor(endpoint=endpoint, headers=headers, protocol=protocol) else: - span_processor = SimpleSpanProcessor(endpoint=endpoint, headers=headers) + span_processor = SimpleSpanProcessor(endpoint=endpoint, headers=headers, protocol=protocol) tracer_provider.add_span_processor(span_processor) tracer_provider._default_processor = True @@ -105,11 +134,18 @@ class TracerProvider(_TracerProvider): used to infer which collector endpoint to use, defaults to the gRPC endpoint. When specifying the endpoint, the transport method (HTTP or gRPC) will be inferred from the URL. + protocol (str, optional): The protocol to use for the collector endpoint. Must be either + "http/protobuf" or "grpc". If not provided, the protocol will be inferred. verbose (bool): If True, configuration details will be printed to stdout.
""" def __init__( - self, *args: Any, endpoint: Optional[str] = None, verbose: bool = True, **kwargs: Any + self, + *args: Any, + endpoint: Optional[str] = None, + protocol: Optional[Literal["http/protobuf", "grpc"]] = None, + verbose: bool = True, + **kwargs: Any, ): sig = _get_class_signature(_TracerProvider) bound_args = sig.bind_partial(*args, **kwargs) @@ -120,14 +156,19 @@ def __init__( ) super().__init__(*bound_args.args, **bound_args.kwargs) - parsed_url, endpoint = _normalized_endpoint(endpoint) + validated_protocol = OTLPTransportProtocol(protocol) if protocol is not None else None + use_http = validated_protocol == OTLPTransportProtocol.HTTP_PROTOBUF + parsed_url, endpoint = _normalized_endpoint(endpoint, use_http=use_http) self._default_processor = False - if _maybe_http_endpoint(parsed_url): + if ( + _maybe_http_endpoint(parsed_url) + or validated_protocol == OTLPTransportProtocol.HTTP_PROTOBUF + ): http_exporter: SpanExporter = HTTPSpanExporter(endpoint=endpoint) self.add_span_processor(SimpleSpanProcessor(span_exporter=http_exporter)) self._default_processor = True - elif _maybe_grpc_endpoint(parsed_url): + elif _maybe_grpc_endpoint(parsed_url) or validated_protocol == OTLPTransportProtocol.GRPC: grpc_exporter: SpanExporter = GRPCSpanExporter(endpoint=endpoint) self.add_span_processor(SimpleSpanProcessor(span_exporter=grpc_exporter)) self._default_processor = True @@ -208,6 +249,8 @@ class SimpleSpanProcessor(_SimpleSpanProcessor): headers (dict, optional): Optional headers to include in the request to the collector. If not provided, the `PHOENIX_CLIENT_HEADERS` or `OTEL_EXPORTER_OTLP_HEADERS` environment variable will be used. + protocol (str, optional): The protocol to use for the collector endpoint. Must be either + "http/protobuf" or "grpc". If not provided, the protocol will be inferred.
""" def __init__( @@ -215,12 +258,20 @@ def __init__( span_exporter: Optional[SpanExporter] = None, endpoint: Optional[str] = None, headers: Optional[Dict[str, str]] = None, + protocol: Optional[Literal["http/protobuf", "grpc"]] = None, ): if span_exporter is None: - parsed_url, endpoint = _normalized_endpoint(endpoint) - if _maybe_http_endpoint(parsed_url): + validated_protocol = OTLPTransportProtocol(protocol) if protocol is not None else None + use_http = validated_protocol == OTLPTransportProtocol.HTTP_PROTOBUF + parsed_url, endpoint = _normalized_endpoint(endpoint, use_http=use_http) + if ( + _maybe_http_endpoint(parsed_url) + or validated_protocol == OTLPTransportProtocol.HTTP_PROTOBUF + ): span_exporter = HTTPSpanExporter(endpoint=endpoint, headers=headers) - elif _maybe_grpc_endpoint(parsed_url): + elif ( + _maybe_grpc_endpoint(parsed_url) or validated_protocol == OTLPTransportProtocol.GRPC + ): span_exporter = GRPCSpanExporter(endpoint=endpoint, headers=headers) else: warnings.warn("Could not infer collector endpoint protocol, defaulting to HTTP.") @@ -253,6 +304,8 @@ class BatchSpanProcessor(_BatchSpanProcessor): headers (dict, optional): Optional headers to include in the request to the collector. If not provided, the `PHOENIX_CLIENT_HEADERS` or `OTEL_EXPORTER_OTLP_HEADERS` environment variable will be used. + protocol (str, optional): The protocol to use for the collector endpoint. Must be either + "http/protobuf" or "grpc". If not provided, the protocol will be inferred. max_queue_size (int, optional): The maximum queue size. schedule_delay_millis (float, optional): The delay between two consecutive exports in milliseconds.
@@ -265,12 +318,20 @@ def __init__( span_exporter: Optional[SpanExporter] = None, endpoint: Optional[str] = None, headers: Optional[Dict[str, str]] = None, + protocol: Optional[Literal["http/protobuf", "grpc"]] = None, ): if span_exporter is None: - parsed_url, endpoint = _normalized_endpoint(endpoint) - if _maybe_http_endpoint(parsed_url): + validated_protocol = OTLPTransportProtocol(protocol) if protocol is not None else None + use_http = validated_protocol == OTLPTransportProtocol.HTTP_PROTOBUF + parsed_url, endpoint = _normalized_endpoint(endpoint, use_http=use_http) + if ( + _maybe_http_endpoint(parsed_url) + or validated_protocol == OTLPTransportProtocol.HTTP_PROTOBUF + ): span_exporter = HTTPSpanExporter(endpoint=endpoint, headers=headers) - elif _maybe_grpc_endpoint(parsed_url): + elif ( + _maybe_grpc_endpoint(parsed_url) or validated_protocol == OTLPTransportProtocol.GRPC + ): span_exporter = GRPCSpanExporter(endpoint=endpoint, headers=headers) else: warnings.warn("Could not infer collector endpoint protocol, defaulting to HTTP.") @@ -394,7 +455,7 @@ def _maybe_grpc_endpoint(parsed_endpoint: ParseResult) -> bool: def _exporter_transport(exporter: SpanExporter) -> str: if isinstance(exporter, _HTTPSpanExporter): - return "HTTP" + return "HTTP + protobuf" if isinstance(exporter, _GRPCSpanExporter): return "gRPC" else: