Skip to content

Commit

Permalink
Add manual spans for streaming queries
Browse files Browse the repository at this point in the history
Add manual spans to clarify where time is being used during streaming queries
  • Loading branch information
dhirving committed Nov 19, 2024
1 parent 632e7cd commit 2cff98f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 7 deletions.
40 changes: 36 additions & 4 deletions python/lsst/daf/butler/remote_butler/server/_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,55 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Any
from collections.abc import Iterator
from contextlib import AbstractContextManager, contextmanager
from typing import Any, Protocol

try:
import sentry_sdk
except ImportError:
sentry_sdk = None


class TelemetryContext(Protocol):
def span(self, name: str) -> AbstractContextManager[None]: ...


class NullTelemetryContext(TelemetryContext):
@contextmanager
def span(self, name: str) -> Iterator[None]:
yield


class SentryTelemetryContext(TelemetryContext):
@contextmanager
def span(self, name: str) -> Iterator[None]:
with sentry_sdk.start_span(name=name):
yield

Check warning on line 52 in python/lsst/daf/butler/remote_butler/server/_telemetry.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/remote_butler/server/_telemetry.py#L51-L52

Added lines #L51 - L52 were not covered by tests


_telemetry_context: TelemetryContext = NullTelemetryContext()


def enable_telemetry() -> None:
"""Turn on upload of trace telemetry to Sentry, to allow performance
debugging of deployed server.
"""
try:
import sentry_sdk
except ImportError:
if sentry_sdk is None:
return

# Configuration will be pulled from SENTRY_* environment variables
# (see https://docs.sentry.io/platforms/python/configuration/options/).
# If SENTRY_DSN is not present, telemetry is disabled.
sentry_sdk.init(enable_tracing=True, traces_sampler=_decide_whether_to_sample_trace)

Check warning on line 68 in python/lsst/daf/butler/remote_butler/server/_telemetry.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/remote_butler/server/_telemetry.py#L68

Added line #L68 was not covered by tests

global _telemetry_context
_telemetry_context = SentryTelemetryContext()

Check warning on line 71 in python/lsst/daf/butler/remote_butler/server/_telemetry.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/remote_butler/server/_telemetry.py#L71

Added line #L71 was not covered by tests


def get_telemetry_context() -> TelemetryContext:
return _telemetry_context


def _decide_whether_to_sample_trace(context: dict[str, Any]) -> float:
asgi_scope = context.get("asgi_scope")

Check warning on line 79 in python/lsst/daf/butler/remote_butler/server/_telemetry.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/remote_butler/server/_telemetry.py#L79

Added line #L79 was not covered by tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

from ...._exceptions import ButlerUserError
from ..._errors import serialize_butler_user_error
from .._telemetry import get_telemetry_context

# Alias this function so we can mock it during unit tests.
_timeout = asyncio.timeout
Expand Down Expand Up @@ -135,9 +136,13 @@ async def _enqueue_query_pages(
queue. Send `None` to the queue when there is no more data to read.
"""
try:
async with contextmanager_in_threadpool(query.setup()) as ctx:
async for page in iterate_in_threadpool(query.execute(ctx)):
await queue.put(page)
telemetry = get_telemetry_context()
with telemetry.span("Execute query"):
async with contextmanager_in_threadpool(query.setup()) as ctx:
with telemetry.span("Read from DB and send results"):
async for page in iterate_in_threadpool(query.execute(ctx)):
with telemetry.span("Wait for caller to read results"):
await queue.put(page)
except ButlerUserError as e:
# If a user-facing error occurs, serialize it and send it to the
# client.
Expand Down

0 comments on commit 2cff98f

Please sign in to comment.