Skip to content

Commit

Permalink
Enable taskprov tests for draft07
Browse files Browse the repository at this point in the history
  • Loading branch information
cjpatton committed Nov 22, 2023
1 parent fb09b07 commit 6cd6ab7
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 66 deletions.
1 change: 1 addition & 0 deletions daphne/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ impl Extend<(DapBatchBucket, (ReportId, Time))> for DapAggregateSpan<()> {

/// Per-task DAP parameters.
#[derive(Clone, Deserialize, Serialize)]
#[cfg_attr(test, derive(PartialEq, Debug))]
pub struct DapTaskConfig {
/// The protocol version (i.e., which draft).
pub version: DapVersion,
Expand Down
159 changes: 96 additions & 63 deletions daphne/src/roles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,18 @@ async fn resolve_taskprov<S>(
mod test {
use super::{DapAggregator, DapAuthorizedSender, DapHelper, DapLeader};
use crate::{
assert_metrics_include, async_test_version, async_test_versions,
assert_metrics_include, async_test_versions,
auth::BearerToken,
constants::DapMediaType,
hpke::{HpkeDecrypter, HpkeKemId, HpkeReceiverConfig},
messages::{
taskprov, AggregateShareReq, AggregationJobContinueReq, AggregationJobInitReq,
AggregationJobResp, BatchId, BatchSelector, Collection, CollectionJobId, CollectionReq,
Extension, HpkeCiphertext, Interval, PartialBatchSelector, Query, Report, ReportId,
ReportMetadata, TaskId, Time, Transition, TransitionFailure, TransitionVar,
self, encode_base64url, AggregateShareReq, AggregationJobContinueReq,
AggregationJobInitReq, AggregationJobResp, BatchId, BatchSelector, Collection,
CollectionJobId, CollectionReq, Extension, HpkeCiphertext, Interval,
PartialBatchSelector, Query, Report, ReportId, ReportMetadata, TaskId, Time,
Transition, TransitionFailure, TransitionVar,
},
test_versions,
taskprov, test_versions,
testing::{AggStore, MockAggregator, MockAggregatorReportSelector},
vdaf::VdafVerifyKey,
DapAbort, DapAggregateShare, DapBatchBucket, DapCollectJob, DapGlobalConfig,
Expand Down Expand Up @@ -1754,85 +1755,117 @@ mod test {

async fn e2e_taskprov(version: DapVersion) {
let t = Test::new(version);
let vdaf = VdafConfig::Prio2 { dimension: 10 };

// Create the upload extension.
let taskprov_ext_payload = taskprov::TaskConfig {
task_info: "cool task".as_bytes().to_vec(),
leader_url: taskprov::UrlBytes {
bytes: b"https://leader.com/".to_vec(),
},
helper_url: taskprov::UrlBytes {
bytes: b"http://helper.org:8788/".to_vec(),
},
query_config: taskprov::QueryConfig {
time_precision: 3600,
max_batch_query_count: 1,
min_batch_size: 1,
var: taskprov::QueryConfigVar::FixedSize { max_batch_size: 2 },
},
task_expiration: t.now + 86400 * 14,
vdaf_config: taskprov::VdafConfig {
dp_config: taskprov::DpConfig::None,
var: taskprov::VdafTypeVar::Prio2 { dimension: 10 },
},
}
.get_encoded_with_param(&version);
let taskprov_id = super::taskprov::compute_task_id(version, &taskprov_ext_payload);
let (task_id, task_config, taskprov_advertisement, taskprov_report_extension_payload) = {
// Author: Specify the taskprov config.
let taskprov_config = messages::taskprov::TaskConfig {
task_info: "cool task".as_bytes().to_vec(),
leader_url: messages::taskprov::UrlBytes {
bytes: b"https://leader.com/".to_vec(),
},
helper_url: messages::taskprov::UrlBytes {
bytes: b"http://helper.org:8788/".to_vec(),
},
query_config: messages::taskprov::QueryConfig {
time_precision: 3600,
max_batch_query_count: 1,
min_batch_size: 1,
var: messages::taskprov::QueryConfigVar::FixedSize { max_batch_size: 2 },
},
task_expiration: t.now + 86400 * 14,
vdaf_config: messages::taskprov::VdafConfig {
dp_config: messages::taskprov::DpConfig::None,
var: messages::taskprov::VdafTypeVar::Prio2 { dimension: 10 },
},
};

// Client: Send upload request to Leader.
let encoded_taskprov_config = taskprov_config.get_encoded_with_param(&version);
let task_id = taskprov::compute_task_id(version, &encoded_taskprov_config);

// Compute the DAP task config.
let task_config = DapTaskConfig::try_from_taskprov(
version,
&task_id,
taskprov_config,
&t.leader.taskprov_vdaf_verify_key_init,
&t.leader.collector_hpke_config,
)
.unwrap();

let (taskprov_advertisement, taskprov_report_extension_payload) = match version {
DapVersion::Draft07 => {
(Some(encode_base64url(&encoded_taskprov_config)), Vec::new())
}
// draft02 compatibility: The taskprov config is advertised in an HTTP header in
// the latest draft. In draft02, it is carried by a report extension.
DapVersion::Draft02 => (None, encoded_taskprov_config),
};

(
task_id,
task_config,
taskprov_advertisement,
taskprov_report_extension_payload,
)
};

// Clients: Send upload request to Leader.
let hpke_config_list = [
t.leader
.get_hpke_config_for(version, Some(&taskprov_id))
.get_hpke_config_for(version, Some(&task_id))
.await
.unwrap()
.as_ref()
.clone(),
t.helper
.get_hpke_config_for(version, Some(&taskprov_id))
.get_hpke_config_for(version, Some(&task_id))
.await
.unwrap()
.as_ref()
.clone(),
];
let report = vdaf
.produce_report_with_extensions(
&hpke_config_list,
t.now,
&taskprov_id,
DapMeasurement::U32Vec(vec![1; 10]),
vec![Extension::Taskprov {
payload: taskprov_ext_payload,
}],
version,
)
.unwrap();
for _ in 0..task_config.min_batch_size {
let report = task_config
.vdaf
.produce_report_with_extensions(
&hpke_config_list,
t.now,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
vec![Extension::Taskprov {
payload: taskprov_report_extension_payload.clone(),
}],
version,
)
.unwrap();

let req = DapRequest {
version,
media_type: DapMediaType::Report,
task_id: Some(taskprov_id),
resource: DapResource::Undefined,
payload: report.get_encoded_with_param(&version),
url: Url::parse("https://leader.com/upload").unwrap(),
..Default::default()
};
t.leader.handle_upload_req(&req).await.unwrap();
let req = DapRequest {
version,
media_type: DapMediaType::Report,
task_id: Some(task_id),
resource: DapResource::Undefined,
payload: report.get_encoded_with_param(&version),
url: Url::parse("https://leader.com/upload").unwrap(),
taskprov: taskprov_advertisement.clone(),
..Default::default()
};
t.leader.handle_upload_req(&req).await.unwrap();
}

// Leader: Run aggregation job.
t.run_agg_job(&taskprov_id).await.unwrap();
t.run_agg_job(&task_id).await.unwrap();

// The Leader is now configured with the task.
let task_config = t.leader.unchecked_get_task_config(&taskprov_id).await;
assert_eq!(
t.leader.unchecked_get_task_config(&task_id).await,
task_config
);

// Collector: Create collection job and poll result.
let query = Query::FixedSizeByBatchId {
batch_id: t
.leader
.current_batch_id(&taskprov_id, &task_config)
.unwrap(),
batch_id: t.leader.current_batch_id(&task_id, &task_config).unwrap(),
};
t.run_col_job(&taskprov_id, &query).await.unwrap();
t.run_col_job(&task_id, &query).await.unwrap();

assert_metrics_include!(t.helper_registry, {
r#"inbound_request_counter{env="test_helper",host="helper.org",type="aggregate"}"#: 2,
Expand All @@ -1848,7 +1881,7 @@ mod test {
});
}

async_test_version! { e2e_taskprov, Draft02 }
async_test_versions! { e2e_taskprov }

fn early_metadata_checks(version: DapVersion) {
let t = Test::new(version);
Expand Down
5 changes: 4 additions & 1 deletion daphne/src/vdaf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ pub(crate) enum VdafError {
/// A VDAF verification key.
#[derive(Clone, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
#[cfg_attr(any(test, feature = "test-utils"), derive(deepsize::DeepSizeOf))]
#[cfg_attr(
any(test, feature = "test-utils"),
derive(deepsize::DeepSizeOf, PartialEq, Debug)
)]
pub enum VdafVerifyKey {
Prio3(#[serde(with = "hex")] [u8; VDAF_VERIFY_KEY_SIZE_PRIO3]),
Prio2(#[serde(with = "hex")] [u8; VDAF_VERIFY_KEY_SIZE_PRIO2]),
Expand Down
4 changes: 2 additions & 2 deletions daphne_worker_test/tests/e2e/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! End-to-end tests for daphne.
use super::test_runner::{TestRunner, MIN_BATCH_SIZE, TIME_PRECISION};
use daphne::{
async_test_version, async_test_versions,
async_test_versions,
constants::DapMediaType,
messages::{
taskprov::{
Expand Down Expand Up @@ -1416,4 +1416,4 @@ async fn leader_collect_taskprov_ok(version: DapVersion) {
);
}

async_test_version! { leader_collect_taskprov_ok, Draft02 }
async_test_versions! { leader_collect_taskprov_ok }

0 comments on commit 6cd6ab7

Please sign in to comment.