diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index 901221e71..e60cf5902 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -724,7 +724,8 @@ impl Aggregator { task_config.query_config().query().try_into()?, vdaf_instance, vdaf_verify_key, - Some(*task_config.task_expiration()), + None, // TODO(#3636): update taskprov implementation to specify task start + Some(*task_config.task_end()), peer_aggregator.report_expiry_age().cloned(), task_config.query_config().min_batch_size() as u64, *task_config.query_config().time_precision(), @@ -763,7 +764,7 @@ impl Aggregator { /// Validate and authorize a taskprov request. Returns values necessary for determining whether /// we can opt into the task. This function might return an opt-out error for conditions that - /// are relevant for all DAP workflows (e.g. task expiration). + /// are relevant for all DAP workflows (e.g. task end). #[tracing::instrument(skip(self, aggregator_auth_token), err(level = Level::DEBUG))] async fn taskprov_authorize_request( &self, @@ -794,8 +795,8 @@ impl Aggregator { return Err(Error::UnauthorizedRequest(*task_id)); } - if self.clock.now() > *task_config.task_expiration() { - return Err(Error::InvalidTask(*task_id, OptOutReason::TaskExpired)); + if self.clock.now() > *task_config.task_end() { + return Err(Error::InvalidTask(*task_id, OptOutReason::TaskEnded)); } debug!( @@ -1608,11 +1609,18 @@ impl VdafOps { return Err(reject_report(ReportRejectionReason::TooEarly).await?); } - // Reject reports after a task has expired. + // Reject reports before a task has started. + if let Some(task_start) = task.task_start() { + if report.metadata().time().is_before(task_start) { + return Err(reject_report(ReportRejectionReason::TaskNotStarted).await?); + } + } + + // Reject reports after a task has ended. // https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#section-4.4.2-20 - if let Some(task_expiration) = task.task_expiration() { - if report.metadata().time().is_after(task_expiration) { - return Err(reject_report(ReportRejectionReason::TaskExpired).await?); + if let Some(task_end) = task.task_end() { + if report.metadata().time().is_after(task_end) { + return Err(reject_report(ReportRejectionReason::TaskEnded).await?); } } diff --git a/aggregator/src/aggregator/error.rs b/aggregator/src/aggregator/error.rs index c763aef5e..bb73a7fe0 100644 --- a/aggregator/src/aggregator/error.rs +++ b/aggregator/src/aggregator/error.rs @@ -225,10 +225,11 @@ pub enum ReportRejectionReason { IntervalCollected, DecryptFailure, DecodeFailure, - TaskExpired, + TaskEnded, Expired, TooEarly, OutdatedHpkeConfig(HpkeConfigId), + TaskNotStarted, } impl ReportRejectionReason { @@ -239,12 +240,13 @@ impl ReportRejectionReason { } ReportRejectionReason::DecryptFailure => "Report share could not be decrypted.", ReportRejectionReason::DecodeFailure => "Report could not be decoded.", - ReportRejectionReason::TaskExpired => "Task has expired.", + ReportRejectionReason::TaskEnded => "Task has ended.", ReportRejectionReason::Expired => "Report timestamp is too old.", ReportRejectionReason::TooEarly => "Report timestamp is too far in the future.", ReportRejectionReason::OutdatedHpkeConfig(_) => { "Report is using an outdated HPKE configuration." } + ReportRejectionReason::TaskNotStarted => "Task has not started.", } } } @@ -260,8 +262,8 @@ impl Display for ReportRejectionReason { pub enum OptOutReason { #[error("this aggregator is not peered with the given {0} aggregator")] NoSuchPeer(Role), - #[error("task has expired")] - TaskExpired, + #[error("task has ended")] + TaskEnded, #[error("invalid task: {0}")] TaskParameters(#[from] task::Error), #[error("URL parse error: {0}")] diff --git a/aggregator/src/aggregator/http_handlers/tests/report.rs b/aggregator/src/aggregator/http_handlers/tests/report.rs index c9c29c74b..664132b0d 100644 --- a/aggregator/src/aggregator/http_handlers/tests/report.rs +++ b/aggregator/src/aggregator/http_handlers/tests/report.rs @@ -202,21 +202,21 @@ async fn upload_handler() { ) .await; - // Reports with timestamps past the task's expiration should be rejected. - let task_expire_soon = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count) - .with_task_expiration(Some(clock.now().add(&Duration::from_seconds(60)).unwrap())) + // Reports with timestamps past the task's end time should be rejected. + let task_end_soon = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count) + .with_task_end(Some(clock.now().add(&Duration::from_seconds(60)).unwrap())) .build(); - let leader_task_expire_soon = task_expire_soon.leader_view().unwrap(); + let leader_task_end_soon = task_end_soon.leader_view().unwrap(); datastore - .put_aggregator_task(&leader_task_expire_soon) + .put_aggregator_task(&leader_task_end_soon) .await .unwrap(); let report_2 = create_report( - &leader_task_expire_soon, + &leader_task_end_soon, &hpke_keypair, clock.now().add(&Duration::from_seconds(120)).unwrap(), ); - let mut test_conn = post(task_expire_soon.report_upload_uri().unwrap().path()) + let mut test_conn = post(task_end_soon.report_upload_uri().unwrap().path()) .with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE) .with_request_body(report_2.get_encoded().unwrap()) .run_async(&handler) @@ -226,8 +226,8 @@ async fn upload_handler() { Status::BadRequest, "reportRejected", "Report could not be processed.", - task_expire_soon.id(), - Some(ReportRejectionReason::TaskExpired.detail()), + task_end_soon.id(), + Some(ReportRejectionReason::TaskEnded.detail()), ) .await; diff --git a/aggregator/src/aggregator/problem_details.rs b/aggregator/src/aggregator/problem_details.rs index f5d7f5e36..5f380e4d5 100644 --- a/aggregator/src/aggregator/problem_details.rs +++ b/aggregator/src/aggregator/problem_details.rs @@ -207,7 +207,7 @@ mod tests { random(), random(), RealClock::default().now(), - ReportRejectionReason::TaskExpired + ReportRejectionReason::TaskEnded )) }), Some(DapProblemType::ReportRejected), diff --git a/aggregator/src/aggregator/report_writer.rs b/aggregator/src/aggregator/report_writer.rs index b5adf5058..eee774a02 100644 --- a/aggregator/src/aggregator/report_writer.rs +++ b/aggregator/src/aggregator/report_writer.rs @@ -344,10 +344,11 @@ impl TaskUploadCounters { ReportRejectionReason::IntervalCollected => entry.increment_interval_collected(), ReportRejectionReason::DecryptFailure => entry.increment_report_decrypt_failure(), ReportRejectionReason::DecodeFailure => entry.increment_report_decode_failure(), - ReportRejectionReason::TaskExpired => entry.increment_task_expired(), + ReportRejectionReason::TaskEnded => entry.increment_task_ended(), ReportRejectionReason::Expired => entry.increment_report_expired(), ReportRejectionReason::TooEarly => entry.increment_report_too_early(), ReportRejectionReason::OutdatedHpkeConfig(_) => entry.increment_report_outdated_key(), + ReportRejectionReason::TaskNotStarted => entry.increment_task_not_started(), } } diff --git a/aggregator/src/aggregator/taskprov_tests.rs b/aggregator/src/aggregator/taskprov_tests.rs index c4ab23ae3..9fcb770f4 100644 --- a/aggregator/src/aggregator/taskprov_tests.rs +++ b/aggregator/src/aggregator/taskprov_tests.rs @@ -154,7 +154,7 @@ where let time_precision = Duration::from_seconds(1); let min_batch_size = 1; - let task_expiration = clock.now().add(&Duration::from_hours(24).unwrap()).unwrap(); + let task_end = clock.now().add(&Duration::from_hours(24).unwrap()).unwrap(); let task_config = TaskConfig::new( Vec::from("foobar".as_bytes()), "https://leader.example.com/".as_bytes().try_into().unwrap(), @@ -164,7 +164,7 @@ where min_batch_size, TaskprovQuery::LeaderSelected, ), - task_expiration, + task_end, vdaf_config, ) .unwrap(); @@ -185,7 +185,7 @@ where .with_leader_aggregator_endpoint(Url::parse("https://leader.example.com/").unwrap()) .with_helper_aggregator_endpoint(Url::parse("https://helper.example.com/").unwrap()) .with_vdaf_verify_key(vdaf_verify_key) - .with_task_expiration(Some(task_expiration)) + .with_task_end(Some(task_end)) .with_report_expiry_age(peer_aggregator.report_expiry_age().copied()) .with_min_batch_size(min_batch_size as u64) .with_time_precision(Duration::from_seconds(1)) @@ -560,7 +560,7 @@ async fn taskprov_aggregate_init_malformed_extension() { } #[tokio::test] -async fn taskprov_opt_out_task_expired() { +async fn taskprov_opt_out_task_ended() { let test = TaskprovTestCase::new().await; let (transcript, report_share, _) = test.next_report_share(); @@ -582,7 +582,7 @@ async fn taskprov_opt_out_task_expired() { .primary_aggregator_auth_token() .request_authentication(); - // Advance clock past task expiry. + // Advance clock past task end time. test.clock.advance(&Duration::from_hours(48).unwrap()); let mut test_conn = put(test @@ -631,7 +631,7 @@ async fn taskprov_opt_out_mismatched_task_id() { let aggregation_job_id: AggregationJobId = random(); - let task_expiration = test + let task_end = test .clock .now() .add(&Duration::from_hours(24).unwrap()) @@ -646,7 +646,7 @@ async fn taskprov_opt_out_mismatched_task_id() { 100, TaskprovQuery::LeaderSelected, ), - task_expiration, + task_end, VdafConfig::new( DpConfig::new(DpMechanism::None), VdafType::Fake { rounds: 2 }, @@ -708,7 +708,7 @@ async fn taskprov_opt_out_peer_aggregator_wrong_role() { let aggregation_job_id: AggregationJobId = random(); - let task_expiration = test + let task_end = test .clock .now() .add(&Duration::from_hours(24).unwrap()) @@ -723,7 +723,7 @@ async fn taskprov_opt_out_peer_aggregator_wrong_role() { 100, TaskprovQuery::LeaderSelected, ), - task_expiration, + task_end, VdafConfig::new( DpConfig::new(DpMechanism::None), VdafType::Fake { rounds: 2 }, @@ -786,7 +786,7 @@ async fn taskprov_opt_out_peer_aggregator_does_not_exist() { let aggregation_job_id: AggregationJobId = random(); - let task_expiration = test + let task_end = test .clock .now() .add(&Duration::from_hours(24).unwrap()) @@ -801,7 +801,7 @@ async fn taskprov_opt_out_peer_aggregator_does_not_exist() { 100, TaskprovQuery::LeaderSelected, ), - task_expiration, + task_end, VdafConfig::new( DpConfig::new(DpMechanism::None), VdafType::Fake { rounds: 2 }, diff --git a/aggregator/src/aggregator/upload_tests.rs b/aggregator/src/aggregator/upload_tests.rs index f8a0d6a23..f432f711d 100644 --- a/aggregator/src/aggregator/upload_tests.rs +++ b/aggregator/src/aggregator/upload_tests.rs @@ -167,7 +167,9 @@ async fn upload() { assert_eq!( got_counter, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 0, 0, 1, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 0, 1, 0, 0, 0 + )) ) } @@ -230,7 +232,9 @@ async fn upload_batch() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 0, 0, 100, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 0, 100, 0, 0, 0 + )) ); } @@ -297,7 +301,9 @@ async fn upload_wrong_hpke_config_id() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 0, 1, 0, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 1, 0, 0, 0, 0 + )) ) } @@ -344,7 +350,9 @@ async fn upload_report_in_the_future_boundary_condition() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 0, 0, 1, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 0, 1, 0, 0, 0 + )) ) } @@ -400,7 +408,9 @@ async fn upload_report_in_the_future_past_clock_skew() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 0, 0, 0, 1, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 0, 0, 1, 0, 0 + )) ) } @@ -481,12 +491,14 @@ async fn upload_report_for_collected_batch() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(1, 0, 0, 0, 0, 0, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 1, 0, 0, 0, 0, 0, 0, 0, 0 + )) ) } #[tokio::test] -async fn upload_report_task_expired() { +async fn upload_report_task_not_started() { let mut runtime_manager = TestRuntimeManager::new(); let UploadTest { aggregator, @@ -501,14 +513,77 @@ async fn upload_report_task_expired() { ) .await; + // Set the task start time to the future, and generate & upload a report from before that time. let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count) - .with_task_expiration(Some(clock.now())) + .with_task_start(Some( + clock.now().add(&Duration::from_seconds(3600)).unwrap(), + )) .build() .leader_view() .unwrap(); datastore.put_aggregator_task(&task).await.unwrap(); - // Advance the clock to expire the task. + let report = create_report(&task, &hpke_keypair, clock.now()); + + // Try to upload the report, verify that we get the expected error. + let error = aggregator + .handle_upload(task.id(), &report.get_encoded().unwrap()) + .await + .unwrap_err(); + assert_matches!( + error.as_ref(), + Error::ReportRejected(rejection) => { + assert_eq!(task.id(), rejection.task_id()); + assert_eq!(report.metadata().id(), rejection.report_id()); + assert_eq!(report.metadata().time(), rejection.time()); + assert_matches!(rejection.reason(), ReportRejectionReason::TaskNotStarted); + } + ); + + // Wait for the report writer to have completed one write task. + runtime_manager + .wait_for_completed_tasks("aggregator", 1) + .await; + + let got_counters = datastore + .run_unnamed_tx(|tx| { + let task_id = *task.id(); + Box::pin(async move { tx.get_task_upload_counter(&task_id).await }) + }) + .await + .unwrap(); + assert_eq!( + got_counters, + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 0, 0, 0, 1, 0 + )) + ) +} + +#[tokio::test] +async fn upload_report_task_ended() { + let mut runtime_manager = TestRuntimeManager::new(); + let UploadTest { + aggregator, + clock, + datastore, + ephemeral_datastore: _ephemeral_datastore, + hpke_keypair, + .. + } = UploadTest::new_with_runtime( + default_aggregator_config(), + runtime_manager.with_label("aggregator"), + ) + .await; + + let task = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count) + .with_task_end(Some(clock.now())) + .build() + .leader_view() + .unwrap(); + datastore.put_aggregator_task(&task).await.unwrap(); + + // Advance the clock to end the task. clock.advance(&Duration::from_seconds(1)); let report = create_report(&task, &hpke_keypair, clock.now()); @@ -523,7 +598,7 @@ async fn upload_report_task_expired() { assert_eq!(task.id(), rejection.task_id()); assert_eq!(report.metadata().id(), rejection.report_id()); assert_eq!(report.metadata().time(), rejection.time()); - assert_matches!(rejection.reason(), ReportRejectionReason::TaskExpired); + assert_matches!(rejection.reason(), ReportRejectionReason::TaskEnded); } ); @@ -541,7 +616,9 @@ async fn upload_report_task_expired() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 0, 0, 0, 0, 1)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 0, 0, 0, 0, 0, 1 + )) ) } @@ -602,7 +679,9 @@ async fn upload_report_report_expired() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 0, 1, 0, 0, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 0, 1, 0, 0, 0, 0, 0 + )) ) } @@ -662,7 +741,9 @@ async fn upload_report_faulty_encryption() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 0, 1, 0, 0, 0, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 0, 1, 0, 0, 0, 0, 0, 0 + )) ) } @@ -723,7 +804,9 @@ async fn upload_report_public_share_decode_failure() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 1, 0, 0, 0, 0, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 1, 0, 0, 0, 0, 0, 0, 0 + )) ) } @@ -798,6 +881,8 @@ async fn upload_report_leader_input_share_decode_failure() { .unwrap(); assert_eq!( got_counters, - Some(TaskUploadCounter::new_with_values(0, 1, 0, 0, 0, 0, 0, 0)) + Some(TaskUploadCounter::new_with_values( + 0, 1, 0, 0, 0, 0, 0, 0, 0 + )) ) } diff --git a/aggregator/src/binaries/janus_cli.rs b/aggregator/src/binaries/janus_cli.rs index b969341e3..02c1b3bc2 100644 --- a/aggregator/src/binaries/janus_cli.rs +++ b/aggregator/src/binaries/janus_cli.rs @@ -1328,7 +1328,7 @@ mod tests { bits: 2 role: Leader vdaf_verify_key: - task_expiration: 9000000000 + task_end: 9000000000 min_batch_size: 10 time_precision: 300 tolerable_clock_skew: 600 @@ -1351,7 +1351,7 @@ mod tests { bits: 2 role: Helper vdaf_verify_key: - task_expiration: 9000000000 + task_end: 9000000000 min_batch_size: 10 time_precision: 300 tolerable_clock_skew: 600 diff --git a/aggregator/src/cache.rs b/aggregator/src/cache.rs index 9ae36ce3b..fbf8d757e 100644 --- a/aggregator/src/cache.rs +++ b/aggregator/src/cache.rs @@ -367,14 +367,12 @@ mod tests { assert_eq!(task_aggregator.task.id(), task.id()); // Modify the task. - let new_expiration = Time::from_seconds_since_epoch(100); + let new_end = Time::from_seconds_since_epoch(100); datastore .run_unnamed_tx(|tx| { let task_id = *task.id(); Box::pin(async move { - tx.update_task_expiration(&task_id, Some(&new_expiration)) - .await - .unwrap(); + tx.update_task_end(&task_id, Some(&new_end)).await.unwrap(); Ok(()) }) }) @@ -385,8 +383,8 @@ mod tests { // previous task. let task_aggregator = task_aggregators.get(task.id()).await.unwrap().unwrap(); assert!( - (task_aggregator.task.task_expiration() == task.task_expiration()) - || (task_aggregator.task.task_expiration() == Some(&new_expiration)) + (task_aggregator.task.task_end() == task.task_end()) + || (task_aggregator.task.task_end() == Some(&new_end)) ); // Unfortunately, because moka doesn't provide any facility for a fake clock, we have to resort @@ -394,10 +392,7 @@ mod tests { sleep(Duration::from_secs(1)).await; let task_aggregator = task_aggregators.get(task.id()).await.unwrap().unwrap(); - assert_eq!( - task_aggregator.task.task_expiration(), - Some(&new_expiration) - ); + assert_eq!(task_aggregator.task.task_end(), Some(&new_end)); } #[tokio::test] @@ -441,14 +436,12 @@ mod tests { assert_eq!(task_aggregator.task.id(), task.id()); // Modify the task. - let new_expiration = Time::from_seconds_since_epoch(100); + let new_end = Time::from_seconds_since_epoch(100); datastore .run_unnamed_tx(|tx| { let task_id = *task.id(); Box::pin(async move { - tx.update_task_expiration(&task_id, Some(&new_expiration)) - .await - .unwrap(); + tx.update_task_end(&task_id, Some(&new_end)).await.unwrap(); Ok(()) }) }) @@ -459,16 +452,13 @@ mod tests { // previous value. let task_aggregator = task_aggregators.get(task.id()).await.unwrap().unwrap(); assert!( - (task_aggregator.task.task_expiration() == task.task_expiration()) - || (task_aggregator.task.task_expiration() == Some(&new_expiration)) + (task_aggregator.task.task_end() == task.task_end()) + || (task_aggregator.task.task_end() == Some(&new_end)) ); sleep(Duration::from_secs(1)).await; let task_aggregator = task_aggregators.get(task.id()).await.unwrap().unwrap(); - assert_eq!( - task_aggregator.task.task_expiration(), - Some(&new_expiration) - ); + assert_eq!(task_aggregator.task.task_end(), Some(&new_end)); } } diff --git a/aggregator_api/src/models.rs b/aggregator_api/src/models.rs index 655bead05..0b6e82006 100644 --- a/aggregator_api/src/models.rs +++ b/aggregator_api/src/models.rs @@ -72,8 +72,10 @@ pub(crate) struct PostTaskReq { /// The VDAF verification key used for this DAP task, as Base64 encoded bytes. Task ID is /// derived from the verify key. pub(crate) vdaf_verify_key: String, + /// The time before which the task is considered invalid. + pub(crate) task_start: Option