From 5f13168ff8d543928dc7ddd4c37de02f4aff2ca8 Mon Sep 17 00:00:00 2001 From: Brandon Pitman Date: Mon, 18 Nov 2024 15:13:14 -0800 Subject: [PATCH] Implement task start time. Note that DAP specifies the parameters as "task start time" and "task duration", rather than Janus' "task start time" and "task end time". Janus implements things this way so that either parameter can be left unspecified. --- aggregator/src/aggregator.rs | 24 ++-- aggregator/src/aggregator/error.rs | 10 +- .../aggregator/http_handlers/tests/report.rs | 18 +-- aggregator/src/aggregator/problem_details.rs | 2 +- aggregator/src/aggregator/report_writer.rs | 3 +- aggregator/src/aggregator/taskprov_tests.rs | 22 ++-- aggregator/src/aggregator/upload_tests.rs | 115 +++++++++++++++--- aggregator/src/binaries/janus_cli.rs | 4 +- aggregator/src/cache.rs | 30 ++--- aggregator_api/src/models.rs | 13 +- aggregator_api/src/routes.rs | 11 +- aggregator_api/src/tests.rs | 87 ++++++++----- aggregator_core/src/datastore.rs | 70 ++++++----- aggregator_core/src/datastore/models.rs | 28 +++-- aggregator_core/src/datastore/tests.rs | 25 ++-- aggregator_core/src/task.rs | 112 ++++++++++++----- core/src/time.rs | 9 +- db/00000000000001_initial_schema.up.sql | 25 ++-- docs/samples/tasks.yaml | 6 +- integration_tests/src/daphne.rs | 8 +- .../src/commands/janus_interop_aggregator.rs | 3 +- interop_binaries/src/lib.rs | 6 +- messages/src/lib.rs | 1 + messages/src/taskprov.rs | 24 ++-- messages/src/tests/aggregation.rs | 1 + 25 files changed, 433 insertions(+), 224 deletions(-) diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index 901221e71..e60cf5902 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -724,7 +724,8 @@ impl Aggregator { task_config.query_config().query().try_into()?, vdaf_instance, vdaf_verify_key, - Some(*task_config.task_expiration()), + None, // TODO(#3636): update taskprov implementation to specify task start + Some(*task_config.task_end()), peer_aggregator.report_expiry_age().cloned(), task_config.query_config().min_batch_size() as u64, *task_config.query_config().time_precision(), @@ -763,7 +764,7 @@ impl Aggregator { /// Validate and authorize a taskprov request. Returns values necessary for determining whether /// we can opt into the task. This function might return an opt-out error for conditions that - /// are relevant for all DAP workflows (e.g. task expiration). + /// are relevant for all DAP workflows (e.g. task end). #[tracing::instrument(skip(self, aggregator_auth_token), err(level = Level::DEBUG))] async fn taskprov_authorize_request( &self, @@ -794,8 +795,8 @@ impl Aggregator { return Err(Error::UnauthorizedRequest(*task_id)); } - if self.clock.now() > *task_config.task_expiration() { - return Err(Error::InvalidTask(*task_id, OptOutReason::TaskExpired)); + if self.clock.now() > *task_config.task_end() { + return Err(Error::InvalidTask(*task_id, OptOutReason::TaskEnded)); } debug!( @@ -1608,11 +1609,18 @@ impl VdafOps { return Err(reject_report(ReportRejectionReason::TooEarly).await?); } - // Reject reports after a task has expired. + // Reject reports before a task has started. + if let Some(task_start) = task.task_start() { + if report.metadata().time().is_before(task_start) { + return Err(reject_report(ReportRejectionReason::TaskNotStarted).await?); + } + } + + // Reject reports after a task has ended. // https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#section-4.4.2-20 - if let Some(task_expiration) = task.task_expiration() { - if report.metadata().time().is_after(task_expiration) { - return Err(reject_report(ReportRejectionReason::TaskExpired).await?); + if let Some(task_end) = task.task_end() { + if report.metadata().time().is_after(task_end) { + return Err(reject_report(ReportRejectionReason::TaskEnded).await?); } } diff --git a/aggregator/src/aggregator/error.rs b/aggregator/src/aggregator/error.rs index c763aef5e..bb73a7fe0 100644 --- a/aggregator/src/aggregator/error.rs +++ b/aggregator/src/aggregator/error.rs @@ -225,10 +225,11 @@ pub enum ReportRejectionReason { IntervalCollected, DecryptFailure, DecodeFailure, - TaskExpired, + TaskEnded, Expired, TooEarly, OutdatedHpkeConfig(HpkeConfigId), + TaskNotStarted, } impl ReportRejectionReason { @@ -239,12 +240,13 @@ impl ReportRejectionReason { } ReportRejectionReason::DecryptFailure => "Report share could not be decrypted.", ReportRejectionReason::DecodeFailure => "Report could not be decoded.", - ReportRejectionReason::TaskExpired => "Task has expired.", + ReportRejectionReason::TaskEnded => "Task has ended.", ReportRejectionReason::Expired => "Report timestamp is too old.", ReportRejectionReason::TooEarly => "Report timestamp is too far in the future.", ReportRejectionReason::OutdatedHpkeConfig(_) => { "Report is using an outdated HPKE configuration." } + ReportRejectionReason::TaskNotStarted => "Task has not started.", } } } @@ -260,8 +262,8 @@ impl Display for ReportRejectionReason { pub enum OptOutReason { #[error("this aggregator is not peered with the given {0} aggregator")] NoSuchPeer(Role), - #[error("task has expired")] - TaskExpired, + #[error("task has ended")] + TaskEnded, #[error("invalid task: {0}")] TaskParameters(#[from] task::Error), #[error("URL parse error: {0}")] diff --git a/aggregator/src/aggregator/http_handlers/tests/report.rs b/aggregator/src/aggregator/http_handlers/tests/report.rs index c9c29c74b..664132b0d 100644 --- a/aggregator/src/aggregator/http_handlers/tests/report.rs +++ b/aggregator/src/aggregator/http_handlers/tests/report.rs @@ -202,21 +202,21 @@ async fn upload_handler() { ) .await; - // Reports with timestamps past the task's expiration should be rejected. - let task_expire_soon = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count) - .with_task_expiration(Some(clock.now().add(&Duration::from_seconds(60)).unwrap())) + // Reports with timestamps past the task's end time should be rejected. + let task_end_soon = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count) + .with_task_end(Some(clock.now().add(&Duration::from_seconds(60)).unwrap())) .build(); - let leader_task_expire_soon = task_expire_soon.leader_view().unwrap(); + let leader_task_end_soon = task_end_soon.leader_view().unwrap(); datastore - .put_aggregator_task(&leader_task_expire_soon) + .put_aggregator_task(&leader_task_end_soon) .await .unwrap(); let report_2 = create_report( - &leader_task_expire_soon, + &leader_task_end_soon, &hpke_keypair, clock.now().add(&Duration::from_seconds(120)).unwrap(), ); - let mut test_conn = post(task_expire_soon.report_upload_uri().unwrap().path()) + let mut test_conn = post(task_end_soon.report_upload_uri().unwrap().path()) .with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE) .with_request_body(report_2.get_encoded().unwrap()) .run_async(&handler) @@ -226,8 +226,8 @@ async fn upload_handler() { Status::BadRequest, "reportRejected", "Report could not be processed.", - task_expire_soon.id(), - Some(ReportRejectionReason::TaskExpired.detail()), + task_end_soon.id(), + Some(ReportRejectionReason::TaskEnded.detail()), ) .await; diff --git a/aggregator/src/aggregator/problem_details.rs b/aggregator/src/aggregator/problem_details.rs index f5d7f5e36..5f380e4d5 100644 --- a/aggregator/src/aggregator/problem_details.rs +++ b/aggregator/src/aggregator/problem_details.rs @@ -207,7 +207,7 @@ mod tests { random(), random(), RealClock::default().now(), - ReportRejectionReason::TaskExpired + ReportRejectionReason::TaskEnded )) }), Some(DapProblemType::ReportRejected), diff --git a/aggregator/src/aggregator/report_writer.rs b/aggregator/src/aggregator/report_writer.rs index b5adf5058..eee774a02 100644 --- a/aggregator/src/aggregator/report_writer.rs +++ b/aggregator/src/aggregator/report_writer.rs @@ -344,10 +344,11 @@ impl TaskUploadCounters { ReportRejectionReason::IntervalCollected => entry.increment_interval_collected(), ReportRejectionReason::DecryptFailure => entry.increment_report_decrypt_failure(), ReportRejectionReason::DecodeFailure => entry.increment_report_decode_failure(), - ReportRejectionReason::TaskExpired => entry.increment_task_expired(), + ReportRejectionReason::TaskEnded => entry.increment_task_ended(), ReportRejectionReason::Expired => entry.increment_report_expired(), ReportRejectionReason::TooEarly => entry.increment_report_too_early(), ReportRejectionReason::OutdatedHpkeConfig(_) => entry.increment_report_outdated_key(), + ReportRejectionReason::TaskNotStarted => entry.increment_task_not_started(), } } diff --git a/aggregator/src/aggregator/taskprov_tests.rs b/aggregator/src/aggregator/taskprov_tests.rs index c4ab23ae3..9fcb770f4 100644 --- a/aggregator/src/aggregator/taskprov_tests.rs +++ b/aggregator/src/aggregator/taskprov_tests.rs @@ -154,7 +154,7 @@ where let time_precision = Duration::from_seconds(1); let min_batch_size = 1; - let task_expiration = clock.now().add(&Duration::from_hours(24).unwrap()).unwrap(); + let task_end = clock.now().add(&Duration::from_hours(24).unwrap()).unwrap(); let task_config = TaskConfig::new( Vec::from("foobar".as_bytes()), "https://leader.example.com/".as_bytes().try_into().unwrap(), @@ -164,7 +164,7 @@ where min_batch_size, TaskprovQuery::LeaderSelected, ), - task_expiration, + task_end, vdaf_config, ) .unwrap(); @@ -185,7 +185,7 @@ where .with_leader_aggregator_endpoint(Url::parse("https://leader.example.com/").unwrap()) .with_helper_aggregator_endpoint(Url::parse("https://helper.example.com/").unwrap()) .with_vdaf_verify_key(vdaf_verify_key) - .with_task_expiration(Some(task_expiration)) + .with_task_end(Some(task_end)) .with_report_expiry_age(peer_aggregator.report_expiry_age().copied()) .with_min_batch_size(min_batch_size as u64) .with_time_precision(Duration::from_seconds(1)) @@ -560,7 +560,7 @@ async fn taskprov_aggregate_init_malformed_extension() { } #[tokio::test] -async fn taskprov_opt_out_task_expired() { +async fn taskprov_opt_out_task_ended() { let test = TaskprovTestCase::new().await; let (transcript, report_share, _) = test.next_report_share(); @@ -582,7 +582,7 @@ async fn taskprov_opt_out_task_expired() { .primary_aggregator_auth_token() .request_authentication(); - // Advance clock past task expiry. + // Advance clock past task end time. test.clock.advance(&Duration::from_hours(48).unwrap()); let mut test_conn = put(test @@ -631,7 +631,7 @@ async fn taskprov_opt_out_mismatched_task_id() { let aggregation_job_id: AggregationJobId = random(); - let task_expiration = test + let task_end = test .clock .now() .add(&Duration::from_hours(24).unwrap()) @@ -646,7 +646,7 @@ async fn taskprov_opt_out_mismatched_task_id() { 100, TaskprovQuery::LeaderSelected, ), - task_expiration, + task_end, VdafConfig::new( DpConfig::new(DpMechanism::None), VdafType::Fake { rounds: 2 }, @@ -708,7 +708,7 @@ async fn taskprov_opt_out_peer_aggregator_wrong_role() { let aggregation_job_id: AggregationJobId = random(); - let task_expiration = test + let task_end = test .clock .now() .add(&Duration::from_hours(24).unwrap()) @@ -723,7 +723,7 @@ async fn taskprov_opt_out_peer_aggregator_wrong_role() { 100, TaskprovQuery::LeaderSelected, ), - task_expiration, + task_end, VdafConfig::new( DpConfig::new(DpMechanism::None), VdafType::Fake { rounds: 2 }, @@ -786,7 +786,7 @@ async fn taskprov_opt_out_peer_aggregator_does_not_exist() { let aggregation_job_id: AggregationJobId = random(); - let task_expiration = test + let task_end = test .clock .now() .add(&Duration::from_hours(24).unwrap()) @@ -801,7 +801,7 @@ async fn taskprov_opt_out_peer_aggregator_does_not_exist() { 100, TaskprovQuery::LeaderSelected, ), - task_expiration, + task_end, VdafConfig::new( DpConfig::new(DpMechanism::None), VdafType::Fake { rounds: 2 }, diff --git a/aggregator/src/aggregator/upload_tests.rs b/aggregator/src/aggregator/upload_tests.rs index f8a0d6a23..f432f711d 100644 --- a/aggregator/src/aggregator/upload_tests.rs +++ b/aggregator/src/aggregator/upload_tests.rs @@ -167,7 +167,9 @@ async fn upload() { assert_eq!( got_counter, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 0, 0, 1, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 0, 1, 0, 0, 0 + )) ) } @@ -230,7 +232,9 @@ async fn upload_batch() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 0, 0, 100, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 0, 100, 0, 0, 0 + )) ); } @@ -297,7 +301,9 @@ async fn upload_wrong_hpke_config_id() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 0, 1, 0, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 1, 0, 0, 0, 0 + )) ) } @@ -344,7 +350,9 @@ async fn upload_report_in_the_future_boundary_condition() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 0, 0, 1, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 0, 1, 0, 0, 0 + )) ) } @@ -400,7 +408,9 @@ async fn upload_report_in_the_future_past_clock_skew() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 0, 0, 0, 1, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 0, 0, 1, 0, 0 + )) ) } @@ -481,12 +491,14 @@ async fn upload_report_for_collected_batch() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(1, 0, 0, 0, 0, 0, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 1, 0, 0, 0, 0, 0, 0, 0, 0 + )) ) } #[tokio::test] -async fn upload_report_task_expired() { +async fn upload_report_task_not_started() { let mut runtime_manager = TestRuntimeManager::new(); let UploadTest { aggregator, @@ -501,14 +513,77 @@ async fn upload_report_task_expired() { ) .await; + // Set the task start time to the future, and generate & upload a report from before that time. let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count) - .with_task_expiration(Some(clock.now())) + .with_task_start(Some( + clock.now().add(&Duration::from_seconds(3600)).unwrap(), + )) .build() .leader_view() .unwrap(); datastore.put_aggregator_task(&task).await.unwrap(); - // Advance the clock to expire the task. + let report = create_report(&task, &hpke_keypair, clock.now()); + + // Try to upload the report, verify that we get the expected error. + let error = aggregator + .handle_upload(task.id(), &report.get_encoded().unwrap()) + .await + .unwrap_err(); + assert_matches!( + error.as_ref(), + Error::ReportRejected(rejection) => { + assert_eq!(task.id(), rejection.task_id()); + assert_eq!(report.metadata().id(), rejection.report_id()); + assert_eq!(report.metadata().time(), rejection.time()); + assert_matches!(rejection.reason(), ReportRejectionReason::TaskNotStarted); + } + ); + + // Wait for the report writer to have completed one write task. + runtime_manager + .wait_for_completed_tasks("aggregator", 1) + .await; + + let got_counters = datastore + .run_unnamed_tx(|tx| { + let task_id = *task.id(); + Box::pin(async move { tx.get_task_upload_counter(&task_id).await }) + }) + .await + .unwrap(); + assert_eq!( + got_counters, + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 0, 0, 0, 1, 0 + )) + ) +} + +#[tokio::test] +async fn upload_report_task_ended() { + let mut runtime_manager = TestRuntimeManager::new(); + let UploadTest { + aggregator, + clock, + datastore, + ephemeral_datastore: _ephemeral_datastore, + hpke_keypair, + .. + } = UploadTest::new_with_runtime( + default_aggregator_config(), + runtime_manager.with_label("aggregator"), + ) + .await; + + let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count) + .with_task_end(Some(clock.now())) + .build() + .leader_view() + .unwrap(); + datastore.put_aggregator_task(&task).await.unwrap(); + + // Advance the clock to end the task. clock.advance(&Duration::from_seconds(1)); let report = create_report(&task, &hpke_keypair, clock.now()); @@ -523,7 +598,7 @@ async fn upload_report_task_expired() { assert_eq!(task.id(), rejection.task_id()); assert_eq!(report.metadata().id(), rejection.report_id()); assert_eq!(report.metadata().time(), rejection.time()); - assert_matches!(rejection.reason(), ReportRejectionReason::TaskExpired); + assert_matches!(rejection.reason(), ReportRejectionReason::TaskEnded); } ); @@ -541,7 +616,9 @@ async fn upload_report_task_expired() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 0, 0, 0, 0, 1)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 0, 0, 0, 0, 1 + )) ) } @@ -602,7 +679,9 @@ async fn upload_report_report_expired() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 1, 0, 0, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 1, 0, 0, 0, 0, 0 + )) ) } @@ -662,7 +741,9 @@ async fn upload_report_faulty_encryption() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 1, 0, 0, 0, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 1, 0, 0, 0, 0, 0, 0 + )) ) } @@ -723,7 +804,9 @@ async fn upload_report_public_share_decode_failure() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 1, 0, 0, 0, 0, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 1, 0, 0, 0, 0, 0, 0, 0 + )) ) } @@ -798,6 +881,8 @@ async fn upload_report_leader_input_share_decode_failure() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 1, 0, 0, 0, 0, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 1, 0, 0, 0, 0, 0, 0, 0 + )) ) } diff --git a/aggregator/src/binaries/janus_cli.rs b/aggregator/src/binaries/janus_cli.rs index b969341e3..02c1b3bc2 100644 --- a/aggregator/src/binaries/janus_cli.rs +++ b/aggregator/src/binaries/janus_cli.rs @@ -1328,7 +1328,7 @@ mod tests { bits: 2 role: Leader vdaf_verify_key: - task_expiration: 9000000000 + task_end: 9000000000 min_batch_size: 10 time_precision: 300 tolerable_clock_skew: 600 @@ -1351,7 +1351,7 @@ mod tests { bits: 2 role: Helper vdaf_verify_key: - task_expiration: 9000000000 + task_end: 9000000000 min_batch_size: 10 time_precision: 300 tolerable_clock_skew: 600 diff --git a/aggregator/src/cache.rs b/aggregator/src/cache.rs index 9ae36ce3b..fbf8d757e 100644 --- a/aggregator/src/cache.rs +++ b/aggregator/src/cache.rs @@ -367,14 +367,12 @@ mod tests { assert_eq!(task_aggregator.task.id(), task.id()); // Modify the task. - let new_expiration = Time::from_seconds_since_epoch(100); + let new_end = Time::from_seconds_since_epoch(100); datastore .run_unnamed_tx(|tx| { let task_id = *task.id(); Box::pin(async move { - tx.update_task_expiration(&task_id, Some(&new_expiration)) - .await - .unwrap(); + tx.update_task_end(&task_id, Some(&new_end)).await.unwrap(); Ok(()) }) }) @@ -385,8 +383,8 @@ mod tests { // previous task. let task_aggregator = task_aggregators.get(task.id()).await.unwrap().unwrap(); assert!( - (task_aggregator.task.task_expiration() == task.task_expiration()) - || (task_aggregator.task.task_expiration() == Some(&new_expiration)) + (task_aggregator.task.task_end() == task.task_end()) + || (task_aggregator.task.task_end() == Some(&new_end)) ); // Unfortunately, because moka doesn't provide any facility for a fake clock, we have to resort @@ -394,10 +392,7 @@ mod tests { sleep(Duration::from_secs(1)).await; let task_aggregator = task_aggregators.get(task.id()).await.unwrap().unwrap(); - assert_eq!( - task_aggregator.task.task_expiration(), - Some(&new_expiration) - ); + assert_eq!(task_aggregator.task.task_end(), Some(&new_end)); } #[tokio::test] @@ -441,14 +436,12 @@ mod tests { assert_eq!(task_aggregator.task.id(), task.id()); // Modify the task. - let new_expiration = Time::from_seconds_since_epoch(100); + let new_end = Time::from_seconds_since_epoch(100); datastore .run_unnamed_tx(|tx| { let task_id = *task.id(); Box::pin(async move { - tx.update_task_expiration(&task_id, Some(&new_expiration)) - .await - .unwrap(); + tx.update_task_end(&task_id, Some(&new_end)).await.unwrap(); Ok(()) }) }) @@ -459,16 +452,13 @@ mod tests { // previous value. let task_aggregator = task_aggregators.get(task.id()).await.unwrap().unwrap(); assert!( - (task_aggregator.task.task_expiration() == task.task_expiration()) - || (task_aggregator.task.task_expiration() == Some(&new_expiration)) + (task_aggregator.task.task_end() == task.task_end()) + || (task_aggregator.task.task_end() == Some(&new_end)) ); sleep(Duration::from_secs(1)).await; let task_aggregator = task_aggregators.get(task.id()).await.unwrap().unwrap(); - assert_eq!( - task_aggregator.task.task_expiration(), - Some(&new_expiration) - ); + assert_eq!(task_aggregator.task.task_end(), Some(&new_end)); } } diff --git a/aggregator_api/src/models.rs b/aggregator_api/src/models.rs index 655bead05..0b6e82006 100644 --- a/aggregator_api/src/models.rs +++ b/aggregator_api/src/models.rs @@ -72,8 +72,10 @@ pub(crate) struct PostTaskReq { /// The VDAF verification key used for this DAP task, as Base64 encoded bytes. Task ID is /// derived from the verify key. pub(crate) vdaf_verify_key: String, + /// The time before which the task is considered invalid. + pub(crate) task_start: Option