
Remove replay checking from initialize_reports
mendess committed Nov 24, 2023
1 parent 760b6c4 commit 93e1e94
Showing 5 changed files with 16 additions and 130 deletions.
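In effect, replay detection moves out of report initialization: the ReportsProcessed Durable Object loses its combined initialize-and-replay-check route (`DURABLE_REPORTS_PROCESSED_INITIALIZE`), the surviving route constant is renamed from `DURABLE_REPORTS_PROCESSED_INITIALIZED` to `DURABLE_REPORTS_INITIALIZED_REGISTER` (its URL path is unchanged), the worker's `DapReportInitializer` implementation drops its replay lookup so that only batch-collection checks and VDAF preparation remain, and the test and docker wrangler configs drop the `DAP_REPORTS_PROCESSED` binding. The commit itself does not show where the registration route is invoked after this change.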
4 changes: 2 additions & 2 deletions daphne/src/roles/aggregator.rs
@@ -26,8 +26,8 @@ use crate::{
 #[async_trait(?Send)]
 pub trait DapReportInitializer {
     /// Initialize a sequence of reports that are in the "consumed" state by performing the early
-    /// validation steps (check if the report was replayed, belongs to a batch that has been
-    /// collected) and initializing VDAF preparation.
+    /// validation steps (belongs to a batch that has been collected) and initializing VDAF
+    /// preparation.
     async fn initialize_reports<'req>(
         &self,
         is_leader: bool,
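
With replay checking gone from the contract, the consumed-to-initialized step amounts to a plain map over `EarlyReportStateInitialized::initialize`. A minimal sketch under stated assumptions: the helper below does not exist in the diff, its argument order mirrors the call removed from reports_processed.rs further down, and the `'req` lifetime is inferred from the truncated method signature above.

    use daphne::{
        vdaf::{EarlyReportStateConsumed, EarlyReportStateInitialized, VdafVerifyKey},
        DapError, VdafConfig,
    };

    // Sketch only: initialize a batch of consumed reports with no replay lookup.
    fn initialize_all<'req>(
        is_leader: bool,
        vdaf_verify_key: &VdafVerifyKey,
        vdaf_config: &VdafConfig,
        consumed_reports: Vec<EarlyReportStateConsumed<'req>>,
    ) -> Result<Vec<EarlyReportStateInitialized<'req>>, DapError> {
        consumed_reports
            .into_iter()
            .map(|consumed| {
                EarlyReportStateInitialized::initialize(
                    is_leader,
                    vdaf_verify_key,
                    vdaf_config,
                    consumed,
                )
            })
            .collect()
    }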
81 changes: 8 additions & 73 deletions daphne_worker/src/durable/reports_processed.rs
@@ -3,21 +3,21 @@
 
 use crate::{
     config::DaphneWorkerConfig,
-    durable::{create_span_from_request, state_get, BINDING_DAP_REPORTS_PROCESSED},
+    durable::{create_span_from_request, BINDING_DAP_REPORTS_PROCESSED},
     initialize_tracing, int_err,
 };
 use daphne::{
     messages::{ReportId, ReportMetadata, TransitionFailure},
     vdaf::{
-        EarlyReportState, EarlyReportStateConsumed, EarlyReportStateInitialized, VdafPrepMessage,
-        VdafPrepState, VdafVerifyKey,
+        EarlyReportStateConsumed, EarlyReportStateInitialized, VdafPrepMessage, VdafPrepState,
+        VdafVerifyKey,
     },
-    DapError, VdafConfig,
+    VdafConfig,
 };
-use futures::{future::try_join_all, StreamExt, TryStreamExt};
+use futures::{StreamExt, TryStreamExt};
 use prio::codec::{CodecError, ParameterizedDecode};
 use serde::{Deserialize, Serialize};
-use std::{borrow::Cow, collections::HashSet, future::ready, ops::ControlFlow, time::Duration};
+use std::{borrow::Cow, future::ready, ops::ControlFlow, time::Duration};
 use tracing::Instrument;
 use worker::{
     async_trait, durable_object, js_sys, wasm_bindgen, wasm_bindgen_futures, worker_sys, Env,
@@ -26,9 +26,7 @@ use worker::{
 
 use super::{req_parse, state_set_if_not_exists, Alarmed, DapDurableObject, GarbageCollectable};
 
-pub(crate) const DURABLE_REPORTS_PROCESSED_INITIALIZE: &str =
-    "/internal/do/reports_processed/initialize";
-pub(crate) const DURABLE_REPORTS_PROCESSED_INITIALIZED: &str =
+pub(crate) const DURABLE_REPORTS_INITIALIZED_REGISTER: &str =
     "/internal/do/reports_processed/initialized";
 
 /// Durable Object (DO) for tracking which reports have been processed.
@@ -109,7 +107,7 @@ impl ReportsProcessed {
             .await?;
 
         match (req.path().as_ref(), req.method()) {
-            (DURABLE_REPORTS_PROCESSED_INITIALIZED, Method::Post) => {
+            (DURABLE_REPORTS_INITIALIZED_REGISTER, Method::Post) => {
                 let to_mark = req_parse::<Vec<ReportId>>(&mut req).await?;
                 let state = &self.state;
                 let replays = futures::stream::iter(&to_mark)
@@ -125,69 +123,6 @@
 
                 Response::from_json(&replays)
             }
-            // Initialize a report:
-            //  * Ensure the report wasn't replayed
-            //  * Ensure the report won't be included in a batch that was already collected
-            //  * Initialize VDAF preparation.
-            //
-            // Idempotent
-            // Input: `ReportsProcessedReq`
-            // Output: `ReportsProcessedResp`
-            (DURABLE_REPORTS_PROCESSED_INITIALIZE, Method::Post) => {
-                let reports_processed_request: ReportsProcessedReq = req_parse(&mut req).await?;
-                let result = try_join_all(
-                    reports_processed_request
-                        .consumed_reports
-                        .iter()
-                        .filter(|consumed_report| consumed_report.is_ready())
-                        .map(|consumed_report| async {
-                            if let Some(exists) = state_get::<bool>(
-                                &self.state,
-                                &format!("processed/{}", consumed_report.metadata().id.to_hex()),
-                            )
-                            .await?
-                            {
-                                if exists {
-                                    return Result::Ok(Some(consumed_report.metadata().id));
-                                }
-                            }
-                            Ok(None)
-                        }),
-                )
-                .await?;
-                let replayed_reports = result.into_iter().flatten().collect::<HashSet<ReportId>>();
-
-                let initialized_reports = reports_processed_request
-                    .consumed_reports
-                    .into_iter()
-                    .map(|consumed_report| {
-                        if replayed_reports.contains(&consumed_report.metadata().id) {
-                            Ok(consumed_report.into_initialized_rejected_due_to(
-                                TransitionFailure::ReportReplayed,
-                            ))
-                        } else {
-                            EarlyReportStateInitialized::initialize(
-                                reports_processed_request.is_leader,
-                                &reports_processed_request.vdaf_verify_key,
-                                &reports_processed_request.vdaf_config,
-                                consumed_report,
-                            )
-                        }
-                    })
-                    .collect::<std::result::Result<Vec<EarlyReportStateInitialized>, DapError>>()
-                    .map_err(|e| {
-                        int_err(format!(
-                            "ReportsProcessed: failed to initialize a report: {e}"
-                        ))
-                    })?;
-
-                Response::from_json(&ReportsProcessedResp {
-                    is_leader: reports_processed_request.is_leader,
-                    vdaf_config: reports_processed_request.vdaf_config,
-                    initialized_reports,
-                })
-            }
-
             _ => Err(int_err(format!(
                 "ReportsProcessed: unexpected request: method={:?}; path={:?}",
                 req.method(),
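
What remains of this Durable Object is a single registration route: POST a `Vec<ReportId>` and receive back the subset of IDs that had already been marked as processed. A hypothetical caller-side sketch, patterned on the `durable.post` call this commit removes from roles/aggregator.rs below; the `DurableConnector` type name and the exact `post` signature are assumptions.

    use std::collections::HashSet;

    use daphne::messages::ReportId;

    // Hypothetical helper: mark report IDs as processed in one DO instance and
    // return the IDs that were already present, i.e. the replays.
    async fn mark_and_find_replays(
        durable: &DurableConnector<'_>, // assumed connector type
        durable_name: String,
        report_ids: Vec<ReportId>,
    ) -> worker::Result<HashSet<ReportId>> {
        durable
            .post::<_, HashSet<ReportId>>(
                BINDING_DAP_REPORTS_PROCESSED,
                DURABLE_REPORTS_INITIALIZED_REGISTER,
                durable_name,
                report_ids,
            )
            .await
    }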
59 changes: 6 additions & 53 deletions daphne_worker/src/roles/aggregator.rs
@@ -12,9 +12,7 @@ use crate::{
             DURABLE_AGGREGATE_STORE_GET, DURABLE_AGGREGATE_STORE_MARK_COLLECTED,
             DURABLE_AGGREGATE_STORE_MERGE,
         },
-        durable_name_agg_store,
-        reports_processed::DURABLE_REPORTS_PROCESSED_INITIALIZED,
-        BINDING_DAP_AGGREGATE_STORE, BINDING_DAP_REPORTS_PROCESSED,
+        durable_name_agg_store, BINDING_DAP_AGGREGATE_STORE,
     },
     now,
 };
@@ -48,34 +46,14 @@ impl DapReportInitializer for DaphneWorker<'_> {
         let span = task_config
             .as_ref()
             .batch_span_for_meta(part_batch_sel, consumed_reports.iter())?;
-        let mut reports_processed_request_data = HashMap::new();
         let collected_reports = {
             let task_id_hex = task_id.to_hex();
 
-            let mut agg_store_request_names = Vec::new();
-            for (bucket, ((), report_ids_and_time)) in span.iter() {
-                for (id, time) in report_ids_and_time {
-                    let durable_name = self.config().durable_name_report_store(
-                        task_config.as_ref(),
-                        &task_id_hex,
-                        id,
-                        *time,
-                    );
-
-                    reports_processed_request_data
-                        .entry(durable_name)
-                        .or_insert_with(Vec::new)
-                        .push(id);
-                }
-                agg_store_request_names.push((
-                    bucket,
-                    durable_name_agg_store(task_config.version, &task_id_hex, bucket),
-                ));
-            }
-
             // Send AggregateStore requests.
-            futures::stream::iter(agg_store_request_names)
-                .map(|(bucket, durable_name)| {
+            futures::stream::iter(span.iter())
+                .map(|(bucket, _)| {
+                    let durable_name =
+                        durable_name_agg_store(task_config.version, &task_id_hex, bucket);
                     durable
                         .get(
                             BINDING_DAP_AGGREGATE_STORE,
@@ -93,7 +71,7 @@
         let min_time = self.least_valid_report_time(self.get_current_time());
         let max_time = self.greatest_valid_report_time(self.get_current_time());
 
-        let mut initialized_reports = consumed_reports
+        let initialized_reports = consumed_reports
             .into_iter()
             .map(|consumed_report| {
                 let metadata = consumed_report.metadata();
@@ -123,31 +101,6 @@
             .collect::<Result<Vec<_>, DapError>>()
             .map_err(|e| fatal_error!(err = ?e, "failed to initialize a report"))?;
 
-        let replayed_reports_check = futures::stream::iter(reports_processed_request_data)
-            .map(|(durable_name, reports)| async {
-                durable
-                    .post::<_, HashSet<ReportId>>(
-                        BINDING_DAP_REPORTS_PROCESSED,
-                        DURABLE_REPORTS_PROCESSED_INITIALIZED,
-                        durable_name,
-                        reports,
-                    )
-                    .await
-            })
-            .buffer_unordered(usize::MAX)
-            .try_fold(HashSet::new(), |mut acc, replays| async {
-                acc.extend(replays);
-                Ok(acc)
-            })
-            .await
-            .map_err(|e| fatal_error!(err = ?e, "checking for replayed reports"))?;
-
-        for rep in &mut initialized_reports {
-            if replayed_reports_check.contains(&rep.metadata().id) {
-                rep.reject_due_to(TransitionFailure::ReportReplayed);
-            }
-        }
-
         Ok(initialized_reports)
     }
 }
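
With the replay half gone, the per-report decision here reduces to the checks still visible above: the bucket's collected status from AggregateStore and the report-time window bounded by `least_valid_report_time` and `greatest_valid_report_time`. A condensed sketch of that decision; the `TransitionFailure` variant names and the `metadata().time` field are assumptions based on DAP's transition failures and are not shown in this diff.

    use daphne::{
        messages::TransitionFailure,
        vdaf::{EarlyReportStateConsumed, EarlyReportStateInitialized, VdafVerifyKey},
        DapError, VdafConfig,
    };

    // Sketch of the per-report logic hidden in the middle of the hunk above.
    fn initialize_one<'req>(
        is_leader: bool,
        key: &VdafVerifyKey,
        cfg: &VdafConfig,
        collected: bool, // did AggregateStore report this bucket as collected?
        min_time: u64,
        max_time: u64,
        consumed: EarlyReportStateConsumed<'req>,
    ) -> Result<EarlyReportStateInitialized<'req>, DapError> {
        let time = consumed.metadata().time;
        let failure = if collected {
            Some(TransitionFailure::BatchCollected)
        } else if time < min_time {
            Some(TransitionFailure::ReportDropped)
        } else if time > max_time {
            Some(TransitionFailure::ReportTooEarly)
        } else {
            None
        };
        match failure {
            // Rejected reports are still "initialized"; they carry the failure.
            Some(f) => Ok(consumed.into_initialized_rejected_due_to(f)),
            None => EarlyReportStateInitialized::initialize(is_leader, key, cfg, consumed),
        }
    }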
1 change: 0 additions & 1 deletion daphne_worker_test/wrangler.toml
@@ -139,7 +139,6 @@ bindings = [
   { name = "DAP_AGGREGATE_STORE", class_name = "AggregateStore" },
   { name = "DAP_HELPER_STATE_STORE", class_name = "HelperStateStore" },
   { name = "DAP_GARBAGE_COLLECTOR", class_name = "GarbageCollector" },
-  { name = "DAP_REPORTS_PROCESSED", class_name = "ReportsProcessed" },
 ]
1 change: 0 additions & 1 deletion docker/wrangler.toml
@@ -140,7 +140,6 @@ bindings = [
   { name = "DAP_AGGREGATE_STORE", class_name = "AggregateStore" },
   { name = "DAP_HELPER_STATE_STORE", class_name = "HelperStateStore" },
   { name = "DAP_GARBAGE_COLLECTOR", class_name = "GarbageCollector" },
-  { name = "DAP_REPORTS_PROCESSED", class_name = "ReportsProcessed" },
 ]
