Port observations metrics over to OpenTelemetry
hellais committed Apr 26, 2024
1 parent 85c3c93 commit 8f35368
Showing 1 changed file with 56 additions and 46 deletions.
oonipipeline/src/oonipipeline/temporal/activities/observations.py
@@ -17,7 +17,9 @@
     make_db_rows,
     maybe_delete_prev_range,
 )
-import statsd
+
+from opentelemetry import trace
+
 
 from temporalio import activity
 
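Note: the `from opentelemetry import trace` import above pulls in only the API package, so spans created through trace.get_tracer(__name__) stay no-ops until an SDK tracer provider is installed at process startup. A minimal sketch of that wiring, assuming a console exporter and an invented service name (this setup is not part of the diff):

# A minimal sketch, not part of this commit: install an SDK tracer provider
# so that trace.get_tracer(__name__) produces exportable spans. The service
# name and the console exporter are assumptions for illustration.
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

provider = TracerProvider(resource=Resource.create({"service.name": "oonipipeline"}))
# Print finished spans to stdout; a real deployment would swap in an OTLP
# exporter pointed at a collector instead.
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)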
@@ -69,61 +71,70 @@ def make_observations_for_file_entry_batch(
 ):
     netinfodb = NetinfoDB(datadir=data_dir, download=False)
     tbatch = PerfTimer()
+
+    tracer = trace.get_tracer(__name__)
+
+    current_span = trace.get_current_span()
     with ClickhouseConnection(clickhouse, row_buffer_size=row_buffer_size) as db:
-        statsd_client = statsd.StatsClient("localhost", 8125)
         ccs = ccs_set(probe_cc)
         idx = 0
         for bucket_name, s3path, ext, fe_size in file_entry_batch:
-            log.info(f"processing file s3://{bucket_name}/{s3path}")
-            t = PerfTimer()
-            try:
-                for msmt_dict in stream_measurements(
-                    bucket_name=bucket_name, s3path=s3path, ext=ext
-                ):
-                    # Legacy cans don't allow us to pre-filter on the probe_cc, so
-                    # we need to check for probe_cc consistency in here.
-                    if ccs and msmt_dict["probe_cc"] not in ccs:
-                        continue
-                    msmt = None
-                    try:
-                        t = PerfTimer()
-                        msmt = load_measurement(msmt_dict)
-                        if not msmt.test_keys:
-                            log.error(
-                                f"measurement with empty test_keys: ({msmt.measurement_uid})",
-                                exc_info=True,
-                            )
-                            continue
-                        write_observations_to_db(msmt, netinfodb, db, bucket_date)
-                        # following types ignored due to https://github.com/jsocol/pystatsd/issues/146
-                        statsd_client.timing("oonidata.make_observations.timed", t.ms, rate=0.1)  # type: ignore
-                        statsd_client.incr("oonidata.make_observations.msmt_count", rate=0.1)  # type: ignore
-                        idx += 1
-                    except Exception as exc:
-                        msmt_str = msmt_dict.get("report_id", None)
-                        if msmt:
-                            msmt_str = msmt.measurement_uid
-                        log.error(f"failed at idx: {idx} ({msmt_str})", exc_info=True)
-
-                        if fast_fail:
-                            db.close()
-                            raise exc
-                log.info(f"done processing file s3://{bucket_name}/{s3path}")
-            except Exception as exc:
-                log.error(
-                    f"failed to stream measurements from s3://{bucket_name}/{s3path}"
-                )
-                log.error(exc)
-            statsd_client.timing("oonidata.dataclient.stream_file_entry.timed", t.ms, rate=0.1)  # type: ignore
-            statsd_client.gauge("oonidata.dataclient.file_entry.kb_per_sec.gauge", fe_size / 1024 / t.s, rate=0.1)  # type: ignore
-        statsd_client.timing("oonidata.dataclient.batch.timed", tbatch.ms)  # type: ignore
+            failure_count = 0
+            # Nest the traced span within the current span
+            with current_span, tracer.start_as_current_span(
+                "MakeObservations:stream_file_entry"
+            ) as span:
+                log.info(f"processing file s3://{bucket_name}/{s3path}")
+                t = PerfTimer()
+                try:
+                    for msmt_dict in stream_measurements(
+                        bucket_name=bucket_name, s3path=s3path, ext=ext
+                    ):
+                        # Legacy cans don't allow us to pre-filter on the probe_cc, so
+                        # we need to check for probe_cc consistency in here.
+                        if ccs and msmt_dict["probe_cc"] not in ccs:
+                            continue
+                        msmt = None
+                        try:
+                            t = PerfTimer()
+                            msmt = load_measurement(msmt_dict)
+                            if not msmt.test_keys:
+                                log.error(
+                                    f"measurement with empty test_keys: ({msmt.measurement_uid})",
+                                    exc_info=True,
+                                )
+                                continue
+                            write_observations_to_db(msmt, netinfodb, db, bucket_date)
+                            idx += 1
+                        except Exception as exc:
+                            msmt_str = msmt_dict.get("report_id", None)
+                            if msmt:
+                                msmt_str = msmt.measurement_uid
+                            log.error(
+                                f"failed at idx: {idx} ({msmt_str})", exc_info=True
+                            )
+                            failure_count += 1
+
+                            if fast_fail:
+                                db.close()
+                                raise exc
+                    log.info(f"done processing file s3://{bucket_name}/{s3path}")
+                except Exception as exc:
+                    log.error(
+                        f"failed to stream measurements from s3://{bucket_name}/{s3path}"
+                    )
+                    log.error(exc)
+                span.set_attribute("kb_per_sec", fe_size / 1024 / t.s)
+                span.set_attribute("fe_size", fe_size)
+                span.set_attribute("failure_count", failure_count)
+                span.set_attribute("bucket_name", bucket_name)
+
+    current_span.set_attribute("total_runtime_ms", tbatch.ms)
     return idx
 
 
 @activity.defn
 def make_observation_in_day(params: MakeObservationsParams) -> dict:
-    statsd_client = statsd.StatsClient("localhost", 8125)
-
     day = datetime.strptime(params.bucket_date, "%Y-%m-%d").date()
 
     with ClickhouseConnection(params.clickhouse, row_buffer_size=10_000) as db:
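The hunk above replaces the per-file statsd timers and gauges with a per-file span: each file entry is now processed inside a span nested under the currently active one, and the old metric values travel as span attributes instead (kb_per_sec, fe_size, failure_count, bucket_name, total_runtime_ms). A minimal runnable sketch of the same parent/child pattern, with invented span names and placeholder values:

# A runnable sketch of the span-nesting pattern used in the hunk above;
# the span names and attribute values here are placeholders, not repo code.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("MakeObservations:batch") as batch_span:
    for fname in ["file-a", "file-b"]:  # stand-ins for file entries
        # Spans started while another span is active become its children.
        with tracer.start_as_current_span("MakeObservations:stream_file_entry") as span:
            span.set_attribute("bucket_name", fname)
            span.set_attribute("failure_count", 0)
    batch_span.set_attribute("total_runtime_ms", 123.4)

Span objects are themselves context managers (exiting one ends it), which is what lets the diff enter current_span and the new child span in a single with statement.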
@@ -171,7 +182,6 @@ def make_observation_in_day(params: MakeObservationsParams) -> dict:
         log.info(
             f"finished processing all batches in {total_t.pretty} speed: {mb_per_sec}MB/s ({msmt_per_sec}msmt/s)"
         )
-        statsd_client.timing("oonidata.dataclient.daily.timed", total_t.ms)
 
     if len(prev_ranges) > 0:
         with ClickhouseConnection(params.clickhouse, row_buffer_size=10_000) as db:
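The daily statsd timer removed here gets no direct replacement in this diff; total runtime now travels as the total_runtime_ms span attribute set earlier. If one-to-one metric counterparts were wanted instead of span attributes, the OpenTelemetry metrics API would be the analogue. A hypothetical sketch, not code from this repository:

# Hypothetical mapping of the removed statsd calls onto the OpenTelemetry
# metrics API; instrument names mirror the old statsd keys. Without a
# configured SDK MeterProvider these calls are silent no-ops.
from opentelemetry import metrics

meter = metrics.get_meter(__name__)

# counterpart of statsd_client.incr("oonidata.make_observations.msmt_count")
msmt_count = meter.create_counter("oonidata.make_observations.msmt_count")

# counterpart of statsd_client.timing("oonidata.dataclient.daily.timed", total_t.ms)
daily_timed = meter.create_histogram("oonidata.dataclient.daily.timed", unit="ms")

msmt_count.add(1)
daily_timed.record(1234.5)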
