Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement task start time. #3501

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,8 @@ impl<C: Clock> Aggregator<C> {
task_config.query_config().query().try_into()?,
vdaf_instance,
vdaf_verify_key,
Some(*task_config.task_expiration()),
None, // TODO(#3636): update taskprov implementation to specify task start
Some(*task_config.task_end()),
peer_aggregator.report_expiry_age().cloned(),
task_config.query_config().min_batch_size() as u64,
*task_config.query_config().time_precision(),
Expand Down Expand Up @@ -763,7 +764,7 @@ impl<C: Clock> Aggregator<C> {

/// Validate and authorize a taskprov request. Returns values necessary for determining whether
/// we can opt into the task. This function might return an opt-out error for conditions that
/// are relevant for all DAP workflows (e.g. task expiration).
/// are relevant for all DAP workflows (e.g. task end).
#[tracing::instrument(skip(self, aggregator_auth_token), err(level = Level::DEBUG))]
async fn taskprov_authorize_request(
&self,
Expand Down Expand Up @@ -794,8 +795,8 @@ impl<C: Clock> Aggregator<C> {
return Err(Error::UnauthorizedRequest(*task_id));
}

if self.clock.now() > *task_config.task_expiration() {
return Err(Error::InvalidTask(*task_id, OptOutReason::TaskExpired));
if self.clock.now() > *task_config.task_end() {
return Err(Error::InvalidTask(*task_id, OptOutReason::TaskEnded));
}

debug!(
Expand Down Expand Up @@ -1608,11 +1609,18 @@ impl VdafOps {
return Err(reject_report(ReportRejectionReason::TooEarly).await?);
}

// Reject reports after a task has expired.
// Reject reports before a task has started.
if let Some(task_start) = task.task_start() {
if report.metadata().time().is_before(task_start) {
return Err(reject_report(ReportRejectionReason::TaskNotStarted).await?);
}
}

// Reject reports after a task has ended.
// https://www.ietf.org/archive/id/draft-ietf-ppm-dap-07.html#section-4.4.2-20
if let Some(task_expiration) = task.task_expiration() {
if report.metadata().time().is_after(task_expiration) {
return Err(reject_report(ReportRejectionReason::TaskExpired).await?);
if let Some(task_end) = task.task_end() {
if report.metadata().time().is_after(task_end) {
return Err(reject_report(ReportRejectionReason::TaskEnded).await?);
}
}

Expand Down
10 changes: 6 additions & 4 deletions aggregator/src/aggregator/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,11 @@ pub enum ReportRejectionReason {
IntervalCollected,
DecryptFailure,
DecodeFailure,
TaskExpired,
TaskEnded,
Expired,
TooEarly,
OutdatedHpkeConfig(HpkeConfigId),
TaskNotStarted,
}

impl ReportRejectionReason {
Expand All @@ -239,12 +240,13 @@ impl ReportRejectionReason {
}
ReportRejectionReason::DecryptFailure => "Report share could not be decrypted.",
ReportRejectionReason::DecodeFailure => "Report could not be decoded.",
ReportRejectionReason::TaskExpired => "Task has expired.",
ReportRejectionReason::TaskEnded => "Task has ended.",
ReportRejectionReason::Expired => "Report timestamp is too old.",
ReportRejectionReason::TooEarly => "Report timestamp is too far in the future.",
ReportRejectionReason::OutdatedHpkeConfig(_) => {
"Report is using an outdated HPKE configuration."
}
ReportRejectionReason::TaskNotStarted => "Task has not started.",
}
}
}
Expand All @@ -260,8 +262,8 @@ impl Display for ReportRejectionReason {
pub enum OptOutReason {
#[error("this aggregator is not peered with the given {0} aggregator")]
NoSuchPeer(Role),
#[error("task has expired")]
TaskExpired,
#[error("task has ended")]
TaskEnded,
#[error("invalid task: {0}")]
TaskParameters(#[from] task::Error),
#[error("URL parse error: {0}")]
Expand Down
18 changes: 9 additions & 9 deletions aggregator/src/aggregator/http_handlers/tests/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,21 +202,21 @@ async fn upload_handler() {
)
.await;

// Reports with timestamps past the task's expiration should be rejected.
let task_expire_soon = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count)
.with_task_expiration(Some(clock.now().add(&Duration::from_seconds(60)).unwrap()))
// Reports with timestamps past the task's end time should be rejected.
let task_end_soon = TaskBuilder::new(BatchMode::TimeInterval, VdafInstance::Prio3Count)
.with_task_end(Some(clock.now().add(&Duration::from_seconds(60)).unwrap()))
.build();
let leader_task_expire_soon = task_expire_soon.leader_view().unwrap();
let leader_task_end_soon = task_end_soon.leader_view().unwrap();
datastore
.put_aggregator_task(&leader_task_expire_soon)
.put_aggregator_task(&leader_task_end_soon)
.await
.unwrap();
let report_2 = create_report(
&leader_task_expire_soon,
&leader_task_end_soon,
&hpke_keypair,
clock.now().add(&Duration::from_seconds(120)).unwrap(),
);
let mut test_conn = post(task_expire_soon.report_upload_uri().unwrap().path())
let mut test_conn = post(task_end_soon.report_upload_uri().unwrap().path())
.with_request_header(KnownHeaderName::ContentType, Report::MEDIA_TYPE)
.with_request_body(report_2.get_encoded().unwrap())
.run_async(&handler)
Expand All @@ -226,8 +226,8 @@ async fn upload_handler() {
Status::BadRequest,
"reportRejected",
"Report could not be processed.",
task_expire_soon.id(),
Some(ReportRejectionReason::TaskExpired.detail()),
task_end_soon.id(),
Some(ReportRejectionReason::TaskEnded.detail()),
)
.await;

Expand Down
2 changes: 1 addition & 1 deletion aggregator/src/aggregator/problem_details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ mod tests {
random(),
random(),
RealClock::default().now(),
ReportRejectionReason::TaskExpired
ReportRejectionReason::TaskEnded
))
}),
Some(DapProblemType::ReportRejected),
Expand Down
3 changes: 2 additions & 1 deletion aggregator/src/aggregator/report_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,11 @@ impl TaskUploadCounters {
ReportRejectionReason::IntervalCollected => entry.increment_interval_collected(),
ReportRejectionReason::DecryptFailure => entry.increment_report_decrypt_failure(),
ReportRejectionReason::DecodeFailure => entry.increment_report_decode_failure(),
ReportRejectionReason::TaskExpired => entry.increment_task_expired(),
ReportRejectionReason::TaskEnded => entry.increment_task_ended(),
ReportRejectionReason::Expired => entry.increment_report_expired(),
ReportRejectionReason::TooEarly => entry.increment_report_too_early(),
ReportRejectionReason::OutdatedHpkeConfig(_) => entry.increment_report_outdated_key(),
ReportRejectionReason::TaskNotStarted => entry.increment_task_not_started(),
}
}

Expand Down
22 changes: 11 additions & 11 deletions aggregator/src/aggregator/taskprov_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ where

let time_precision = Duration::from_seconds(1);
let min_batch_size = 1;
let task_expiration = clock.now().add(&Duration::from_hours(24).unwrap()).unwrap();
let task_end = clock.now().add(&Duration::from_hours(24).unwrap()).unwrap();
let task_config = TaskConfig::new(
Vec::from("foobar".as_bytes()),
"https://leader.example.com/".as_bytes().try_into().unwrap(),
Expand All @@ -164,7 +164,7 @@ where
min_batch_size,
TaskprovQuery::LeaderSelected,
),
task_expiration,
task_end,
vdaf_config,
)
.unwrap();
Expand All @@ -185,7 +185,7 @@ where
.with_leader_aggregator_endpoint(Url::parse("https://leader.example.com/").unwrap())
.with_helper_aggregator_endpoint(Url::parse("https://helper.example.com/").unwrap())
.with_vdaf_verify_key(vdaf_verify_key)
.with_task_expiration(Some(task_expiration))
.with_task_end(Some(task_end))
.with_report_expiry_age(peer_aggregator.report_expiry_age().copied())
.with_min_batch_size(min_batch_size as u64)
.with_time_precision(Duration::from_seconds(1))
Expand Down Expand Up @@ -560,7 +560,7 @@ async fn taskprov_aggregate_init_malformed_extension() {
}

#[tokio::test]
async fn taskprov_opt_out_task_expired() {
async fn taskprov_opt_out_task_ended() {
let test = TaskprovTestCase::new().await;

let (transcript, report_share, _) = test.next_report_share();
Expand All @@ -582,7 +582,7 @@ async fn taskprov_opt_out_task_expired() {
.primary_aggregator_auth_token()
.request_authentication();

// Advance clock past task expiry.
// Advance clock past task end time.
test.clock.advance(&Duration::from_hours(48).unwrap());

let mut test_conn = put(test
Expand Down Expand Up @@ -631,7 +631,7 @@ async fn taskprov_opt_out_mismatched_task_id() {

let aggregation_job_id: AggregationJobId = random();

let task_expiration = test
let task_end = test
.clock
.now()
.add(&Duration::from_hours(24).unwrap())
Expand All @@ -646,7 +646,7 @@ async fn taskprov_opt_out_mismatched_task_id() {
100,
TaskprovQuery::LeaderSelected,
),
task_expiration,
task_end,
VdafConfig::new(
DpConfig::new(DpMechanism::None),
VdafType::Fake { rounds: 2 },
Expand Down Expand Up @@ -708,7 +708,7 @@ async fn taskprov_opt_out_peer_aggregator_wrong_role() {

let aggregation_job_id: AggregationJobId = random();

let task_expiration = test
let task_end = test
.clock
.now()
.add(&Duration::from_hours(24).unwrap())
Expand All @@ -723,7 +723,7 @@ async fn taskprov_opt_out_peer_aggregator_wrong_role() {
100,
TaskprovQuery::LeaderSelected,
),
task_expiration,
task_end,
VdafConfig::new(
DpConfig::new(DpMechanism::None),
VdafType::Fake { rounds: 2 },
Expand Down Expand Up @@ -786,7 +786,7 @@ async fn taskprov_opt_out_peer_aggregator_does_not_exist() {

let aggregation_job_id: AggregationJobId = random();

let task_expiration = test
let task_end = test
.clock
.now()
.add(&Duration::from_hours(24).unwrap())
Expand All @@ -801,7 +801,7 @@ async fn taskprov_opt_out_peer_aggregator_does_not_exist() {
100,
TaskprovQuery::LeaderSelected,
),
task_expiration,
task_end,
VdafConfig::new(
DpConfig::new(DpMechanism::None),
VdafType::Fake { rounds: 2 },
Expand Down
Loading
Loading