Skip to content

Commit

Permalink
Implement task start time.
Browse files Browse the repository at this point in the history
Note that DAP specifies the parameters as "task start time" and "task
duration", rather than Janus' "task start time" and "task end time".
Janus implements things this way so that either parameter can be left
unspecified.
  • Loading branch information
branlwyd committed Nov 22, 2024
1 parent b1e2767 commit 5f13168
Show file tree
Hide file tree
Showing 25 changed files with 433 additions and 224 deletions.
24 changes: 16 additions & 8 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,8 @@ impl<C: Clock> Aggregator<C> {
task_config.query_config().query().try_into()?,
vdaf_instance,
vdaf_verify_key,
Some(*task_config.task_expiration()),
None, // TODO(#3636): update taskprov implementation to specify task start
Some(*task_config.task_end()),
peer_aggregator.report_expiry_age().cloned(),
task_config.query_config().min_batch_size() as u64,
*task_config.query_config().time_precision(),
Expand Down Expand Up @@ -763,7 +764,7 @@ impl<C: Clock> Aggregator<C> {

/// Validate and authorize a taskprov request. Returns values necessary for determining whether
/// we can opt into the task. This function might return an opt-out error for conditions that
/// are relevant for all DAP workflows (e.g. task expiration).
/// are relevant for all DAP workflows (e.g. task end).
#[tracing::instrument(skip(self, aggregator_auth_token), err(level = Level::DEBUG))]
async fn taskprov_authorize_request(
&self,
Expand Down Expand Up @@ -794,8 +795,8 @@ impl<C: Clock> Aggregator<C> {
return Err(Error::UnauthorizedRequest(*task_id));
}

if self.clock.now() > *task_config.task_expiration() {
return Err(Error::InvalidTask(*task_id, OptOutReason::TaskExpired));
if self.clock.now() > *task_config.task_end() {
return Err(Error::InvalidTask(*task_id, OptOutReason::TaskEnded));
}

debug!(
Expand Down Expand Up @@ -1608,11 +1609,18 @@ impl VdafOps {
return Err(reject_report(ReportRejectionReason::TooEarly).await?);
}

// Reject reports after a task has expired.
// Reject reports before a task has started.
if let Some(task_start) = task.task_start() {
if report.metadata().time().is_before(task_start) {
return Err(reject_report(ReportRejectionReason::TaskNotStarted).await?);
}
}

// Reject reports after a task has ended.
// https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#section-4.4.2-20
if let Some(task_expiration) = task.task_expiration() {
if report.metadata().time().is_after(task_expiration) {
return Err(reject_report(ReportRejectionReason::TaskExpired).await?);
if let Some(task_end) = task.task_end() {
if report.metadata().time().is_after(task_end) {
return Err(reject_report(ReportRejectionReason::TaskEnded).await?);
}
}

Expand Down
10 changes: 6 additions & 4 deletions aggregator/src/aggregator/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,11 @@ pub enum ReportRejectionReason {
IntervalCollected,
DecryptFailure,
DecodeFailure,
TaskExpired,
TaskEnded,
Expired,
TooEarly,
OutdatedHpkeConfig(HpkeConfigId),
TaskNotStarted,
}

impl ReportRejectionReason {
Expand All @@ -239,12 +240,13 @@ impl ReportRejectionReason {
}
ReportRejectionReason::DecryptFailure => "Report share could not be decrypted.",
ReportRejectionReason::DecodeFailure => "Report could not be decoded.",
ReportRejectionReason::TaskExpired => "Task has expired.",
ReportRejectionReason::TaskEnded => "Task has ended.",
ReportRejectionReason::Expired => "Report timestamp is too old.",
ReportRejectionReason::TooEarly => "Report timestamp is too far in the future.",
ReportRejectionReason::OutdatedHpkeConfig(_) => {
"Report is using an outdated HPKE configuration."
}
ReportRejectionReason::TaskNotStarted => "Task has not started.",
}
}
}
Expand All @@ -260,8 +262,8 @@ impl Display for ReportRejectionReason {
pub enum OptOutReason {
#[error("this aggregator is not peered with the given {0} aggregator")]
NoSuchPeer(Role),
#[error("task has expired")]
TaskExpired,
#[error("task has ended")]
TaskEnded,
#[error("invalid task: {0}")]
TaskParameters(#[from] task::Error),
#[error("URL parse error: {0}")]
Expand Down
18 changes: 9 additions & 9 deletions aggregator/src/aggregator/http_handlers/tests/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,21 +202,21 @@ async fn upload_handler() {
)
.await;

// Reports with timestamps past the task's expiration should be rejected.
let task_expire_soon = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count)
.with_task_expiration(Some(clock.now().add(&Duration::from_seconds(60)).unwrap()))
// Reports with timestamps past the task's end time should be rejected.
let task_end_soon = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count)
.with_task_end(Some(clock.now().add(&Duration::from_seconds(60)).unwrap()))
.build();
let leader_task_expire_soon = task_expire_soon.leader_view().unwrap();
let leader_task_end_soon = task_end_soon.leader_view().unwrap();
datastore
.put_aggregator_task(&leader_task_expire_soon)
.put_aggregator_task(&leader_task_end_soon)
.await
.unwrap();
let report_2 = create_report(
&leader_task_expire_soon,
&leader_task_end_soon,
&hpke_keypair,
clock.now().add(&Duration::from_seconds(120)).unwrap(),
);
let mut test_conn = post(task_expire_soon.report_upload_uri().unwrap().path())
let mut test_conn = post(task_end_soon.report_upload_uri().unwrap().path())
.with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE)
.with_request_body(report_2.get_encoded().unwrap())
.run_async(&handler)
Expand All @@ -226,8 +226,8 @@ async fn upload_handler() {
Status::BadRequest,
"reportRejected",
"Report could not be processed.",
task_expire_soon.id(),
Some(ReportRejectionReason::TaskExpired.detail()),
task_end_soon.id(),
Some(ReportRejectionReason::TaskEnded.detail()),
)
.await;

Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/aggregator/problem_details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ mod tests {
random(),
random(),
RealClock::default().now(),
ReportRejectionReason::TaskExpired
ReportRejectionReason::TaskEnded
))
}),
Some(DapProblemType::ReportRejected),
Expand Down
3 changes: 2 additions & 1 deletion aggregator/src/aggregator/report_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,11 @@ impl TaskUploadCounters {
ReportRejectionReason::IntervalCollected => entry.increment_interval_collected(),
ReportRejectionReason::DecryptFailure => entry.increment_report_decrypt_failure(),
ReportRejectionReason::DecodeFailure => entry.increment_report_decode_failure(),
ReportRejectionReason::TaskExpired => entry.increment_task_expired(),
ReportRejectionReason::TaskEnded => entry.increment_task_ended(),
ReportRejectionReason::Expired => entry.increment_report_expired(),
ReportRejectionReason::TooEarly => entry.increment_report_too_early(),
ReportRejectionReason::OutdatedHpkeConfig(_) => entry.increment_report_outdated_key(),
ReportRejectionReason::TaskNotStarted => entry.increment_task_not_started(),
}
}

Expand Down
22 changes: 11 additions & 11 deletions aggregator/src/aggregator/taskprov_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ where

let time_precision = Duration::from_seconds(1);
let min_batch_size = 1;
let task_expiration = clock.now().add(&Duration::from_hours(24).unwrap()).unwrap();
let task_end = clock.now().add(&Duration::from_hours(24).unwrap()).unwrap();
let task_config = TaskConfig::new(
Vec::from("foobar".as_bytes()),
"https://leader.example.com/".as_bytes().try_into().unwrap(),
Expand All @@ -164,7 +164,7 @@ where
min_batch_size,
TaskprovQuery::LeaderSelected,
),
task_expiration,
task_end,
vdaf_config,
)
.unwrap();
Expand All @@ -185,7 +185,7 @@ where
.with_leader_aggregator_endpoint(Url::parse("https://leader.example.com/").unwrap())
.with_helper_aggregator_endpoint(Url::parse("https://helper.example.com/").unwrap())
.with_vdaf_verify_key(vdaf_verify_key)
.with_task_expiration(Some(task_expiration))
.with_task_end(Some(task_end))
.with_report_expiry_age(peer_aggregator.report_expiry_age().copied())
.with_min_batch_size(min_batch_size as u64)
.with_time_precision(Duration::from_seconds(1))
Expand Down Expand Up @@ -560,7 +560,7 @@ async fn taskprov_aggregate_init_malformed_extension() {
}

#[tokio::test]
async fn taskprov_opt_out_task_expired() {
async fn taskprov_opt_out_task_ended() {
let test = TaskprovTestCase::new().await;

let (transcript, report_share, _) = test.next_report_share();
Expand All @@ -582,7 +582,7 @@ async fn taskprov_opt_out_task_expired() {
.primary_aggregator_auth_token()
.request_authentication();

// Advance clock past task expiry.
// Advance clock past task end time.
test.clock.advance(&Duration::from_hours(48).unwrap());

let mut test_conn = put(test
Expand Down Expand Up @@ -631,7 +631,7 @@ async fn taskprov_opt_out_mismatched_task_id() {

let aggregation_job_id: AggregationJobId = random();

let task_expiration = test
let task_end = test
.clock
.now()
.add(&Duration::from_hours(24).unwrap())
Expand All @@ -646,7 +646,7 @@ async fn taskprov_opt_out_mismatched_task_id() {
100,
TaskprovQuery::LeaderSelected,
),
task_expiration,
task_end,
VdafConfig::new(
DpConfig::new(DpMechanism::None),
VdafType::Fake { rounds: 2 },
Expand Down Expand Up @@ -708,7 +708,7 @@ async fn taskprov_opt_out_peer_aggregator_wrong_role() {

let aggregation_job_id: AggregationJobId = random();

let task_expiration = test
let task_end = test
.clock
.now()
.add(&Duration::from_hours(24).unwrap())
Expand All @@ -723,7 +723,7 @@ async fn taskprov_opt_out_peer_aggregator_wrong_role() {
100,
TaskprovQuery::LeaderSelected,
),
task_expiration,
task_end,
VdafConfig::new(
DpConfig::new(DpMechanism::None),
VdafType::Fake { rounds: 2 },
Expand Down Expand Up @@ -786,7 +786,7 @@ async fn taskprov_opt_out_peer_aggregator_does_not_exist() {

let aggregation_job_id: AggregationJobId = random();

let task_expiration = test
let task_end = test
.clock
.now()
.add(&Duration::from_hours(24).unwrap())
Expand All @@ -801,7 +801,7 @@ async fn taskprov_opt_out_peer_aggregator_does_not_exist() {
100,
TaskprovQuery::LeaderSelected,
),
task_expiration,
task_end,
VdafConfig::new(
DpConfig::new(DpMechanism::None),
VdafType::Fake { rounds: 2 },
Expand Down
Loading

0 comments on commit 5f13168

Please sign in to comment.