Skip to content

Commit

Permalink
draft09: Make max batch size optional for fixed-size queries
Browse files Browse the repository at this point in the history
  • Loading branch information
cjpatton committed Dec 4, 2023
1 parent 50d5811 commit 482996d
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 14 deletions.
21 changes: 14 additions & 7 deletions daphne/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,6 @@ impl DapGlobalConfig {
}

/// DAP Query configuration.
//
// TODO(cjpatton) Once we implement maximum batch lifetime, put the parameter here.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
#[cfg_attr(any(test, feature = "test-utils"), derive(deepsize::DeepSizeOf))]
Expand All @@ -210,9 +208,13 @@ pub enum DapQueryConfig {
/// specified by the query.
TimeInterval,

/// The "fixed-size" query type. The Leader partitions the reports into arbitary batches of
/// roughly the same size.
FixedSize { max_batch_size: u64 },
/// The "fixed-size" query type where by the Leader assigns reports to arbitrary batches
/// identified by batch IDs. This type includes an optional maximum batch size: if set, then
/// Aggregators are meant to stop aggregating reports when this limit is reached.
FixedSize {
#[serde(default)]
max_batch_size: Option<u64>,
},
}

impl DapQueryConfig {
Expand Down Expand Up @@ -720,8 +722,9 @@ impl DapTaskConfig {
report_count: u64,
) -> Result<bool, DapAbort> {
match self.query {
DapQueryConfig::TimeInterval => (),
DapQueryConfig::FixedSize { max_batch_size } => {
DapQueryConfig::FixedSize {
max_batch_size: Some(max_batch_size),
} => {
if report_count > max_batch_size {
return Err(DapAbort::InvalidBatchSize {
detail: format!(
Expand All @@ -731,6 +734,10 @@ impl DapTaskConfig {
});
}
}
DapQueryConfig::TimeInterval
| DapQueryConfig::FixedSize {
max_batch_size: None,
} => (),
};

Ok(report_count >= self.min_batch_size)
Expand Down
8 changes: 6 additions & 2 deletions daphne/src/roles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ mod test {
time_precision: Self::TASK_TIME_PRECISION,
expiration: now + Self::TASK_TIME_PRECISION,
min_batch_size: 1,
query: DapQueryConfig::FixedSize { max_batch_size: 2 },
query: DapQueryConfig::FixedSize {
max_batch_size: Some(2),
},
vdaf: vdaf_config.clone(),
vdaf_verify_key: VdafVerifyKey::Prio3(rng.gen()),
method: Default::default(),
Expand Down Expand Up @@ -1806,7 +1808,9 @@ mod test {
DapTaskParameters {
version,
min_batch_size: 1,
query: DapQueryConfig::FixedSize { max_batch_size: 2 },
query: DapQueryConfig::FixedSize {
max_batch_size: Some(2),
},
vdaf: vdaf_config,
..Default::default()
}
Expand Down
7 changes: 5 additions & 2 deletions daphne/src/taskprov.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,11 @@ fn url_from_bytes(task_id: &TaskId, url_bytes: &[u8]) -> Result<Url, DapAbort> {
impl DapQueryConfig {
fn try_from_taskprov(task_id: &TaskId, var: QueryConfigVar) -> Result<Self, DapAbort> {
match var {
QueryConfigVar::FixedSize { max_batch_size: 0 } => Ok(DapQueryConfig::FixedSize {
max_batch_size: None,
}),
QueryConfigVar::FixedSize { max_batch_size } => Ok(DapQueryConfig::FixedSize {
max_batch_size: max_batch_size.into(),
max_batch_size: Some(max_batch_size.into()),
}),
QueryConfigVar::TimeInterval => Ok(DapQueryConfig::TimeInterval),
QueryConfigVar::NotImplemented { typ, .. } => Err(DapAbort::InvalidTask {
Expand Down Expand Up @@ -323,7 +326,7 @@ impl TryFrom<&DapQueryConfig> for messages::taskprov::QueryConfigVar {
DapQueryConfig::TimeInterval => messages::taskprov::QueryConfigVar::TimeInterval,
DapQueryConfig::FixedSize { max_batch_size } => {
messages::taskprov::QueryConfigVar::FixedSize {
max_batch_size: (*max_batch_size).try_into().map_err(|_| {
max_batch_size: max_batch_size.unwrap_or(0).try_into().map_err(|_| {
fatal_error!(err = "task max batch size is too large for taskprov")
})?,
}
Expand Down
3 changes: 1 addition & 2 deletions daphne_worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -996,8 +996,7 @@ impl<'srv> DaphneWorker<'srv> {
let query = match (cmd.query_type, cmd.max_batch_size) {
(1, None) => DapQueryConfig::TimeInterval,
(1, Some(..)) => return Err(int_err("command failed: unexpected max batch size")),
(2, Some(max_batch_size)) => DapQueryConfig::FixedSize { max_batch_size },
(2, None) => return Err(int_err("command failed: missing max batch size")),
(2, max_batch_size) => DapQueryConfig::FixedSize { max_batch_size },
_ => return Err(int_err("command failed: unrecognized query type")),
};

Expand Down
2 changes: 1 addition & 1 deletion daphne_worker_test/tests/e2e/test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl TestRunner {
Self::with(
version,
&DapQueryConfig::FixedSize {
max_batch_size: MAX_BATCH_SIZE,
max_batch_size: Some(MAX_BATCH_SIZE),
},
)
.await
Expand Down

0 comments on commit 482996d

Please sign in to comment.