Skip to content

Commit

Permalink
feat: Explicit OTEL protocol override (#6067)
Browse files Browse the repository at this point in the history
* Explicitly allow a "protocol" override

* Wire up protocol args

* Wire up override processors

* Use documented OLTP transport protocol names

* Add type check to `_missing_` method

* Update README
  • Loading branch information
anticorrelator authored Jan 22, 2025
1 parent 9f28551 commit 04264a0
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 15 deletions.
10 changes: 10 additions & 0 deletions packages/phoenix-otel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ from phoenix.otel import register
tracer_provider = register(endpoint="http://localhost:6006/v1/traces")
```

Additionally, the `protocol` argument can be used to enforce the OTLP transport protocol
regardless of the endpoint specified. This might be useful in cases such as when the GRPC
endpoint is bound to a different port than the default (4317). The valid protocols are:
"http/protobuf", and "grpc".

```
from phoenix.otel import register
tracer_provider = register(endpoint="http://localhost:9999", protocol="grpc")
```

### Additional configuration

`register` can be configured with different keyword arguments:
Expand Down
91 changes: 76 additions & 15 deletions packages/phoenix-otel/src/phoenix/otel/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import os
import sys
import warnings
from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union, cast
from urllib.parse import ParseResult, urlparse

from openinference.semconv.resource import ResourceAttributes as _ResourceAttributes
Expand Down Expand Up @@ -31,13 +32,39 @@
PROJECT_NAME = _ResourceAttributes.PROJECT_NAME


class OTLPTransportProtocol(str, Enum):
HTTP_PROTOBUF = "http/protobuf"
GRPC = "grpc"

@classmethod
def _missing_(cls, value: object) -> "OTLPTransportProtocol":
if not isinstance(value, str):
raise ValueError(f"Invalid protocol: {value}. Must be a string.")
if "http" in value:
raise ValueError(
(
f"Invalid protocol: {value}. Must be one of {cls._valid_protocols_str()}. "
"Did you mean 'http/protobuf'?"
)
)
else:
raise ValueError(
(f"Invalid protocol: {value}. Must one of {cls._valid_protocols_str()}.")
)

@classmethod
def _valid_protocols_str(cls) -> str:
return "[" + ", ".join([f"'{protocol.value}'" for protocol in cls]) + "]"


def register(
*,
endpoint: Optional[str] = None,
project_name: Optional[str] = None,
batch: bool = False,
set_global_tracer_provider: bool = True,
headers: Optional[Dict[str, str]] = None,
protocol: Optional[Literal["http/protobuf", "grpc"]] = None,
verbose: bool = True,
) -> _TracerProvider:
"""
Expand All @@ -60,17 +87,19 @@ def register(
global tracer provider. Defaults to True.
headers (dict, optional): Optional headers to include in the request to the collector.
If not provided, the `PHOENIX_CLIENT_HEADERS` environment variable will be used.
protocol (str, optional): The protocol to use for the collector endpoint. Must be either
"http/protobuf" or "grpc". If not provided, the protocol will be inferred.
verbose (bool): If True, configuration details will be printed to stdout.
"""

project_name = project_name or get_env_project_name()
resource = Resource.create({PROJECT_NAME: project_name})
tracer_provider = TracerProvider(resource=resource, verbose=False)
tracer_provider = TracerProvider(resource=resource, verbose=False, protocol=protocol)
span_processor: SpanProcessor
if batch:
span_processor = BatchSpanProcessor(endpoint=endpoint, headers=headers)
span_processor = BatchSpanProcessor(endpoint=endpoint, headers=headers, protocol=protocol)
else:
span_processor = SimpleSpanProcessor(endpoint=endpoint, headers=headers)
span_processor = SimpleSpanProcessor(endpoint=endpoint, headers=headers, protocol=protocol)
tracer_provider.add_span_processor(span_processor)
tracer_provider._default_processor = True

Expand Down Expand Up @@ -105,11 +134,18 @@ class TracerProvider(_TracerProvider):
used to infer which collector endpoint to use, defaults to the gRPC endpoint. When
specifying the endpoint, the transport method (HTTP or gRPC) will be inferred from the
URL.
protocol (str, optional): The protocol to use for the collector endpoint. Must be either
"http/protobuf" or "grpc". If not provided, the protocol will be inferred.
verbose (bool): If True, configuration details will be printed to stdout.
"""

def __init__(
self, *args: Any, endpoint: Optional[str] = None, verbose: bool = True, **kwargs: Any
self,
*args: Any,
endpoint: Optional[str] = None,
protocol: Optional[Literal["http/protobuf", "grpc"]] = None,
verbose: bool = True,
**kwargs: Any,
):
sig = _get_class_signature(_TracerProvider)
bound_args = sig.bind_partial(*args, **kwargs)
Expand All @@ -120,14 +156,19 @@ def __init__(
)
super().__init__(*bound_args.args, **bound_args.kwargs)

parsed_url, endpoint = _normalized_endpoint(endpoint)
validated_protocol = OTLPTransportProtocol(protocol)
use_http = validated_protocol == OTLPTransportProtocol.HTTP_PROTOBUF
parsed_url, endpoint = _normalized_endpoint(endpoint, use_http=use_http)
self._default_processor = False

if _maybe_http_endpoint(parsed_url):
if (
_maybe_http_endpoint(parsed_url)
or validated_protocol == OTLPTransportProtocol.HTTP_PROTOBUF
):
http_exporter: SpanExporter = HTTPSpanExporter(endpoint=endpoint)
self.add_span_processor(SimpleSpanProcessor(span_exporter=http_exporter))
self._default_processor = True
elif _maybe_grpc_endpoint(parsed_url):
elif _maybe_grpc_endpoint(parsed_url) or validated_protocol == OTLPTransportProtocol.GRPC:
grpc_exporter: SpanExporter = GRPCSpanExporter(endpoint=endpoint)
self.add_span_processor(SimpleSpanProcessor(span_exporter=grpc_exporter))
self._default_processor = True
Expand Down Expand Up @@ -208,19 +249,29 @@ class SimpleSpanProcessor(_SimpleSpanProcessor):
headers (dict, optional): Optional headers to include in the request to the collector.
If not provided, the `PHOENIX_CLIENT_HEADERS` or `OTEL_EXPORTER_OTLP_HEADERS`
environment variable will be used.
protocol (str, optional): The protocol to use for the collector endpoint. Must be either
"http/protobuf" or "grpc". If not provided, the protocol will be inferred.
"""

def __init__(
self,
span_exporter: Optional[SpanExporter] = None,
endpoint: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
protocol: Optional[Literal["http/protobuf", "grpc"]] = None,
):
if span_exporter is None:
parsed_url, endpoint = _normalized_endpoint(endpoint)
if _maybe_http_endpoint(parsed_url):
validated_protocol = OTLPTransportProtocol(protocol)
use_http = validated_protocol == OTLPTransportProtocol.HTTP_PROTOBUF
parsed_url, endpoint = _normalized_endpoint(endpoint, use_http=use_http)
if (
_maybe_http_endpoint(parsed_url)
or validated_protocol == OTLPTransportProtocol.HTTP_PROTOBUF
):
span_exporter = HTTPSpanExporter(endpoint=endpoint, headers=headers)
elif _maybe_grpc_endpoint(parsed_url):
elif (
_maybe_grpc_endpoint(parsed_url) or validated_protocol == OTLPTransportProtocol.GRPC
):
span_exporter = GRPCSpanExporter(endpoint=endpoint, headers=headers)
else:
warnings.warn("Could not infer collector endpoint protocol, defaulting to HTTP.")
Expand Down Expand Up @@ -253,6 +304,8 @@ class BatchSpanProcessor(_BatchSpanProcessor):
headers (dict, optional): Optional headers to include in the request to the collector.
If not provided, the `PHOENIX_CLIENT_HEADERS` or `OTEL_EXPORTER_OTLP_HEADERS`
environment variable will be used.
protocol (str, optional): The protocol to use for the collector endpoint. Must be either
"http/protobuf" or "grpc". If not provided, the protocol will be inferred.
max_queue_size (int, optional): The maximum queue size.
schedule_delay_millis (float, optional): The delay between two consecutive exports in
milliseconds.
Expand All @@ -265,12 +318,20 @@ def __init__(
span_exporter: Optional[SpanExporter] = None,
endpoint: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
protocol: Optional[Literal["http/protobuf", "grpc"]] = None,
):
if span_exporter is None:
parsed_url, endpoint = _normalized_endpoint(endpoint)
if _maybe_http_endpoint(parsed_url):
validated_protocol = OTLPTransportProtocol(protocol)
use_http = validated_protocol == OTLPTransportProtocol.HTTP_PROTOBUF
parsed_url, endpoint = _normalized_endpoint(endpoint, use_http=use_http)
if (
_maybe_http_endpoint(parsed_url)
or validated_protocol == OTLPTransportProtocol.HTTP_PROTOBUF
):
span_exporter = HTTPSpanExporter(endpoint=endpoint, headers=headers)
elif _maybe_grpc_endpoint(parsed_url):
elif (
_maybe_grpc_endpoint(parsed_url) or validated_protocol == OTLPTransportProtocol.GRPC
):
span_exporter = GRPCSpanExporter(endpoint=endpoint, headers=headers)
else:
warnings.warn("Could not infer collector endpoint protocol, defaulting to HTTP.")
Expand Down Expand Up @@ -394,7 +455,7 @@ def _maybe_grpc_endpoint(parsed_endpoint: ParseResult) -> bool:

def _exporter_transport(exporter: SpanExporter) -> str:
if isinstance(exporter, _HTTPSpanExporter):
return "HTTP"
return "HTTP + protobuf"
if isinstance(exporter, _GRPCSpanExporter):
return "gRPC"
else:
Expand Down

0 comments on commit 04264a0

Please sign in to comment.