From 1066290e0d5e6d2b8e39ffb729a72875dda3b624 Mon Sep 17 00:00:00 2001
From: mendess
Date: Wed, 22 Nov 2023 11:14:51 +0000
Subject: [PATCH] Remove ReportsProcessed DO

---
 daphne_worker/src/config.rs                   |  16 -
 daphne_worker/src/durable/mod.rs              |   1 -
 .../src/durable/reports_processed.rs          | 317 ------------------
 daphne_worker/src/roles/aggregator.rs         |  31 +-
 daphne_worker_test/wrangler.toml              |   5 -
 docker/wrangler.toml                          |   5 -
 6 files changed, 2 insertions(+), 373 deletions(-)
 delete mode 100644 daphne_worker/src/durable/reports_processed.rs

diff --git a/daphne_worker/src/config.rs b/daphne_worker/src/config.rs
index 57dcc3130..e6a409bb1 100644
--- a/daphne_worker/src/config.rs
+++ b/daphne_worker/src/config.rs
@@ -121,10 +121,6 @@ pub(crate) struct DaphneWorkerConfig {
     /// configured by the Leader.
     pub(crate) helper_state_store_garbage_collect_after_secs: Option,
 
-    /// Additional time to wait before deleting an instance of ReportsProcessed. Added to the value
-    /// of the `report_storage_epoch_duration` field of the global DAP configuration.
-    pub(crate) processed_alarm_safety_interval: Duration,
-
     /// Metrics push configuration.
     metrics_push_config: Option,
 }
@@ -281,17 +277,6 @@ impl DaphneWorkerConfig {
             None
         };
 
-        let processed_alarm_safety_interval = Duration::from_secs(
-            env.var("DAP_PROCESSED_ALARM_SAFETY_INTERVAL")?
-                .to_string()
-                .parse()
-                .map_err(|err| {
-                    worker::Error::RustError(format!(
-                        "Failed to parse DAP_PROCESSED_ALARM_SAFETY_INTERVAL: {err}"
-                    ))
-                })?,
-        );
-
         const DAP_METRICS_PUSH_SERVER_URL: &str = "DAP_METRICS_PUSH_SERVER_URL";
         const DAP_METRICS_PUSH_BEARER_TOKEN: &str = "DAP_METRICS_PUSH_BEARER_TOKEN";
         let metrics_push_config = match (
@@ -331,7 +316,6 @@ impl DaphneWorkerConfig {
             taskprov,
             default_version,
             helper_state_store_garbage_collect_after_secs,
-            processed_alarm_safety_interval,
             metrics_push_config,
         })
     }
diff --git a/daphne_worker/src/durable/mod.rs b/daphne_worker/src/durable/mod.rs
index 8d6b06e75..4e828a2f2 100644
--- a/daphne_worker/src/durable/mod.rs
+++ b/daphne_worker/src/durable/mod.rs
@@ -8,7 +8,6 @@ pub(crate) mod leader_agg_job_queue;
 pub(crate) mod leader_batch_queue;
 pub(crate) mod leader_col_job_queue;
 pub(crate) mod reports_pending;
-pub(crate) mod reports_processed;
 
 use crate::{
     int_err, now,
diff --git a/daphne_worker/src/durable/reports_processed.rs b/daphne_worker/src/durable/reports_processed.rs
deleted file mode 100644
index 1d5dd5a9a..000000000
--- a/daphne_worker/src/durable/reports_processed.rs
+++ /dev/null
@@ -1,317 +0,0 @@
-// Copyright (c) 2022 Cloudflare, Inc. All rights reserved.
-// SPDX-License-Identifier: BSD-3-Clause
-
-use crate::{
-    config::DaphneWorkerConfig,
-    durable::{create_span_from_request, state_get, BINDING_DAP_REPORTS_PROCESSED},
-    initialize_tracing, int_err,
-};
-use daphne::{
-    messages::{ReportId, ReportMetadata, TransitionFailure},
-    vdaf::{
-        EarlyReportState, EarlyReportStateConsumed, EarlyReportStateInitialized, VdafPrepMessage,
-        VdafPrepState, VdafVerifyKey,
-    },
-    DapError, VdafConfig,
-};
-use futures::{future::try_join_all, StreamExt, TryStreamExt};
-use prio::codec::{CodecError, ParameterizedDecode};
-use serde::{Deserialize, Serialize};
-use std::{borrow::Cow, collections::HashSet, future::ready, ops::ControlFlow, time::Duration};
-use tracing::Instrument;
-use worker::{
-    async_trait, durable_object, js_sys, wasm_bindgen, wasm_bindgen_futures, worker_sys, Env,
-    Method, Request, Response, Result, State,
-};
-
-use super::{req_parse, state_set_if_not_exists, Alarmed, DapDurableObject, GarbageCollectable};
-
-pub(crate) const DURABLE_REPORTS_PROCESSED_INITIALIZE: &str =
-    "/internal/do/reports_processed/initialize";
-pub(crate) const DURABLE_REPORTS_PROCESSED_INITIALIZED: &str =
-    "/internal/do/reports_processed/initialized";
-
-/// Durable Object (DO) for tracking which reports have been processed.
-///
-/// This object defines a single API endpoint, `DURABLE_REPORTS_PROCESSED_MARK_AGGREGATED`, which
-/// is used to mark a set of reports as aggregated. It returns the set of reports that have
-/// already been aggregated (and thus need to be rejected by the caller).
-///
-/// The schema for stored report IDs is as follows:
-///
-/// ```text
-/// processed/<report_id> -> bool
-/// ```
-///
-/// where `<report_id>` is the hex-encoded report ID.
-#[durable_object]
-pub struct ReportsProcessed {
-    #[allow(dead_code)]
-    state: State,
-    env: Env,
-    config: DaphneWorkerConfig,
-    touched: bool,
-    alarmed: bool,
-}
-
-#[derive(Debug, Clone)]
-struct ReportIdKey<'s>(&'s ReportId, String);
-
-impl<'id> From<&'id ReportId> for ReportIdKey<'id> {
-    fn from(id: &'id ReportId) -> Self {
-        ReportIdKey(id, format!("processed/{}", id.to_hex()))
-    }
-}
-
-#[durable_object]
-impl DurableObject for ReportsProcessed {
-    fn new(state: State, env: Env) -> Self {
-        initialize_tracing(&env);
-        let config =
-            DaphneWorkerConfig::from_worker_env(&env).expect("failed to load configuration");
-        Self {
-            state,
-            env,
-            config,
-            touched: false,
-            alarmed: false,
-        }
-    }
-
-    async fn fetch(&mut self, req: Request) -> Result<Response> {
-        let span = create_span_from_request(&req);
-        self.handle(req).instrument(span).await
-    }
-
-    async fn alarm(&mut self) -> Result<Response> {
-        self.state.storage().delete_all().await?;
-        self.alarmed = false;
-        self.touched = false;
-        Response::from_json(&())
-    }
-}
-
-impl ReportsProcessed {
-    async fn handle(&mut self, req: Request) -> Result<Response> {
-        let mut req = match self
-            .schedule_for_garbage_collection(req, BINDING_DAP_REPORTS_PROCESSED)
-            .await?
-        {
-            ControlFlow::Continue(req) => req,
-            // This req was a GC request and as such we must return from this function.
-            ControlFlow::Break(_) => return Response::from_json(&()),
-        };
-
-        self.ensure_alarmed(
-            Duration::from_secs(self.config.global.report_storage_epoch_duration)
-                .saturating_add(self.config.processed_alarm_safety_interval),
-        )
-        .await?;
-
-        match (req.path().as_ref(), req.method()) {
-            (DURABLE_REPORTS_PROCESSED_INITIALIZED, Method::Post) => {
-                let to_mark = req_parse::<Vec<ReportId>>(&mut req).await?;
-                let state = &self.state;
-                let replays = futures::stream::iter(&to_mark)
-                    .map(|id| async move {
-                        state_set_if_not_exists(state, &format!("processed/{id}"), &true)
-                            .await
-                            .map(|o| o.is_some().then_some(id))
-                    })
-                    .buffer_unordered(usize::MAX)
-                    .try_filter_map(|replay| ready(Ok(replay)))
-                    .try_collect::<Vec<_>>()
-                    .await?;
-
-                Response::from_json(&replays)
-            }
-            // Initialize a report:
-            // * Ensure the report wasn't replayed
-            // * Ensure the report won't be included in a batch that was already collected
-            // * Initialize VDAF preparation.
-            //
-            // Idempotent
-            // Input: `ReportsProcessedReq`
-            // Output: `ReportsProcessedResp`
-            (DURABLE_REPORTS_PROCESSED_INITIALIZE, Method::Post) => {
-                let reports_processed_request: ReportsProcessedReq = req_parse(&mut req).await?;
-                let result = try_join_all(
-                    reports_processed_request
-                        .consumed_reports
-                        .iter()
-                        .filter(|consumed_report| consumed_report.is_ready())
-                        .map(|consumed_report| async {
-                            if let Some(exists) = state_get::<bool>(
-                                &self.state,
-                                &format!("processed/{}", consumed_report.metadata().id.to_hex()),
-                            )
-                            .await?
-                            {
-                                if exists {
-                                    return Result::Ok(Some(consumed_report.metadata().id));
-                                }
-                            }
-                            Ok(None)
-                        }),
-                )
-                .await?;
-                let replayed_reports = result.into_iter().flatten().collect::<HashSet<_>>();
-
-                let initialized_reports = reports_processed_request
-                    .consumed_reports
-                    .into_iter()
-                    .map(|consumed_report| {
-                        if replayed_reports.contains(&consumed_report.metadata().id) {
-                            Ok(consumed_report.into_initialized_rejected_due_to(
-                                TransitionFailure::ReportReplayed,
-                            ))
-                        } else {
-                            EarlyReportStateInitialized::initialize(
-                                reports_processed_request.is_leader,
-                                &reports_processed_request.vdaf_verify_key,
-                                &reports_processed_request.vdaf_config,
-                                consumed_report,
-                            )
-                        }
-                    })
-                    .collect::<std::result::Result<Vec<_>, DapError>>()
-                    .map_err(|e| {
-                        int_err(format!(
-                            "ReportsProcessed: failed to initialize a report: {e}"
-                        ))
-                    })?;
-
-                Response::from_json(&ReportsProcessedResp {
-                    is_leader: reports_processed_request.is_leader,
-                    vdaf_config: reports_processed_request.vdaf_config,
-                    initialized_reports,
-                })
-            }
-
-            _ => Err(int_err(format!(
-                "ReportsProcessed: unexpected request: method={:?}; path={:?}",
-                req.method(),
-                req.path()
-            ))),
-        }
-    }
-}
-
-impl DapDurableObject for ReportsProcessed {
-    #[inline(always)]
-    fn state(&self) -> &State {
-        &self.state
-    }
-
-    #[inline(always)]
-    fn deployment(&self) -> crate::config::DaphneWorkerDeployment {
-        self.config.deployment
-    }
-}
-
-#[async_trait::async_trait]
-impl Alarmed for ReportsProcessed {
-    #[inline(always)]
-    fn alarmed(&mut self) -> &mut bool {
-        &mut self.alarmed
-    }
-}
-
-#[async_trait::async_trait(?Send)]
-impl GarbageCollectable for ReportsProcessed {
-    #[inline(always)]
-    fn touched(&mut self) -> &mut bool {
-        &mut self.touched
-    }
-
-    #[inline(always)]
-    fn env(&self) -> &Env {
-        &self.env
-    }
-}
-
-#[derive(Serialize, Deserialize)]
-pub(crate) struct ReportsProcessedReq<'req> {
-    pub(crate) is_leader: bool,
-    pub(crate) vdaf_verify_key: VdafVerifyKey,
-    pub(crate) vdaf_config: VdafConfig,
-    pub(crate) consumed_reports: Vec<EarlyReportStateConsumed<'req>>,
-}
-
-#[derive(Serialize, Deserialize)]
-#[serde(try_from = "ShadowReportsProcessedResp")]
-pub(crate) struct ReportsProcessedResp<'req> {
-    pub(crate) is_leader: bool,
-    pub(crate) vdaf_config: VdafConfig,
-    pub(crate) initialized_reports: Vec<EarlyReportStateInitialized<'req>>,
-}
-
-// we need this custom deserializer because VdafPrepState and VdafPrepMessage don't implement
-// Decode, only ParameterizedDecode.
-#[derive(Deserialize)]
-struct ShadowReportsProcessedResp {
-    pub(crate) is_leader: bool,
-    pub(crate) vdaf_config: VdafConfig,
-    pub(crate) initialized_reports: Vec<EarlyReportStateInitializedOwned>,
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "snake_case")]
-pub enum EarlyReportStateInitializedOwned {
-    Ready {
-        metadata: ReportMetadata,
-        #[serde(with = "hex")]
-        public_share: Vec<u8>,
-        #[serde(with = "hex")]
-        state: Vec<u8>,
-        #[serde(with = "hex")]
-        message: Vec<u8>,
-    },
-    Rejected {
-        metadata: ReportMetadata,
-        failure: TransitionFailure,
-    },
-}
-
-impl TryFrom<ShadowReportsProcessedResp> for ReportsProcessedResp<'_> {
-    type Error = CodecError;
-
-    fn try_from(other: ShadowReportsProcessedResp) -> std::result::Result<Self, Self::Error> {
-        let initialized_reports = other
-            .initialized_reports
-            .into_iter()
-            .map(|initialized_report| match initialized_report {
-                EarlyReportStateInitializedOwned::Ready {
-                    metadata,
-                    public_share,
-                    state,
-                    message,
-                } => {
-                    let state = VdafPrepState::get_decoded_with_param(
-                        &(&other.vdaf_config, other.is_leader),
-                        &state,
-                    )?;
-                    let message = VdafPrepMessage::get_decoded_with_param(&state, &message)?;
-
-                    Ok(EarlyReportStateInitialized::Ready {
-                        metadata: Cow::Owned(metadata),
-                        public_share: Cow::Owned(public_share),
-                        state,
-                        message,
-                    })
-                }
-                EarlyReportStateInitializedOwned::Rejected { metadata, failure } => {
-                    Ok(EarlyReportStateInitialized::Rejected {
-                        metadata: Cow::Owned(metadata),
-                        failure,
-                    })
-                }
-            })
-            .collect::<std::result::Result<Vec<_>, CodecError>>()?;
-        Ok(Self {
-            is_leader: other.is_leader,
-            vdaf_config: other.vdaf_config,
-            initialized_reports,
-        })
-    }
-}
diff --git a/daphne_worker/src/roles/aggregator.rs b/daphne_worker/src/roles/aggregator.rs
index f566a9d9a..d1d003c73 100644
--- a/daphne_worker/src/roles/aggregator.rs
+++ b/daphne_worker/src/roles/aggregator.rs
@@ -12,9 +12,7 @@ use crate::{
             DURABLE_AGGREGATE_STORE_GET, DURABLE_AGGREGATE_STORE_MARK_COLLECTED,
             DURABLE_AGGREGATE_STORE_MERGE,
         },
-        durable_name_agg_store,
-        reports_processed::DURABLE_REPORTS_PROCESSED_INITIALIZED,
-        BINDING_DAP_AGGREGATE_STORE, BINDING_DAP_REPORTS_PROCESSED,
+        durable_name_agg_store, BINDING_DAP_AGGREGATE_STORE,
     },
     now,
 };
@@ -93,7 +91,7 @@ impl DapReportInitializer for DaphneWorker<'_> {
         let min_time = self.least_valid_report_time(self.get_current_time());
         let max_time = self.greatest_valid_report_time(self.get_current_time());
 
-        let mut initialized_reports = consumed_reports
+        let initialized_reports = consumed_reports
             .into_iter()
             .map(|consumed_report| {
                 let metadata = consumed_report.metadata();
@@ -123,31 +121,6 @@ impl DapReportInitializer for DaphneWorker<'_> {
             .collect::<Result<Vec<_>, DapError>>()
             .map_err(|e| fatal_error!(err = ?e, "failed to initialize a report"))?;
 
-        let replayed_reports_check = futures::stream::iter(reports_processed_request_data)
-            .map(|(durable_name, reports)| async {
-                durable
-                    .post::<_, HashSet<ReportId>>(
-                        BINDING_DAP_REPORTS_PROCESSED,
-                        DURABLE_REPORTS_PROCESSED_INITIALIZED,
-                        durable_name,
-                        reports,
-                    )
-                    .await
-            })
-            .buffer_unordered(usize::MAX)
-            .try_fold(HashSet::new(), |mut acc, replays| async {
-                acc.extend(replays);
-                Ok(acc)
-            })
-            .await
-            .map_err(|e| fatal_error!(err = ?e, "checking for replayed reports"))?;
-
-        for rep in &mut initialized_reports {
-            if replayed_reports_check.contains(&rep.metadata().id) {
-                rep.reject_due_to(TransitionFailure::ReportReplayed);
-            }
-        }
-
         Ok(initialized_reports)
     }
 }
diff --git a/daphne_worker_test/wrangler.toml b/daphne_worker_test/wrangler.toml
index fb687926c..8f2ed019f 100644
--- a/daphne_worker_test/wrangler.toml
+++ b/daphne_worker_test/wrangler.toml
@@ -36,7 +36,6 @@ DAP_GLOBAL_CONFIG = """{
     "supported_hpke_kems": ["x25519_hkdf_sha256"],
     "allow_taskprov": true
 }"""
-DAP_PROCESSED_ALARM_SAFETY_INTERVAL = "300"
 DAP_DEPLOYMENT = "dev"
 DAP_TASKPROV_HPKE_COLLECTOR_CONFIG = """{
     "id": 23,
@@ -65,7 +64,6 @@ bindings = [
     { name = "DAP_LEADER_COL_JOB_QUEUE", class_name = "LeaderCollectionJobQueue" },
     { name = "DAP_GARBAGE_COLLECTOR", class_name = "GarbageCollector" },
     { name = "DAP_REPORTS_PENDING", class_name = "ReportsPending" },
-    { name = "DAP_REPORTS_PROCESSED", class_name = "ReportsProcessed" },
 ]
 
 
@@ -116,7 +114,6 @@ DAP_GLOBAL_CONFIG = """{
     "supported_hpke_kems": ["x25519_hkdf_sha256"],
     "allow_taskprov": true
 }"""
-DAP_PROCESSED_ALARM_SAFETY_INTERVAL = "300"
 DAP_DEPLOYMENT = "dev"
 DAP_TASKPROV_HPKE_COLLECTOR_CONFIG = """{
     "id": 23,
@@ -139,7 +136,6 @@ bindings = [
     { name = "DAP_AGGREGATE_STORE", class_name = "AggregateStore" },
     { name = "DAP_HELPER_STATE_STORE", class_name = "HelperStateStore" },
     { name = "DAP_GARBAGE_COLLECTOR", class_name = "GarbageCollector" },
-    { name = "DAP_REPORTS_PROCESSED", class_name = "ReportsProcessed" },
 ]
 
 
@@ -166,5 +162,4 @@ new_classes = [
     "LeaderCollectionJobQueue",
     "GarbageCollector",
     "ReportsPending",
-    "ReportsProcessed",
 ]
diff --git a/docker/wrangler.toml b/docker/wrangler.toml
index effb831c6..337c28875 100644
--- a/docker/wrangler.toml
+++ b/docker/wrangler.toml
@@ -36,7 +36,6 @@ DAP_GLOBAL_CONFIG = """{
     "supported_hpke_kems": ["x25519_hkdf_sha256"],
    "taskprov_version": "v02"
 }"""
-DAP_PROCESSED_ALARM_SAFETY_INTERVAL = "300"
 DAP_DEPLOYMENT = "dev"
 DAP_TASKPROV_HPKE_COLLECTOR_CONFIG = """{
     "id": 23,
@@ -65,7 +64,6 @@ bindings = [
     { name = "DAP_LEADER_COL_JOB_QUEUE", class_name = "LeaderCollectionJobQueue" },
     { name = "DAP_GARBAGE_COLLECTOR", class_name = "GarbageCollector" },
     { name = "DAP_REPORTS_PENDING", class_name = "ReportsPending" },
-    { name = "DAP_REPORTS_PROCESSED", class_name = "ReportsProcessed" },
 ]
 
 
@@ -117,7 +115,6 @@ DAP_GLOBAL_CONFIG = """{
     "allow_taskprov": true,
     "taskprov_version": "v02"
 }"""
-DAP_PROCESSED_ALARM_SAFETY_INTERVAL = "300"
 DAP_DEPLOYMENT = "dev"
 DAP_TASKPROV_HPKE_COLLECTOR_CONFIG = """{
     "id": 23,
@@ -140,7 +137,6 @@ bindings = [
     { name = "DAP_AGGREGATE_STORE", class_name = "AggregateStore" },
     { name = "DAP_HELPER_STATE_STORE", class_name = "HelperStateStore" },
     { name = "DAP_GARBAGE_COLLECTOR", class_name = "GarbageCollector" },
-    { name = "DAP_REPORTS_PROCESSED", class_name = "ReportsProcessed" },
 ]
 
 
@@ -167,5 +163,4 @@ new_classes = [
     "LeaderCollectionJobQueue",
     "GarbageCollector",
     "ReportsPending",
-    "ReportsProcessed",
 ]
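
For context on what the removal takes away: the deleted DURABLE_REPORTS_PROCESSED_INITIALIZED endpoint wrote one `processed/<report_id>` key per report and replied with the IDs it had already stored, which the caller then rejected as replays (TransitionFailure::ReportReplayed). Below is a minimal sketch of that mark-and-return-replays contract; it is not part of the patch, the type names are illustrative, and an in-memory set stands in for Durable Object storage.

```rust
use std::collections::HashSet;

#[derive(Default)]
struct ReportsProcessedStore {
    // Stand-in for the Durable Object's `processed/<report_id>` keys.
    processed: HashSet<[u8; 16]>,
}

impl ReportsProcessedStore {
    /// Marks every ID in `to_mark` as processed and returns the IDs that had
    /// already been marked, i.e. the replays the caller must reject.
    fn mark_processed(&mut self, to_mark: &[[u8; 16]]) -> HashSet<[u8; 16]> {
        to_mark
            .iter()
            .filter(|id| !self.processed.insert(**id))
            .copied()
            .collect()
    }
}

fn main() {
    let mut store = ReportsProcessedStore::default();
    let first = [1u8; 16];
    let second = [2u8; 16];

    // First aggregation job: nothing has been seen yet, so there are no replays.
    assert!(store.mark_processed(&[first, second]).is_empty());

    // A later job that re-submits `first` gets it back as a replay.
    let replays = store.mark_processed(&[first]);
    assert!(replays.contains(&first));
    println!("replays: {replays:?}");
}
```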
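The deleted ReportsProcessedResp also relied on serde's `#[serde(try_from = "ShadowReportsProcessedResp")]` shadow-struct pattern: VdafPrepState and VdafPrepMessage only implement ParameterizedDecode, so they can be decoded only once the VDAF config and the aggregator's role are known, and that context-dependent decoding is deferred to a TryFrom conversion. Here is a minimal, self-contained sketch of the same pattern under assumed names (PrepState, Response, ShadowResponse are illustrative; it assumes the serde, serde_json and hex crates with serde support enabled).

```rust
use serde::Deserialize;

// Pretend this type can only be decoded with extra context, like VdafPrepState,
// which implements ParameterizedDecode rather than Decode.
#[derive(Debug)]
struct PrepState(Vec<u8>);

impl PrepState {
    // Stand-in for `get_decoded_with_param`: decoding needs the `is_leader` flag.
    fn decode_with_param(is_leader: bool, bytes: &[u8]) -> Result<Self, String> {
        if bytes.is_empty() {
            return Err("empty prep state".into());
        }
        let _ = is_leader; // a real decoder would branch on the role here
        Ok(Self(bytes.to_vec()))
    }
}

// Public type: serde routes deserialization through the shadow struct below.
#[derive(Debug, Deserialize)]
#[serde(try_from = "ShadowResponse")]
struct Response {
    is_leader: bool,
    state: PrepState,
}

// Shadow type: only fields serde can decode without extra context.
#[derive(Deserialize)]
struct ShadowResponse {
    is_leader: bool,
    #[serde(with = "hex")]
    state: Vec<u8>,
}

impl TryFrom<ShadowResponse> for Response {
    type Error = String;

    fn try_from(shadow: ShadowResponse) -> Result<Self, Self::Error> {
        // The context-dependent decode happens here, after plain deserialization.
        let state = PrepState::decode_with_param(shadow.is_leader, &shadow.state)?;
        Ok(Self {
            is_leader: shadow.is_leader,
            state,
        })
    }
}

fn main() {
    let json = r#"{"is_leader": true, "state": "0102"}"#;
    let resp: Response = serde_json::from_str(json).expect("deserialization failed");
    println!("{resp:?}");
}
```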