From 482996d7e3a44a3753298e887242d88df229000f Mon Sep 17 00:00:00 2001 From: Christopher Patton Date: Fri, 1 Dec 2023 09:40:24 -0800 Subject: [PATCH] draft09: Make max batch size optional for fixed-size queries --- daphne/src/lib.rs | 21 ++++++++++++++------- daphne/src/roles/mod.rs | 8 ++++++-- daphne/src/taskprov.rs | 7 +++++-- daphne_worker/src/config.rs | 3 +-- daphne_worker_test/tests/e2e/test_runner.rs | 2 +- 5 files changed, 27 insertions(+), 14 deletions(-) diff --git a/daphne/src/lib.rs b/daphne/src/lib.rs index c7cbc627b..aa5d05477 100644 --- a/daphne/src/lib.rs +++ b/daphne/src/lib.rs @@ -200,8 +200,6 @@ impl DapGlobalConfig { } /// DAP Query configuration. -// -// TODO(cjpatton) Once we implement maximum batch lifetime, put the parameter here. #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] #[cfg_attr(any(test, feature = "test-utils"), derive(deepsize::DeepSizeOf))] @@ -210,9 +208,13 @@ pub enum DapQueryConfig { /// specified by the query. TimeInterval, - /// The "fixed-size" query type. The Leader partitions the reports into arbitary batches of - /// roughly the same size. - FixedSize { max_batch_size: u64 }, + /// The "fixed-size" query type where by the Leader assigns reports to arbitrary batches + /// identified by batch IDs. This type includes an optional maximum batch size: if set, then + /// Aggregators are meant to stop aggregating reports when this limit is reached. + FixedSize { + #[serde(default)] + max_batch_size: Option, + }, } impl DapQueryConfig { @@ -720,8 +722,9 @@ impl DapTaskConfig { report_count: u64, ) -> Result { match self.query { - DapQueryConfig::TimeInterval => (), - DapQueryConfig::FixedSize { max_batch_size } => { + DapQueryConfig::FixedSize { + max_batch_size: Some(max_batch_size), + } => { if report_count > max_batch_size { return Err(DapAbort::InvalidBatchSize { detail: format!( @@ -731,6 +734,10 @@ impl DapTaskConfig { }); } } + DapQueryConfig::TimeInterval + | DapQueryConfig::FixedSize { + max_batch_size: None, + } => (), }; Ok(report_count >= self.min_batch_size) diff --git a/daphne/src/roles/mod.rs b/daphne/src/roles/mod.rs index c5eb44c6e..1f68a6f69 100644 --- a/daphne/src/roles/mod.rs +++ b/daphne/src/roles/mod.rs @@ -307,7 +307,9 @@ mod test { time_precision: Self::TASK_TIME_PRECISION, expiration: now + Self::TASK_TIME_PRECISION, min_batch_size: 1, - query: DapQueryConfig::FixedSize { max_batch_size: 2 }, + query: DapQueryConfig::FixedSize { + max_batch_size: Some(2), + }, vdaf: vdaf_config.clone(), vdaf_verify_key: VdafVerifyKey::Prio3(rng.gen()), method: Default::default(), @@ -1806,7 +1808,9 @@ mod test { DapTaskParameters { version, min_batch_size: 1, - query: DapQueryConfig::FixedSize { max_batch_size: 2 }, + query: DapQueryConfig::FixedSize { + max_batch_size: Some(2), + }, vdaf: vdaf_config, ..Default::default() } diff --git a/daphne/src/taskprov.rs b/daphne/src/taskprov.rs index 1ed99ea46..9fc0e654c 100644 --- a/daphne/src/taskprov.rs +++ b/daphne/src/taskprov.rs @@ -205,8 +205,11 @@ fn url_from_bytes(task_id: &TaskId, url_bytes: &[u8]) -> Result { impl DapQueryConfig { fn try_from_taskprov(task_id: &TaskId, var: QueryConfigVar) -> Result { match var { + QueryConfigVar::FixedSize { max_batch_size: 0 } => Ok(DapQueryConfig::FixedSize { + max_batch_size: None, + }), QueryConfigVar::FixedSize { max_batch_size } => Ok(DapQueryConfig::FixedSize { - max_batch_size: max_batch_size.into(), + max_batch_size: Some(max_batch_size.into()), }), QueryConfigVar::TimeInterval => Ok(DapQueryConfig::TimeInterval), QueryConfigVar::NotImplemented { typ, .. } => Err(DapAbort::InvalidTask { @@ -323,7 +326,7 @@ impl TryFrom<&DapQueryConfig> for messages::taskprov::QueryConfigVar { DapQueryConfig::TimeInterval => messages::taskprov::QueryConfigVar::TimeInterval, DapQueryConfig::FixedSize { max_batch_size } => { messages::taskprov::QueryConfigVar::FixedSize { - max_batch_size: (*max_batch_size).try_into().map_err(|_| { + max_batch_size: max_batch_size.unwrap_or(0).try_into().map_err(|_| { fatal_error!(err = "task max batch size is too large for taskprov") })?, } diff --git a/daphne_worker/src/config.rs b/daphne_worker/src/config.rs index 8c0609366..4ef273fed 100644 --- a/daphne_worker/src/config.rs +++ b/daphne_worker/src/config.rs @@ -996,8 +996,7 @@ impl<'srv> DaphneWorker<'srv> { let query = match (cmd.query_type, cmd.max_batch_size) { (1, None) => DapQueryConfig::TimeInterval, (1, Some(..)) => return Err(int_err("command failed: unexpected max batch size")), - (2, Some(max_batch_size)) => DapQueryConfig::FixedSize { max_batch_size }, - (2, None) => return Err(int_err("command failed: missing max batch size")), + (2, max_batch_size) => DapQueryConfig::FixedSize { max_batch_size }, _ => return Err(int_err("command failed: unrecognized query type")), }; diff --git a/daphne_worker_test/tests/e2e/test_runner.rs b/daphne_worker_test/tests/e2e/test_runner.rs index e67c0aabb..d5dee9d8b 100644 --- a/daphne_worker_test/tests/e2e/test_runner.rs +++ b/daphne_worker_test/tests/e2e/test_runner.rs @@ -59,7 +59,7 @@ impl TestRunner { Self::with( version, &DapQueryConfig::FixedSize { - max_batch_size: MAX_BATCH_SIZE, + max_batch_size: Some(MAX_BATCH_SIZE), }, ) .await