Skip to content

Commit

Permalink
Remove replay checking from initialize_reports
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Dec 4, 2023
1 parent cac5074 commit c23fd3a
Show file tree
Hide file tree
Showing 10 changed files with 14 additions and 460 deletions.
4 changes: 2 additions & 2 deletions daphne/src/roles/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use crate::{
#[async_trait(?Send)]
pub trait DapReportInitializer {
/// Initialize a sequence of reports that are in the "consumed" state by performing the early
/// validation steps (check if the report was replayed, belongs to a batch that has been
/// collected) and initializing VDAF preparation.
/// validation steps (belongs to a batch that has been collected) and initializing VDAF
/// preparation.
async fn initialize_reports<'req>(
&self,
is_leader: bool,
Expand Down
1 change: 1 addition & 0 deletions daphne/src/roles/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ pub trait DapLeader<S>: DapAuthorizedSender<S> + DapAggregator<S> {
metrics,
)
.await?;

let (state, agg_job_init_req) = match transition {
DapLeaderAggregationJobTransition::Continued(state, agg_job_init_req) => {
(state, agg_job_init_req)
Expand Down
43 changes: 0 additions & 43 deletions daphne/src/roles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,49 +1100,6 @@ mod test {

async_test_versions! { handle_agg_job_req_transition_continue }

async fn handle_agg_job_req_failure_report_replayed(version: DapVersion) {
let t = Test::new(version);
let task_id = &t.time_interval_task_id;

let report = t.gen_test_report(task_id).await;
let req = t
.gen_test_agg_job_init_req(task_id, version, vec![report.clone()])
.await;

// Add dummy data to report store backend. This is done in a new scope so that the lock on the
// report store is released before running the test.
{
let mut guard = t
.helper
.report_store
.lock()
.expect("report_store: failed to lock");
let report_store = guard.entry(*task_id).or_default();
report_store.processed.insert(report.report_metadata.id);
}

// Get AggregationJobResp and then extract the transition data from inside.
let agg_job_resp = AggregationJobResp::get_decoded(
&t.helper.handle_agg_job_req(&req).await.unwrap().payload,
)
.unwrap();
let transition = &agg_job_resp.transitions[0];

// Expect failure due to report store marked as collected.
assert_matches!(
transition.var,
TransitionVar::Failed(TransitionFailure::ReportReplayed)
);

assert_metrics_include!(t.helper_registry, {
r#"report_counter{env="test_helper",host="helper.org",status="rejected_report_replayed"}"#: 1,
r#"inbound_request_counter{env="test_helper",host="helper.org",type="aggregate"}"#: 1,
r#"aggregation_job_counter{env="test_helper",host="helper.org",status="started"}"#: 1,
});
}

async_test_versions! { handle_agg_job_req_failure_report_replayed }

async fn handle_agg_job_req_failure_batch_collected(version: DapVersion) {
let t = Test::new(version);
let task_id = &t.time_interval_task_id;
Expand Down
27 changes: 5 additions & 22 deletions daphne/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,11 +721,10 @@ impl MockAggregator {
/// Conducts checks on a received report to see whether:
/// 1) the report falls into a batch that has been already collected, or
/// 2) the report has been submitted by the client in the past.
fn check_report_early_fail(
fn check_report_has_been_collected(
&self,
task_id: &TaskId,
bucket: &DapBatchBucket,
id: &ReportId,
) -> Option<TransitionFailure> {
// Check AggStateStore to see whether the report is part of a batch that has already
// been collected.
Expand All @@ -735,16 +734,6 @@ impl MockAggregator {
return Some(TransitionFailure::BatchCollected);
}

// Check whether the same report has been submitted in the past.
let mut guard = self
.report_store
.lock()
.expect("report_store: failed to lock");
let report_store = guard.entry(*task_id).or_default();
if report_store.processed.contains(id) {
return Some(TransitionFailure::ReportReplayed);
}

None
}

Expand Down Expand Up @@ -948,7 +937,8 @@ impl DapReportInitializer for MockAggregator {
for (bucket, ((), report_ids_and_time)) in span.iter() {
for (id, _) in report_ids_and_time {
// Check whether Report has been collected or replayed.
if let Some(transition_failure) = self.check_report_early_fail(task_id, bucket, id)
if let Some(transition_failure) =
self.check_report_has_been_collected(task_id, bucket)
{
early_fails.insert(*id, transition_failure);
};
Expand Down Expand Up @@ -1240,21 +1230,14 @@ impl DapLeader<BearerToken> for MockAggregator {
.await
.expect("could not determine batch for report");

// Check whether Report has been collected or replayed.
if let Some(transition_failure) =
self.check_report_early_fail(task_id, &bucket, &report.report_metadata.id)
{
return Err(DapError::Transition(transition_failure));
};

// Store Report for future processing.
let mut guard = self
.report_store
.lock()
.expect("report_store: failed to lock");
let queue = guard
.get_mut(task_id)
.expect("report_store: unrecognized task")
.entry(*task_id)
.or_default()
.pending
.entry(bucket)
.or_default();
Expand Down
16 changes: 0 additions & 16 deletions daphne_worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,6 @@ pub(crate) struct DaphneWorkerConfig {
/// configured by the Leader.
pub(crate) helper_state_store_garbage_collect_after_secs: Option<Duration>,

/// Additional time to wait before deletng an instance of ReportsProcessed. Added to the value
/// of the `report_storage_epoch_duration` field of the global DAP configuration.
pub(crate) processed_alarm_safety_interval: Duration,

/// Metrics push configuration.
metrics_push_config: Option<MetricsPushConfig>,
}
Expand Down Expand Up @@ -281,17 +277,6 @@ impl DaphneWorkerConfig {
None
};

let processed_alarm_safety_interval = Duration::from_secs(
env.var("DAP_PROCESSED_ALARM_SAFETY_INTERVAL")?
.to_string()
.parse()
.map_err(|err| {
worker::Error::RustError(format!(
"Failed to parse DAP_PROCESSED_ALARM_SAFETY_INTERVAL: {err}"
))
})?,
);

const DAP_METRICS_PUSH_SERVER_URL: &str = "DAP_METRICS_PUSH_SERVER_URL";
const DAP_METRICS_PUSH_BEARER_TOKEN: &str = "DAP_METRICS_PUSH_BEARER_TOKEN";
let metrics_push_config = match (
Expand Down Expand Up @@ -331,7 +316,6 @@ impl DaphneWorkerConfig {
taskprov,
default_version,
helper_state_store_garbage_collect_after_secs,
processed_alarm_safety_interval,
metrics_push_config,
})
}
Expand Down
1 change: 0 additions & 1 deletion daphne_worker/src/durable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub(crate) mod leader_agg_job_queue;
pub(crate) mod leader_batch_queue;
pub(crate) mod leader_col_job_queue;
pub(crate) mod reports_pending;
pub(crate) mod reports_processed;

use crate::{
int_err, now,
Expand Down
Loading

0 comments on commit c23fd3a

Please sign in to comment.