diff --git a/scripts/run_local_otel_collector.sh b/scripts/run_local_otel_collector.sh new file mode 100755 index 00000000..b5dd1d7b --- /dev/null +++ b/scripts/run_local_otel_collector.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash +set -x +set -eo pipefail + +# If an OpenTelemetry Collector container is already running, kill it. +RUNNING_OTEL_COLLECTOR_CONTAINER=$(docker ps --filter 'name=otel_collector' --format '{{.ID}}') +if [[ -n ${RUNNING_OTEL_COLLECTOR_CONTAINER} ]]; then + docker kill "${RUNNING_OTEL_COLLECTOR_CONTAINER}" +fi + +# Run the OpenTelemetry Collector in a new container. +# The `--mount` and `--env` options mount the local Default Application Credentials fetched after +# `gcloud auth login --update-adc` to ensure the OpenTelemetry Collector is able to authenticate to +# GCP from inside the Docker container. The `--user` option overrides the ID of the default user +# running the processes inside the container, to ensure `/etc/gcp/creds.json` is readable. Otherwise +# the default user of the Docker container almost certainly has a different ID than the local user on the +# Docker host, and `~/.config/gcloud/application_default_credentials.json` can only be read by the +# current local user (600 permissions = rw-------). Finally, the `OTEL_BACKEND_PROJECT_ID` environment +# variable, used in `config.yaml`, tells the OpenTelemetry Collector which GCP project to export to. +# Define it prior to running this script, for example in `.env`, next to the other variables.
+# Published ports: +# - 4317 = gRPC +# - 4318 = HTTP +docker run \ + --mount type=bind,source=${HOME}/.config/gcloud/application_default_credentials.json,target=/etc/gcp/creds.json,readonly \ + --env GOOGLE_APPLICATION_CREDENTIALS=/etc/gcp/creds.json \ + --user $(id --user) \ + --mount type=bind,source=$(pwd)/traces.yaml,target=/etc/otelcol-contrib/config.yaml,readonly \ + --env OTEL_BACKEND_PROJECT_ID \ + --publish 4317:4317 \ + --publish 4318:4318 \ + --name "otel_collector_$(date '+%s')" \ + --detach \ + otel/opentelemetry-collector-contrib + +# Configure the SLO Generator to send traces to the OpenTelemetry Collector... +export SEND_TRACES_TO_OTLP_EXPORTER=1 + +# ... and tail the OpenTelemetry Collector logs. +docker logs --follow $(docker ps --filter 'name=otel_collector' --format '{{.ID}}') diff --git a/scripts/traces.yaml b/scripts/traces.yaml new file mode 100644 index 00000000..d3d30511 --- /dev/null +++ b/scripts/traces.yaml @@ -0,0 +1,26 @@ +receivers: + otlp: + protocols: + grpc: + http: + +processors: + resourcedetection: + detectors: [env, gcp] + timeout: 2s + override: false + +exporters: + googlecloud: + project: ${env:OTEL_BACKEND_PROJECT_ID} + +extensions: + health_check: + +service: + extensions: [health_check] + pipelines: + traces: + receivers: [otlp] + processors: [resourcedetection] + exporters: [googlecloud] diff --git a/setup.cfg b/setup.cfg index 44003420..ca3881b8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -56,6 +56,9 @@ install_requires = click setuptools >=65.5.1 # https://pyup.io/v/52495/f17 (reported by `safety check`) certifi >=2023.07.22 # avoid CVE-2023-37920 (reported by `safety check`) + opentelemetry-api==1.24.0 # required to initialize OpenTelemetry + opentelemetry-sdk==1.24.0 # required to instrument the code + opentelemetry-exporter-otlp==1.24.0 # required to export to OpenTelemetry Collectors [options.packages.find] exclude = diff --git a/slo_generator/api/main.py b/slo_generator/api/main.py index 67cc8e4f..7201c892 100644 
--- a/slo_generator/api/main.py +++ b/slo_generator/api/main.py @@ -26,6 +26,7 @@ import requests from flask import jsonify, make_response +from opentelemetry import trace from slo_generator.compute import compute, export from slo_generator.utils import get_exporters, load_config, setup_logging @@ -36,7 +37,10 @@ API_SIGNATURE_TYPE = os.environ["GOOGLE_FUNCTION_SIGNATURE_TYPE"] setup_logging() +tracer = trace.get_tracer(__name__) + +@tracer.start_as_current_span("run_compute") def run_compute(request): """Run slo-generator compute function. Can be configured to export data as well, using the `exporters` key of the SLO config. @@ -79,6 +83,7 @@ def run_compute(request): return reports +@tracer.start_as_current_span("run_export") def run_export(request): """Run slo-generator export function. Get the SLO report data from a request object. @@ -142,6 +147,7 @@ def run_export(request): return errors +@tracer.start_as_current_span("process_req") def process_req(request): """Process incoming request. diff --git a/slo_generator/backends/cloud_monitoring.py b/slo_generator/backends/cloud_monitoring.py index 8997957c..186c91aa 100644 --- a/slo_generator/backends/cloud_monitoring.py +++ b/slo_generator/backends/cloud_monitoring.py @@ -22,11 +22,14 @@ from collections import OrderedDict from google.cloud import monitoring_v3 +from opentelemetry import trace from slo_generator.constants import NO_DATA LOGGER = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) + class CloudMonitoringBackend: """Backend for querying metrics from Cloud Monitoring. @@ -38,12 +41,15 @@ class CloudMonitoringBackend: if omitted. 
""" + @tracer.start_as_current_span("CloudMonitoringBackend") def __init__(self, project_id, client=None): - self.client = client - if client is None: - self.client = monitoring_v3.MetricServiceClient() + with tracer.start_as_current_span("Instantiate client"): + self.client = client + if client is None: + self.client = monitoring_v3.MetricServiceClient() self.parent = self.client.common_project_path(project_id) + @tracer.start_as_current_span("good_bad_ratio") def good_bad_ratio(self, timestamp, window, slo_config): """Query two timeseries, one containing 'good' events, one containing 'bad' events. @@ -94,8 +100,17 @@ def good_bad_ratio(self, timestamp, window, slo_config): f"Good events: {good_event_count} | " f"Bad events: {bad_event_count}" ) + trace.get_current_span().add_event( + "count", + { + "good": good_event_count, + "bad": bad_event_count, + }, + ) + return good_event_count, bad_event_count + @tracer.start_as_current_span("distribution_cut") def distribution_cut(self, timestamp, window, slo_config): """Query one timeseries of type 'exponential'. @@ -158,8 +173,16 @@ def distribution_cut(self, timestamp, window, slo_config): good_event_count = upper_events_count bad_event_count = lower_events_count + trace.get_current_span().add_event( + "count", + { + "good": good_event_count, + "bad": bad_event_count, + }, + ) return good_event_count, bad_event_count + @tracer.start_as_current_span("exponential_distribution_cut") def exponential_distribution_cut(self, *args, **kwargs): """Alias for `distribution_cut` method to allow for backwards compatibility. 
@@ -172,6 +195,7 @@ def exponential_distribution_cut(self, *args, **kwargs): ) return self.distribution_cut(*args, **kwargs) + @tracer.start_as_current_span("query") def query( # noqa: PLR0913 self, timestamp, diff --git a/slo_generator/cli.py b/slo_generator/cli.py index db84bec0..e8880aaf 100644 --- a/slo_generator/cli.py +++ b/slo_generator/cli.py @@ -23,17 +23,43 @@ from pathlib import Path import click +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor from pkg_resources import get_distribution from slo_generator import utils from slo_generator.compute import compute as _compute -from slo_generator.constants import LATEST_MAJOR_VERSION +from slo_generator.constants import LATEST_MAJOR_VERSION, SEND_TRACES_TO_OTLP_EXPORTER from slo_generator.migrations import migrator sys.path.append(os.getcwd()) # dynamic backend loading +# logging.basicConfig(level=logging.INFO) LOGGER = logging.getLogger(__name__) +# Set up Tracing with OpenTelemetry. +# TODO Refactor to `utils.setup_tracing()`, similar to `utils.setup_logging()`? +# --- +# Set the resource name that will show up in the traces. +resource = Resource( + attributes={ + "service.name": "slo-generator", + } +) +# Create a new tracer provider. +provider = TracerProvider(resource=resource) +# Batch ended spans and push them to the OTLP exporter if requested by the user. +if SEND_TRACES_TO_OTLP_EXPORTER: + processor = BatchSpanProcessor( + OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True) + ) + provider.add_span_processor(processor) +# Set the global default tracer provider. 
+trace.set_tracer_provider(provider) + @click.group(invoke_without_command=True) @click.option( diff --git a/slo_generator/compute.py b/slo_generator/compute.py index 1d489fe4..46bf03ff 100644 --- a/slo_generator/compute.py +++ b/slo_generator/compute.py @@ -21,13 +21,18 @@ import time from typing import Optional +from opentelemetry import trace + from slo_generator import constants, utils from slo_generator.migrations.migrator import report_v2tov1 from slo_generator.report import SLOReport LOGGER = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) + +@tracer.start_as_current_span("compute") def compute( # noqa: PLR0913 slo_config: dict, config: dict, @@ -95,6 +100,7 @@ def compute( # noqa: PLR0913 return reports +@tracer.start_as_current_span("export") def export(data: dict, exporters: list, raise_on_error: bool = False) -> list: """Export data using selected exporters. diff --git a/slo_generator/constants.py b/slo_generator/constants.py index 9f4d759f..6200a8e0 100644 --- a/slo_generator/constants.py +++ b/slo_generator/constants.py @@ -27,6 +27,9 @@ COLORED_OUTPUT: int = int(os.environ.get("COLORED_OUTPUT", "0")) DRY_RUN: bool = bool(int(os.environ.get("DRY_RUN", "0"))) DEBUG: int = int(os.environ.get("DEBUG", "0")) +SEND_TRACES_TO_OTLP_EXPORTER: bool = bool( + int(os.environ.get("SEND_TRACES_TO_OTLP_EXPORTER", "0")) +) # Exporters supporting v2 SLO report format V2_EXPORTERS: tuple[str, ...] 
= ("Pubsub", "Cloudevent") diff --git a/slo_generator/exporters/base.py b/slo_generator/exporters/base.py index 882be59c..4ac4970b 100644 --- a/slo_generator/exporters/base.py +++ b/slo_generator/exporters/base.py @@ -20,8 +20,12 @@ import warnings from abc import ABCMeta, abstractmethod +from opentelemetry import trace + LOGGER = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) + # Default metric labels exported by all metrics exporters DEFAULT_METRIC_LABELS = [ "error_budget_policy_step_name", @@ -67,6 +71,7 @@ class MetricsExporter: # pytype: disable=ignored-metaclass __metaclass__ = ABCMeta # pytype: disable=ignored-metaclass + @tracer.start_as_current_span("export") def export(self, data, **config): """Export metric data. Loops through metrics config and calls the child class `export_metric` method. @@ -104,6 +109,7 @@ class `export_metric` method. metric = self.build_metric(data, metric) self.export_metric(metric) + @tracer.start_as_current_span("build_metric") def build_metric(self, data, metric): """Build a metric from current data and metric configuration. Set the metric value labels and eventual alias. @@ -140,6 +146,7 @@ def build_metric(self, data, metric): metric["description"] = metric.get("description", "") return metric @staticmethod + @tracer.start_as_current_span("build_data_labels") def build_data_labels(data, labels): """Build data labels. Also handle nested labels (depth=1).
diff --git a/slo_generator/exporters/cloud_monitoring.py b/slo_generator/exporters/cloud_monitoring.py index 4c7bf1be..6da1660d 100644 --- a/slo_generator/exporters/cloud_monitoring.py +++ b/slo_generator/exporters/cloud_monitoring.py @@ -19,11 +19,14 @@ import logging from google.cloud import monitoring_v3 +from opentelemetry import trace from .base import MetricsExporter LOGGER = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) + class CloudMonitoringExporter(MetricsExporter): """Cloud Monitoring exporter class.""" @@ -31,9 +34,11 @@ class CloudMonitoringExporter(MetricsExporter): METRIC_PREFIX = "custom.googleapis.com/" REQUIRED_FIELDS = ["project_id"] + @tracer.start_as_current_span("CloudMonitoringExporter") def __init__(self): self.client = monitoring_v3.MetricServiceClient() + @tracer.start_as_current_span("export_metric") def export_metric(self, data: dict): """Export metric to Cloud Monitoring. @@ -45,6 +50,7 @@ def export_metric(self, data: dict): """ self.create_timeseries(data) + @tracer.start_as_current_span("create_timeseries") def create_timeseries(self, data: dict): """Create Cloud Monitoring timeseries. 
diff --git a/slo_generator/report.py b/slo_generator/report.py index 45dea439..d0153769 100644 --- a/slo_generator/report.py +++ b/slo_generator/report.py @@ -19,11 +19,15 @@ import logging from dataclasses import asdict, dataclass, field, fields +from opentelemetry import trace + from slo_generator import utils from slo_generator.constants import COLORED_OUTPUT, MIN_VALID_EVENTS, NO_DATA, Colors LOGGER = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) + @dataclass(init=False) class SLOReport: @@ -86,6 +90,7 @@ class SLOReport: # Data validation errors: list[str] = field(default_factory=list) + @tracer.start_as_current_span("SLOReport") def __init__(self, config, backend, step, timestamp, client=None, delete=False): # noqa: PLR0913 # Init dataclass fields from SLO config and Error Budget Policy spec = config["spec"] @@ -122,6 +127,7 @@ def __init__(self, config, backend, step, timestamp, client=None, delete=False): if not self._post_validate(): self.valid = False + @tracer.start_as_current_span("build") def build(self, step, data): """Compute all data necessary to build the SLO report. @@ -178,6 +184,7 @@ def build(self, step, data): consequence_message=consequence_message, ) + @tracer.start_as_current_span("run_backend") def run_backend(self, config, backend, client=None, delete=False): """Get appropriate backend method from SLO configuration and run it on current SLO config and Error Budget Policy step. @@ -192,23 +199,25 @@ def run_backend(self, config, backend, client=None, delete=False): Returns: obj: Backend data. """ - # Grab backend class and method dynamically. 
- cls_name = backend.get("class") - method = config["spec"]["method"] - excluded_keys = ["class", "service_level_indicator", "name"] - backend_cfg = {k: v for k, v in backend.items() if k not in excluded_keys} - cls = utils.get_backend_cls(cls_name) - if not cls: - LOGGER.warning(f"{self.info} | Backend {cls_name} not loaded.") - self.valid = False - return None - instance = cls(client=client, **backend_cfg) - method = getattr(instance, method) - LOGGER.debug( - f"{self.info} | " - f"Using backend {cls_name}.{method.__name__} (from " - f"SLO config file)." - ) + with tracer.start_as_current_span("Get backend class and method"): + # Grab backend class and method dynamically. + cls_name = backend.get("class") + method = config["spec"]["method"] + excluded_keys = ["class", "service_level_indicator", "name"] + backend_cfg = {k: v for k, v in backend.items() if k not in excluded_keys} + cls = utils.get_backend_cls(cls_name) + if not cls: + LOGGER.warning(f"{self.info} | Backend {cls_name} not loaded.") + self.valid = False + return None + with tracer.start_as_current_span("Get instance and method"): + instance = cls(client=client, **backend_cfg) + method = getattr(instance, method) + LOGGER.debug( + f"{self.info} | " + f"Using backend {cls_name}.{method.__name__} (from " + f"SLO config file)." + ) # Delete mode activation. if delete and hasattr(instance, "delete"): @@ -217,13 +226,15 @@ def run_backend(self, config, backend, client=None, delete=False): # Run backend method and return results. 
try: - data = method(self.timestamp, self.window, config) - LOGGER.debug(f"{self.info} | Backend response: {data}") + with tracer.start_as_current_span("Call method"): + data = method(self.timestamp, self.window, config) + LOGGER.debug(f"{self.info} | Backend response: {data}") except Exception as exc: self.errors.append(utils.fmt_traceback(exc)) return None return data + @tracer.start_as_current_span("get_sli") def get_sli(self, data): """Compute SLI value and good / bad counts from the backend result. @@ -268,6 +279,7 @@ def to_json(self) -> dict: } return asdict(self) + @tracer.start_as_current_span("_validate") def _validate(self, data) -> bool: # noqa: PLR0911 """Validate backend results. Invalid data will result in SLO report not being built. @@ -352,6 +364,7 @@ def _validate(self, data) -> bool: # noqa: PLR0911 return True + @tracer.start_as_current_span("_post_validate") def _post_validate(self) -> bool: """Validate report after build.""" diff --git a/slo_generator/utils.py b/slo_generator/utils.py index b88534fc..77733f56 100644 --- a/slo_generator/utils.py +++ b/slo_generator/utils.py @@ -32,6 +32,7 @@ import yaml from dateutil import tz +from opentelemetry import trace from slo_generator.constants import DEBUG @@ -45,7 +46,10 @@ LOGGER = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) + +@tracer.start_as_current_span("load_configs") def load_configs( path: str, ctx: os._Environ = os.environ, kind: Optional[str] = None ) -> list: @@ -66,6 +70,7 @@ def load_configs( return [cfg for cfg in configs if cfg] +@tracer.start_as_current_span("load_config") def load_config( path: str, ctx: os._Environ = os.environ, kind: Optional[str] = None ) -> Optional[dict]: @@ -80,6 +85,8 @@ def load_config( Returns: dict: Config downloaded and parsed. 
""" + trace.get_current_span().set_attribute("path", path) + abspath = Path(path) try: if path.startswith("gs://"): @@ -110,6 +117,7 @@ def load_config( raise +@tracer.start_as_current_span("parse_config") def parse_config( path: Optional[str] = None, content=None, ctx: os._Environ = os.environ ): @@ -124,6 +132,8 @@ def parse_config( Returns: dict: Parsed YAML dictionary. """ + trace.get_current_span().set_attribute("path", path) + pattern = re.compile(r".*?\${(\w+)}.*?") def replace_env_vars(content, ctx) -> str: @@ -220,6 +230,7 @@ def get_human_time(timestamp: int, timezone: Optional[str] = None) -> str: return date_str +@tracer.start_as_current_span("get_exporters") def get_exporters(config: dict, spec: dict) -> list: """Get SLO exporters configs from spec and global config. @@ -247,6 +258,7 @@ def get_exporters(config: dict, spec: dict) -> list: return exporters +@tracer.start_as_current_span("get_backend") def get_backend(config: dict, spec: dict): """Get SLO backend config from spec and global config. @@ -272,6 +284,7 @@ def get_backend(config: dict, spec: dict): return backend_data +@tracer.start_as_current_span("get_error_budget_policy") def get_error_budget_policy(config: dict, spec: dict): """Get error budget policy from spec and global config. @@ -290,6 +303,7 @@ def get_error_budget_policy(config: dict, spec: dict): return all_ebp[spec_ebp] +@tracer.start_as_current_span("get_backend_cls") def get_backend_cls(backend: str): """Get backend class. @@ -303,6 +317,7 @@ def get_backend_cls(backend: str): return import_cls(backend, expected_type) +@tracer.start_as_current_span("get_exporter_cls") def get_exporter_cls(exporter: str): """Get exporter class. @@ -316,6 +331,7 @@ def get_exporter_cls(exporter: str): return import_cls(exporter, expected_type) +@tracer.start_as_current_span("import_cls") def import_cls(cls_name, expected_type): """Import class or method dynamically from full name. 
If `cls_name` is not part of the core, try import from local path (plugins). @@ -341,6 +357,7 @@ def import_cls(cls_name, expected_type): ) +@tracer.start_as_current_span("import_dynamic") def import_dynamic(package: str, name: str, prefix: str = "class"): """Import class or method dynamically from package and name. @@ -454,6 +471,7 @@ def str2bool(string: str) -> bool: raise argparse.ArgumentTypeError("Boolean value expected.") +@tracer.start_as_current_span("download_gcs_file") def download_gcs_file(url: str) -> dict: """Download config from GCS. @@ -463,6 +481,7 @@ def download_gcs_file(url: str) -> dict: Returns: dict: Loaded configuration. """ + trace.get_current_span().set_attribute("url", url) client = storage.Client() bucket, filepath = decode_gcs_url(url) bucket = client.get_bucket(bucket)