Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add histogram of report shares by VDAF parameters #3166

Merged
merged 1 commit into from
May 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
@@ -8,8 +8,10 @@ use crate::{
AggregationJobWriter, AggregationJobWriterMetrics, InitialWrite,
ReportAggregationUpdate as _, WritableReportAggregation,
},
error::{handle_ping_pong_error, ReportRejection, ReportRejectionReason},
error::{BatchMismatch, OptOutReason},
error::{
handle_ping_pong_error, BatchMismatch, OptOutReason, ReportRejection,
ReportRejectionReason,
},
query_type::{CollectableQueryType, UploadableQueryType},
report_writer::{ReportWriteBatcher, WritableReport},
},
@@ -19,7 +21,10 @@ use crate::{
},
config::TaskprovConfig,
diagnostic::AggregationJobInitForbiddenMutationEvent,
metrics::{aggregate_step_failure_counter, report_aggregation_success_counter},
metrics::{
aggregate_step_failure_counter, aggregated_report_share_dimension_histogram,
report_aggregation_success_counter,
},
};
use backoff::{backoff::Backoff, Notify};
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
@@ -163,13 +168,18 @@ struct AggregatorMetrics {
/// Counters tracking the number of failures to step client reports through the aggregation
/// process.
aggregate_step_failure_counter: Counter<u64>,
/// Histogram tracking the VDAF type and dimension of successfully-aggregated reports.
aggregated_report_share_dimension_histogram: Histogram<u64>,
}

impl AggregatorMetrics {
    /// Produces the metrics bundle handed to an aggregation job writer.
    ///
    /// Each instrument in the returned value is a clone of the same-named field on `self`;
    /// any other fields of `self` are left out.
    fn for_aggregation_job_writer(&self) -> AggregationJobWriterMetrics {
        // Borrow just the instruments the writer has a slot for.
        let Self {
            report_aggregation_success_counter,
            aggregate_step_failure_counter,
            aggregated_report_share_dimension_histogram,
            ..
        } = self;

        AggregationJobWriterMetrics {
            report_aggregation_success_counter: report_aggregation_success_counter.clone(),
            aggregate_step_failure_counter: aggregate_step_failure_counter.clone(),
            aggregated_report_share_dimension_histogram:
                aggregated_report_share_dimension_histogram.clone(),
        }
    }
}
@@ -283,6 +293,8 @@ impl<C: Clock> Aggregator<C> {

let report_aggregation_success_counter = report_aggregation_success_counter(meter);
let aggregate_step_failure_counter = aggregate_step_failure_counter(meter);
let aggregated_report_share_dimension_histogram =
aggregated_report_share_dimension_histogram(meter);

let global_hpke_keypairs = GlobalHpkeKeypairCache::new(
datastore.clone(),
@@ -302,6 +314,7 @@ impl<C: Clock> Aggregator<C> {
upload_decode_failure_counter,
report_aggregation_success_counter,
aggregate_step_failure_counter,
aggregated_report_share_dimension_histogram,
},
global_hpke_keypairs,
peer_aggregators,
28 changes: 20 additions & 8 deletions aggregator/src/aggregator/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::aggregator::{
aggregate_step_failure_counter,
aggregation_job_writer::{
AggregationJobWriter, AggregationJobWriterMetrics, UpdateWrite, WritableReportAggregation,
use crate::{
aggregator::{
aggregate_step_failure_counter,
aggregation_job_writer::{
AggregationJobWriter, AggregationJobWriterMetrics, UpdateWrite,
WritableReportAggregation,
},
error::handle_ping_pong_error,
http_handlers::AGGREGATION_JOB_ROUTE,
query_type::CollectableQueryType,
report_aggregation_success_counter, send_request_to_helper, Error, RequestBody,
},
error::handle_ping_pong_error,
http_handlers::AGGREGATION_JOB_ROUTE,
query_type::CollectableQueryType,
report_aggregation_success_counter, send_request_to_helper, Error, RequestBody,
metrics::aggregated_report_share_dimension_histogram,
};
use anyhow::{anyhow, Result};
use backoff::backoff::Backoff;
@@ -64,6 +68,8 @@ pub struct AggregationJobDriver<B> {
#[derivative(Debug = "ignore")]
aggregate_step_failure_counter: Counter<u64>,
#[derivative(Debug = "ignore")]
aggregated_report_share_dimension_histogram: Histogram<u64>,
#[derivative(Debug = "ignore")]
job_cancel_counter: Counter<u64>,
#[derivative(Debug = "ignore")]
job_retry_counter: Counter<u64>,
@@ -83,6 +89,8 @@ where
) -> Self {
let aggregation_success_counter = report_aggregation_success_counter(meter);
let aggregate_step_failure_counter = aggregate_step_failure_counter(meter);
let aggregated_report_share_dimension_histogram =
aggregated_report_share_dimension_histogram(meter);

let job_cancel_counter = meter
.u64_counter("janus_job_cancellations")
@@ -112,6 +120,7 @@ where
backoff,
aggregation_success_counter,
aggregate_step_failure_counter,
aggregated_report_share_dimension_histogram,
job_cancel_counter,
job_retry_counter,
http_request_duration_histogram,
@@ -914,6 +923,9 @@ where
Some(AggregationJobWriterMetrics {
report_aggregation_success_counter: self.aggregation_success_counter.clone(),
aggregate_step_failure_counter: self.aggregate_step_failure_counter.clone(),
aggregated_report_share_dimension_histogram: self
.aggregated_report_share_dimension_histogram
.clone(),
}),
);
let new_step = aggregation_job.step().increment();
126 changes: 123 additions & 3 deletions aggregator/src/aggregator/aggregation_job_writer.rs
Original file line number Diff line number Diff line change
@@ -16,15 +16,21 @@ use janus_aggregator_core::{
query_type::AccumulableQueryType,
task::AggregatorTask,
};
#[cfg(feature = "fpvec_bounded_l2")]
use janus_core::vdaf::Prio3FixedPointBoundedL2VecSumBitSize;
use janus_core::{
report_id::ReportIdChecksumExt as _,
time::{Clock, IntervalExt},
vdaf::VdafInstance,
};
use janus_messages::{
AggregationJobId, Interval, PrepareError, PrepareResp, PrepareStepResult, ReportId,
ReportIdChecksum, Time,
};
use opentelemetry::{metrics::Counter, KeyValue};
use opentelemetry::{
metrics::{Counter, Histogram},
KeyValue,
};
use prio::{codec::Encode, vdaf};
use rand::{thread_rng, Rng as _};
use std::{borrow::Cow, collections::HashMap, marker::PhantomData, sync::Arc};
@@ -52,6 +58,7 @@ where
/// Metric instruments supplied to the aggregation job writer.
pub struct AggregationJobWriterMetrics {
/// Counter incremented by one each time a report aggregation is successfully merged into its
/// batch aggregation.
pub report_aggregation_success_counter: Counter<u64>,
/// Counter tracking the number of failures to step client reports through the aggregation
/// process.
pub aggregate_step_failure_counter: Counter<u64>,
/// Histogram tracking the VDAF type (recorded under the `type` attribute) and dimension of
/// successfully-aggregated reports.
pub aggregated_report_share_dimension_histogram: Histogram<u64>,
}

#[allow(private_bounds)]
@@ -673,12 +680,125 @@ where
match ra_batch_aggregation.merged_with(batch_aggregation) {
Ok(merged_batch_aggregation) => {
self.writer.update_metrics(|metrics| {
metrics.report_aggregation_success_counter.add(1, &[])
metrics.report_aggregation_success_counter.add(1, &[]);

use VdafInstance::*;
match self.writer.task.vdaf() {
Prio3Count => metrics
.aggregated_report_share_dimension_histogram
.record(1, &[KeyValue::new("type", "Prio3Count")]),

Prio3Sum { bits } => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*bits).unwrap_or(u64::MAX),
&[KeyValue::new("type", "Prio3Sum")],
)
}

Prio3SumVec {
bits,
length,
chunk_length: _,
} => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*bits)
.unwrap_or(u64::MAX)
.saturating_mul(
u64::try_from(*length).unwrap_or(u64::MAX),
),
&[KeyValue::new("type", "Prio3SumVec")],
)
}

Prio3SumVecField64MultiproofHmacSha256Aes128 {
proofs: _,
bits,
length,
chunk_length: _,
} => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*bits)
.unwrap_or(u64::MAX)
.saturating_mul(
u64::try_from(*length).unwrap_or(u64::MAX),
),
&[KeyValue::new(
"type",
"Prio3SumVecField64MultiproofHmacSha256Aes128",
)],
)
}

Prio3Histogram {
length,
chunk_length: _,
} => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*length).unwrap_or(u64::MAX),
&[KeyValue::new("type", "Prio3Histogram")],
)
}

#[cfg(feature = "fpvec_bounded_l2")]
Prio3FixedPointBoundedL2VecSum {
bitsize: Prio3FixedPointBoundedL2VecSumBitSize::BitSize16,
dp_strategy: _,
length,
} => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*length)
.unwrap_or(u64::MAX)
.saturating_mul(16),
&[KeyValue::new(
"type",
"Prio3FixedPointBoundedL2VecSum",
)],
)
}

#[cfg(feature = "fpvec_bounded_l2")]
Prio3FixedPointBoundedL2VecSum {
bitsize: Prio3FixedPointBoundedL2VecSumBitSize::BitSize32,
dp_strategy: _,
length,
} => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*length)
.unwrap_or(u64::MAX)
.saturating_mul(32),
&[KeyValue::new(
"type",
"Prio3FixedPointBoundedL2VecSum",
)],
)
}

Poplar1 { bits } => {
metrics.aggregated_report_share_dimension_histogram.record(
u64::try_from(*bits).unwrap_or(u64::MAX),
&[KeyValue::new("type", "Poplar1")],
)
}

#[cfg(feature = "test-util")]
Fake { rounds: _ } | FakeFailsPrepInit | FakeFailsPrepStep => {
metrics
.aggregated_report_share_dimension_histogram
.record(0, &[KeyValue::new("type", "Fake")])
}
_ => metrics
.aggregated_report_share_dimension_histogram
.record(0, &[KeyValue::new("type", "unknown")]),
}
});
*batch_aggregation = merged_batch_aggregation
}
Err(err) => {
warn!(report_id = %report_aggregation.report_id(), ?err, "Couldn't update batch aggregation");
warn!(
report_id = %report_aggregation.report_id(),
?err,
"Couldn't update batch aggregation",
);
self.writer.update_metrics(|metrics| {
metrics
.aggregate_step_failure_counter
Loading