Skip to content

Commit

Permalink
Implement the async worker
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Jan 17, 2025
1 parent ad4efa6 commit e339c9c
Show file tree
Hide file tree
Showing 61 changed files with 2,669 additions and 473 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ tracing = "0.1.40"
tracing-core = "0.1.32"
tracing-subscriber = "0.3.18"
url = { version = "2.5.4", features = ["serde"] }
wasm-bindgen = "0.2.99"
webpki = "0.22.4"
worker = { version = "0.5", features = ["http"] }
worker = "0.5"
x509-parser = "0.15.1"

[workspace.dependencies.sentry]
Expand Down
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ helper:
-c ./crates/daphne-server/examples/configuration-helper.toml
h: helper

compute-offload:
RUST_LOG=hyper=off,debug cargo run \
--profile release-symbols \
--features test-utils \
--example service \
-- \
-c ./crates/daphne-server/examples/configuration-cpu-offload.toml
co: compute-offload


helper-worker:
cd ./crates/daphne-worker-test/ && \
wrangler dev -c wrangler.aggregator.toml --port 8788 -e helper
Expand Down
46 changes: 37 additions & 9 deletions crates/dapf/src/acceptance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use daphne::{
hpke::{HpkeConfig, HpkeKemId, HpkeReceiverConfig},
messages::{
self, taskprov::TaskprovAdvertisement, AggregateShareReq, AggregationJobId, Base64Encode,
BatchId, BatchSelector, PartialBatchSelector, TaskId,
BatchId, BatchSelector, PartialBatchSelector, ReadyAggregationJobResp, TaskId,
},
metrics::DaphneMetrics,
testing::report_generator::ReportGenerator,
Expand Down Expand Up @@ -511,7 +511,7 @@ impl Test {
let _guard = load_control.wait().await;
info!("Starting AggregationJobInitReq");
let start = Instant::now();
let agg_job_resp = self
let mut agg_job_resp = self
.http_client
.submit_aggregation_job_init_req(
self.helper_url.join(&format!(
Expand All @@ -528,14 +528,42 @@ impl Test {
)
.await?;
let duration = start.elapsed();
info!("Finished AggregationJobInitReq in {duration:#?}");
info!("Finished submitting AggregationJobInitReq in {duration:#?}");
let mut poll_count = 1;
let ready = loop {
agg_job_resp = match agg_job_resp {
messages::AggregationJobResp::Ready { transitions } => {
if poll_count != 1 {
info!(
"Finished polling for AggregationJobResp after {:#?}",
start.elapsed()
);
}
break ReadyAggregationJobResp { transitions };
}
messages::AggregationJobResp::Processing => {
if poll_count == 1 {
info!("Polling for AggregationJobResp");
}
tokio::time::sleep(Duration::from_millis(poll_count * 200)).await;
poll_count += 1;
self.http_client
.poll_aggregation_job_init(
self.helper_url
.join(&format!("tasks/{task_id}/aggregation_jobs/{agg_job_id}"))?,
task_config.version,
functions::helper::Options {
taskprov_advertisement: taskprov_advertisement.as_ref(),
bearer_token: self.bearer_token.as_ref(),
},
)
.await?
}
};
};

let agg_share_span = task_config.consume_agg_job_resp(
task_id,
agg_job_state,
agg_job_resp,
self.metrics(),
)?;
let agg_share_span =
task_config.consume_agg_job_resp(task_id, agg_job_state, ready, self.metrics())?;

let aggregated_report_count = agg_share_span
.iter()
Expand Down
12 changes: 10 additions & 2 deletions crates/daphne-server/src/roles/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// Copyright (c) 2025 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

use std::{future::ready, num::NonZeroUsize, ops::Range, time::SystemTime};
Expand Down Expand Up @@ -79,6 +79,7 @@ impl DapAggregator for crate::App {
#[tracing::instrument(skip(self))]
async fn get_agg_share(
&self,
_version: DapVersion,
task_id: &TaskId,
batch_sel: &BatchSelector,
) -> Result<DapAggregateShare, DapError> {
Expand Down Expand Up @@ -115,6 +116,7 @@ impl DapAggregator for crate::App {
#[tracing::instrument(skip(self))]
async fn mark_collected(
&self,
_version: DapVersion,
task_id: &TaskId,
batch_sel: &BatchSelector,
) -> Result<(), DapError> {
Expand Down Expand Up @@ -255,6 +257,7 @@ impl DapAggregator for crate::App {

async fn is_batch_overlapping(
&self,
_version: DapVersion,
task_id: &TaskId,
batch_sel: &BatchSelector,
) -> Result<bool, DapError> {
Expand Down Expand Up @@ -288,7 +291,12 @@ impl DapAggregator for crate::App {
)
}

async fn batch_exists(&self, task_id: &TaskId, batch_id: &BatchId) -> Result<bool, DapError> {
async fn batch_exists(
&self,
_version: DapVersion,
task_id: &TaskId,
batch_id: &BatchId,
) -> Result<bool, DapError> {
let task_config = self
.get_task_config_for(task_id)
.await?
Expand Down
12 changes: 11 additions & 1 deletion crates/daphne-server/src/roles/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

use axum::async_trait;
use daphne::{
messages::{AggregationJobId, TaskId},
fatal_error,
messages::{AggregationJobId, AggregationJobResp, TaskId},
roles::{helper::AggregationJobRequestHash, DapHelper},
DapError, DapVersion,
};
Expand All @@ -20,4 +21,13 @@ impl DapHelper for crate::App {
// the server implementation can't check for this
Ok(())
}

async fn poll_aggregated(
&self,
_version: DapVersion,
_task_id: &TaskId,
_agg_job_id: &AggregationJobId,
) -> Result<AggregationJobResp, DapError> {
Err(fatal_error!(err = "polling not implemented"))
}
}
4 changes: 4 additions & 0 deletions crates/daphne-server/src/roles/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ impl DapLeader for crate::App {
{
self.send_http(meta, Method::PUT, url, payload).await
}

async fn send_http_get(&self, meta: DapRequestMeta, url: Url) -> Result<DapResponse, DapError> {
self.send_http(meta, Method::PUT, url, ()).await
}
}

impl crate::App {
Expand Down
2 changes: 1 addition & 1 deletion crates/daphne-server/src/router/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ impl DecodeFromDapHttpBody for HashedAggregationJobReq {
}
}

/// Using `()` ignores the body of a request.
impl DecodeFromDapHttpBody for CollectionPollReq {
fn decode_from_http_body(_bytes: Bytes, _meta: &DapRequestMeta) -> Result<Self, DapAbort> {
Ok(Self)
}
}

/// Using `()` ignores the body of a request.
impl DecodeFromDapHttpBody for () {
fn decode_from_http_body(_bytes: Bytes, _meta: &DapRequestMeta) -> Result<Self, DapAbort> {
Expand Down
5 changes: 4 additions & 1 deletion crates/daphne-service-utils/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ fn main() {
#[cfg(feature = "durable_requests")]
compiler
.file("./src/durable_requests/durable_request.capnp")
.file("./src/durable_requests/bindings/aggregation_job_store.capnp");
.file("./src/durable_requests/bindings/aggregation_job_store.capnp")
.file("./src/durable_requests/bindings/aggregate_store_v2.capnp")
.file("./src/durable_requests/bindings/agg_job_response_store.capnp")
.file("./src/durable_requests/bindings/replay_checker.capnp");

#[cfg(feature = "compute-offload")]
compiler.file("./src/compute_offload/compute_offload.capnp");
Expand Down
18 changes: 17 additions & 1 deletion crates/daphne-service-utils/src/capnproto/base.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,29 @@ struct U8L16 @0x9e3f65b13f71cfcb {
snd @1 :UInt64;
}

struct PartialBatchSelector {
struct PartialBatchSelector @0xae86084e56c22fc0 {
union {
timeInterval @0 :Void;
leaderSelectedByBatchId @1 :BatchId;
}
}

enum ReportError @0xa76428617779e659 {
reserved @0;
batchCollected @1;
reportReplayed @2;
reportDropped @3;
hpkeUnknownConfigId @4;
hpkeDecryptError @5;
vdafPrepError @6;
batchSaturated @7;
taskExpired @8;
invalidMessage @9;
reportTooEarly @10;
taskNotStarted @11;
}


using ReportId = U8L16;
using BatchId = U8L32;
using TaskId = U8L32;
Expand Down
39 changes: 39 additions & 0 deletions crates/daphne-service-utils/src/capnproto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::base_capnp::{self, partial_batch_selector, u8_l16, u8_l32};
use capnp::struct_list;
use capnp::traits::{FromPointerBuilder, FromPointerReader};
use daphne::messages;
use daphne::{
messages::{AggregationJobId, BatchId, PartialBatchSelector, ReportId, TaskId},
DapVersion,
Expand Down Expand Up @@ -204,6 +205,44 @@ impl CapnprotoPayloadDecode for PartialBatchSelector {
}
}

impl From<messages::ReportError> for base_capnp::ReportError {
fn from(failure: messages::ReportError) -> Self {
match failure {
messages::ReportError::Reserved => Self::Reserved,
messages::ReportError::BatchCollected => Self::BatchCollected,
messages::ReportError::ReportReplayed => Self::ReportReplayed,
messages::ReportError::ReportDropped => Self::ReportDropped,
messages::ReportError::HpkeUnknownConfigId => Self::HpkeUnknownConfigId,
messages::ReportError::HpkeDecryptError => Self::HpkeDecryptError,
messages::ReportError::VdafPrepError => Self::VdafPrepError,
messages::ReportError::BatchSaturated => Self::BatchSaturated,
messages::ReportError::TaskExpired => Self::TaskExpired,
messages::ReportError::InvalidMessage => Self::InvalidMessage,
messages::ReportError::ReportTooEarly => Self::ReportTooEarly,
messages::ReportError::TaskNotStarted => Self::TaskNotStarted,
}
}
}

impl From<base_capnp::ReportError> for messages::ReportError {
fn from(val: base_capnp::ReportError) -> Self {
match val {
base_capnp::ReportError::Reserved => Self::Reserved,
base_capnp::ReportError::BatchCollected => Self::BatchCollected,
base_capnp::ReportError::ReportReplayed => Self::ReportReplayed,
base_capnp::ReportError::ReportDropped => Self::ReportDropped,
base_capnp::ReportError::HpkeUnknownConfigId => Self::HpkeUnknownConfigId,
base_capnp::ReportError::HpkeDecryptError => Self::HpkeDecryptError,
base_capnp::ReportError::VdafPrepError => Self::VdafPrepError,
base_capnp::ReportError::BatchSaturated => Self::BatchSaturated,
base_capnp::ReportError::TaskExpired => Self::TaskExpired,
base_capnp::ReportError::InvalidMessage => Self::InvalidMessage,
base_capnp::ReportError::ReportTooEarly => Self::ReportTooEarly,
base_capnp::ReportError::TaskNotStarted => Self::TaskNotStarted,
}
}
}

pub fn encode_list<I, O>(list: I, mut builder: struct_list::Builder<'_, O>)
where
I: IntoIterator<Item: CapnprotoPayloadEncode>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

@0xd932f3d934afce3b;

# Utilities

using Base = import "../capnproto/base.capnp";

using VdafConfig = Text; # json encoded
Expand Down Expand Up @@ -93,27 +91,11 @@ struct PrepareInit @0x8192568cb3d03f59 {



struct InitializedReports {
struct InitializedReport {
struct InitializedReports @0xf36341397ae4a146 {
struct InitializedReport @0xfa833aa6b5d03d6d {
using VdafPrepShare = Data;
using VdafPrepState = Data;

enum ReportError {
reserved @0;
batchCollected @1;
reportReplayed @2;
reportDropped @3;
hpkeUnknownConfigId @4;
hpkeDecryptError @5;
vdafPrepError @6;
batchSaturated @7;
taskExpired @8;
invalidMessage @9;
reportTooEarly @10;
taskNotStarted @11;
}


union {
ready :group {
metadata @0 :ReportMetadata;
Expand All @@ -124,7 +106,7 @@ struct InitializedReports {
}
rejected :group {
metadata @5 :ReportMetadata;
failure @6 :ReportError;
failure @6 :Base.ReportError;
}
}
}
Expand Down
38 changes: 0 additions & 38 deletions crates/daphne-service-utils/src/compute_offload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,44 +476,6 @@ impl CapnprotoPayloadDecode for InitializedReports {
}
}

impl From<messages::ReportError> for initialized_report::ReportError {
fn from(failure: messages::ReportError) -> Self {
match failure {
messages::ReportError::Reserved => Self::Reserved,
messages::ReportError::BatchCollected => Self::BatchCollected,
messages::ReportError::ReportReplayed => Self::ReportReplayed,
messages::ReportError::ReportDropped => Self::ReportDropped,
messages::ReportError::HpkeUnknownConfigId => Self::HpkeUnknownConfigId,
messages::ReportError::HpkeDecryptError => Self::HpkeDecryptError,
messages::ReportError::VdafPrepError => Self::VdafPrepError,
messages::ReportError::BatchSaturated => Self::BatchSaturated,
messages::ReportError::TaskExpired => Self::TaskExpired,
messages::ReportError::InvalidMessage => Self::InvalidMessage,
messages::ReportError::ReportTooEarly => Self::ReportTooEarly,
messages::ReportError::TaskNotStarted => Self::TaskNotStarted,
}
}
}

impl From<initialized_report::ReportError> for messages::ReportError {
fn from(val: initialized_report::ReportError) -> Self {
match val {
initialized_report::ReportError::Reserved => Self::Reserved,
initialized_report::ReportError::BatchCollected => Self::BatchCollected,
initialized_report::ReportError::ReportReplayed => Self::ReportReplayed,
initialized_report::ReportError::ReportDropped => Self::ReportDropped,
initialized_report::ReportError::HpkeUnknownConfigId => Self::HpkeUnknownConfigId,
initialized_report::ReportError::HpkeDecryptError => Self::HpkeDecryptError,
initialized_report::ReportError::VdafPrepError => Self::VdafPrepError,
initialized_report::ReportError::BatchSaturated => Self::BatchSaturated,
initialized_report::ReportError::TaskExpired => Self::TaskExpired,
initialized_report::ReportError::InvalidMessage => Self::InvalidMessage,
initialized_report::ReportError::ReportTooEarly => Self::ReportTooEarly,
initialized_report::ReportError::TaskNotStarted => Self::TaskNotStarted,
}
}
}

fn to_capnp<E: ToString>(e: E) -> capnp::Error {
capnp::Error {
kind: capnp::ErrorKind::Failed,
Expand Down
Loading

0 comments on commit e339c9c

Please sign in to comment.