Skip to content

Commit

Permalink
Add manual spans for streaming queries
Browse files Browse the repository at this point in the history
Add manual spans to clarify where time is being used during streaming queries
  • Loading branch information
dhirving committed Nov 19, 2024
1 parent 632e7cd commit 8a5d6e6
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 6 deletions.
40 changes: 36 additions & 4 deletions python/lsst/daf/butler/remote_butler/server/_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,56 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Any
from collections.abc import Iterator
from contextlib import AbstractContextManager, contextmanager
from typing import Any, Protocol

try:
import sentry_sdk
except ImportError:
sentry_sdk = None


class TelemetryContext(Protocol):
def span(self, name: str) -> AbstractContextManager[None]: ...


class NullTelemetryContext(TelemetryContext):
@contextmanager
def span(self, name: str) -> Iterator[None]:
yield


class SentryTelemetryContext(TelemetryContext):
@contextmanager
def span(self, name: str) -> Iterator[None]:
with sentry_sdk.start_span(name=name):
yield


_telemetry_context: TelemetryContext = NullTelemetryContext()


def enable_telemetry() -> None:
"""Turn on upload of trace telemetry to Sentry, to allow performance
debugging of deployed server.
"""
try:
import sentry_sdk
except ImportError:
if sentry_sdk is None:
return

global _telemetry_context
_telemetry_context = SentryTelemetryContext()

# Configuration will be pulled from SENTRY_* environment variables
# (see https://docs.sentry.io/platforms/python/configuration/options/).
# If SENTRY_DSN is not present, telemetry is disabled.
sentry_sdk.init(enable_tracing=True, traces_sampler=_decide_whether_to_sample_trace)


def get_telemetry_context() -> TelemetryContext:
return _telemetry_context


def _decide_whether_to_sample_trace(context: dict[str, Any]) -> float:
asgi_scope = context.get("asgi_scope")
if asgi_scope is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

from ...._exceptions import ButlerUserError
from ..._errors import serialize_butler_user_error
from .._telemetry import get_telemetry_context

# Alias this function so we can mock it during unit tests.
_timeout = asyncio.timeout
Expand Down Expand Up @@ -135,9 +136,12 @@ async def _enqueue_query_pages(
queue. Send `None` to the queue when there is no more data to read.
"""
try:
telemetry = get_telemetry_context()
async with contextmanager_in_threadpool(query.setup()) as ctx:
async for page in iterate_in_threadpool(query.execute(ctx)):
await queue.put(page)
with telemetry.span("Execute query and send results"):
async for page in iterate_in_threadpool(query.execute(ctx)):
with telemetry.span("Wait for caller to read results"):
await queue.put(page)
except ButlerUserError as e:
# If a user-facing error occurs, serialize it and send it to the
# client.
Expand Down

0 comments on commit 8a5d6e6

Please sign in to comment.