diff --git a/Cargo.lock b/Cargo.lock index 9f99c2bfb..ff058374a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1891,6 +1891,7 @@ name = "janus_aggregator_api" version = "0.6.0-prerelease-2" dependencies = [ "anyhow", + "assert_matches", "async-trait", "base64 0.21.4", "futures", @@ -1901,6 +1902,7 @@ dependencies = [ "querystring", "rand", "ring 0.17.0", + "rstest", "serde", "serde_json", "serde_test", diff --git a/Cargo.toml b/Cargo.toml index 4f51f913e..e490c89be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ version = "0.6.0-prerelease-2" [workspace.dependencies] anyhow = "1" +assert_matches = "1" base64 = "0.21.3" # Disable default features to disable compatibility with the old `time` crate, and we also don't # (yet) need other default features. diff --git a/aggregator/Cargo.toml b/aggregator/Cargo.toml index 6a5398c00..4d9b9297a 100644 --- a/aggregator/Cargo.toml +++ b/aggregator/Cargo.toml @@ -100,7 +100,7 @@ url = { version = "2.4.1", features = ["serde"] } uuid = { version = "1.4.1", features = ["v4"] } [dev-dependencies] -assert_matches = "1" +assert_matches.workspace = true hyper = "0.14.27" janus_aggregator = { path = ".", features = ["fpvec_bounded_l2", "test-util"] } janus_aggregator_core = { workspace = true, features = ["test-util"] } diff --git a/aggregator/src/aggregator.rs b/aggregator/src/aggregator.rs index bdb0a79f4..101d8a097 100644 --- a/aggregator/src/aggregator.rs +++ b/aggregator/src/aggregator.rs @@ -354,15 +354,15 @@ impl Aggregator { ) -> Result { let task_aggregator = match self.task_aggregator_for(task_id).await? { Some(task_aggregator) => { + if task_aggregator.task.role() != &Role::Helper { + return Err(Error::UnrecognizedTask(*task_id)); + } if !task_aggregator .task .check_aggregator_auth_token(auth_token.as_ref()) { return Err(Error::UnauthorizedRequest(*task_id)); } - if task_aggregator.task.role() != &Role::Helper { - return Err(Error::UnrecognizedTask(*task_id)); - } task_aggregator } None if self.cfg.taskprov_config.enabled && taskprov_task_config.is_some() => { diff --git a/aggregator/src/bin/janus_cli.rs b/aggregator/src/bin/janus_cli.rs index 155b3350c..7e114e7e9 100644 --- a/aggregator/src/bin/janus_cli.rs +++ b/aggregator/src/bin/janus_cli.rs @@ -728,7 +728,11 @@ mod tests { aead_id: Aes128Gcm public_key: 8lAqZ7OfNV2Gi_9cNE6J9WRmPbO-k1UPtu2Bztd0-yc aggregator_auth_token: + type: Bearer + token: Y29sbGVjdG9yLWFiZjU0MDhlMmIxNjAxODMxNjI1YWYzOTU5MTA2NDU4 collector_auth_token: + type: Bearer + token: Y29sbGVjdG9yLWFiZjU0MDhlMmIxNjAxODMxNjI1YWYzOTU5MTA2NDU4 hpke_keys: [] - peer_aggregator_endpoint: https://leader query_type: TimeInterval @@ -748,7 +752,9 @@ mod tests { kdf_id: HkdfSha256 aead_id: Aes128Gcm public_key: 8lAqZ7OfNV2Gi_9cNE6J9WRmPbO-k1UPtu2Bztd0-yc - aggregator_auth_token: + aggregator_auth_token_hash: + type: Bearer + hash: MJOoBO_ysLEuG_lv2C37eEOf1Ngetsr-Ers0ZYj4vdQ collector_auth_token: hpke_keys: [] "#; diff --git a/aggregator_api/Cargo.toml b/aggregator_api/Cargo.toml index a1ba1db5b..501f52102 100644 --- a/aggregator_api/Cargo.toml +++ b/aggregator_api/Cargo.toml @@ -31,7 +31,9 @@ trillium-router.workspace = true url = { version = "2.4.1", features = ["serde"] } [dev-dependencies] +assert_matches.workspace = true futures = "0.3.28" janus_aggregator_core = { workspace = true, features = ["test-util"] } +rstest.workspace = true tokio.workspace = true trillium-testing = { workspace = true, features = ["tokio"] } diff --git a/aggregator_api/src/models.rs b/aggregator_api/src/models.rs index ad480b8a2..9047ec454 100644 --- a/aggregator_api/src/models.rs +++ b/aggregator_api/src/models.rs @@ -108,11 +108,9 @@ pub(crate) struct TaskResp { /// How much clock skew to allow between client and aggregator. Reports from /// farther than this duration into the future will be rejected. pub(crate) tolerable_clock_skew: Duration, - /// The authentication token for inter-aggregator communication in this task. If `role` is - /// Helper, this token is used by the aggregator to authenticate requests from the Leader. Not - /// set if `role` is Leader.. - // TODO(#1509): This field will have to change as Janus helpers will only store a salted - // hash of aggregator auth tokens. + /// The authentication token for inter-aggregator communication in this task. Only set in the + /// initial response to a task creation request and only when the role is helper. Subsequent + /// `TaskResp`s obtained from `GET /tasks/:task_id` will not contain the authentication token. pub(crate) aggregator_auth_token: Option, /// The authentication token used by the task's Collector to authenticate to the Leader. /// `Some` if `role` is Leader, `None` otherwise. @@ -149,7 +147,7 @@ impl TryFrom<&AggregatorTask> for TaskResp { min_batch_size: task.min_batch_size(), time_precision: *task.time_precision(), tolerable_clock_skew: *task.tolerable_clock_skew(), - aggregator_auth_token: task.aggregator_auth_token().cloned(), + aggregator_auth_token: None, collector_auth_token: task.collector_auth_token().cloned(), collector_hpke_config: task .collector_hpke_config() diff --git a/aggregator_api/src/routes.rs b/aggregator_api/src/routes.rs index 95d7f67b2..55f97a07e 100644 --- a/aggregator_api/src/routes.rs +++ b/aggregator_api/src/routes.rs @@ -14,7 +14,9 @@ use janus_aggregator_core::{ taskprov::PeerAggregator, SecretBytes, }; -use janus_core::{hpke::generate_hpke_config_and_private_key, time::Clock}; +use janus_core::{ + auth_tokens::AuthenticationTokenHash, hpke::generate_hpke_config_and_private_key, time::Clock, +}; use janus_messages::HpkeConfigId; use janus_messages::{ query_type::Code as SupportedQueryType, Duration, HpkeAeadId, HpkeKdfId, HpkeKemId, Role, @@ -103,7 +105,7 @@ pub(super) async fn post_task( let vdaf_verify_key = SecretBytes::new(vdaf_verify_key_bytes); - let aggregator_parameters = match req.role { + let (aggregator_auth_token, aggregator_parameters) = match req.role { Role::Leader => { let aggregator_auth_token = req.aggregator_auth_token.ok_or_else(|| { Error::BadRequest( @@ -111,11 +113,14 @@ pub(super) async fn post_task( .to_string(), ) })?; - AggregatorTaskParameters::Leader { - aggregator_auth_token, - collector_auth_token: random(), - collector_hpke_config: req.collector_hpke_config, - } + ( + None, + AggregatorTaskParameters::Leader { + aggregator_auth_token, + collector_auth_token: random(), + collector_hpke_config: req.collector_hpke_config, + }, + ) } Role::Helper => { @@ -126,10 +131,15 @@ pub(super) async fn post_task( )); } - AggregatorTaskParameters::Helper { - aggregator_auth_token: random(), - collector_hpke_config: req.collector_hpke_config, - } + let aggregator_auth_token = random(); + let aggregator_auth_token_hash = AuthenticationTokenHash::from(&aggregator_auth_token); + ( + Some(aggregator_auth_token), + AggregatorTaskParameters::Helper { + aggregator_auth_token_hash, + collector_hpke_config: req.collector_hpke_config, + }, + ) } _ => unreachable!(), @@ -194,9 +204,15 @@ pub(super) async fn post_task( }) .await?; - Ok(Json( - TaskResp::try_from(task.as_ref()).map_err(|err| Error::Internal(err.to_string()))?, - )) + let mut task_resp = + TaskResp::try_from(task.as_ref()).map_err(|err| Error::Internal(err.to_string()))?; + + // When creating a new task in the helper, we must put the unhashed aggregator auth token in the + // response so that divviup-api can later provide it to the leader, but the helper doesn't store + // the unhashed token and can't later provide it. + task_resp.aggregator_auth_token = aggregator_auth_token; + + Ok(Json(task_resp)) } pub(super) async fn get_task( diff --git a/aggregator_api/src/tests.rs b/aggregator_api/src/tests.rs index 01765e066..a0da39b97 100644 --- a/aggregator_api/src/tests.rs +++ b/aggregator_api/src/tests.rs @@ -7,6 +7,7 @@ use crate::{ }, Config, CONTENT_TYPE, }; +use assert_matches::assert_matches; use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine}; use futures::future::try_join_all; use janus_aggregator_core::{ @@ -305,7 +306,7 @@ async fn post_task_helper_no_optional_fields() { .run_async(&handler) .await; assert_status!(conn, Status::Ok); - let got_task_resp: TaskResp = serde_json::from_slice( + let mut got_task_resp: TaskResp = serde_json::from_slice( &conn .take_response_body() .unwrap() @@ -315,6 +316,13 @@ async fn post_task_helper_no_optional_fields() { ) .unwrap(); + // Task creation response will include the aggregator auth token, but it won't be in the + // datastore or subsequent TaskResps. The token should be a Bearer token. + assert_matches!( + got_task_resp.aggregator_auth_token, + Some(AuthenticationToken::Bearer(_)) + ); + let got_task = ds .run_tx(|tx| { let got_task_resp = got_task_resp.clone(); @@ -336,14 +344,16 @@ async fn post_task_helper_no_optional_fields() { assert_eq!(req.task_expiration.as_ref(), got_task.task_expiration()); assert_eq!(req.min_batch_size, got_task.min_batch_size()); assert_eq!(&req.time_precision, got_task.time_precision()); - assert!(got_task.aggregator_auth_token().is_some()); + assert!(got_task.aggregator_auth_token().is_none()); assert!(got_task.collector_auth_token().is_none()); assert_eq!( &req.collector_hpke_config, got_task.collector_hpke_config().unwrap() ); - // ...and the response. + // ...and the response. Clear the aggregator auth token from got_task_resp or it won't match + // what's in the datastore, as the helper only stores the auth token _hash_. + got_task_resp.aggregator_auth_token = None; assert_eq!(got_task_resp, TaskResp::try_from(&got_task).unwrap()); } @@ -598,14 +608,17 @@ async fn post_task_leader_no_aggregator_auth_token() { ); } +#[rstest::rstest] +#[case::leader(Role::Leader)] +#[case::helper(Role::Helper)] #[tokio::test] -async fn get_task() { +async fn get_task(#[case] role: Role) { // Setup: write a task to the datastore. let (handler, _ephemeral_datastore, ds) = setup_api_test().await; let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake) .build() - .leader_view() + .view_for_role(role) .unwrap(); ds.run_tx(|tx| { @@ -1849,19 +1862,7 @@ fn task_resp_serialization() { Token::NewtypeStruct { name: "Duration" }, Token::U64(60), Token::Str("aggregator_auth_token"), - Token::Some, - Token::Struct { - name: "AuthenticationToken", - len: 2, - }, - Token::Str("type"), - Token::UnitVariant { - name: "AuthenticationToken", - variant: "DapAuth", - }, - Token::Str("token"), - Token::Str("Y29sbGVjdG9yLWFiY2RlZjAw"), - Token::StructEnd, + Token::None, Token::Str("collector_auth_token"), Token::Some, Token::Struct { diff --git a/aggregator_core/Cargo.toml b/aggregator_core/Cargo.toml index 7d5091eea..74788585e 100644 --- a/aggregator_core/Cargo.toml +++ b/aggregator_core/Cargo.toml @@ -56,7 +56,7 @@ url = { version = "2.4.1", features = ["serde"] } uuid = { version = "1.4.1", features = ["v4"] } [dev-dependencies] -assert_matches = "1" +assert_matches.workspace = true hyper = "0.14.27" janus_aggregator_core = { path = ".", features = ["test-util"] } janus_core = { workspace = true, features = ["test-util"] } diff --git a/aggregator_core/src/datastore.rs b/aggregator_core/src/datastore.rs index 1911a6bc9..358f14bbe 100644 --- a/aggregator_core/src/datastore.rs +++ b/aggregator_core/src/datastore.rs @@ -534,10 +534,10 @@ impl Transaction<'_, C> { task_id, aggregator_role, peer_aggregator_endpoint, query_type, vdaf, max_batch_query_count, task_expiration, report_expiry_age, min_batch_size, time_precision, tolerable_clock_skew, collector_hpke_config, vdaf_verify_key, - aggregator_auth_token_type, aggregator_auth_token, collector_auth_token_type, - collector_auth_token) + aggregator_auth_token_type, aggregator_auth_token, aggregator_auth_token_hash, + collector_auth_token_type, collector_auth_token) VALUES ( - $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17 + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18 ) ON CONFLICT DO NOTHING", ) @@ -582,7 +582,11 @@ impl Transaction<'_, C> { /* aggregator_auth_token_type */ &task .aggregator_auth_token() - .map(AuthenticationTokenType::from), + .map(AuthenticationTokenType::from) + .or_else(|| { + task.aggregator_auth_token_hash() + .map(AuthenticationTokenType::from) + }), /* aggregator_auth_token */ &task .aggregator_auth_token() @@ -595,6 +599,10 @@ impl Transaction<'_, C> { ) }) .transpose()?, + /* aggregator_auth_token_hash */ + &task + .aggregator_auth_token_hash() + .map(|token_hash| token_hash.as_ref()), /* collector_auth_token_type */ &task .collector_auth_token() @@ -684,8 +692,8 @@ impl Transaction<'_, C> { "SELECT aggregator_role, peer_aggregator_endpoint, query_type, vdaf, max_batch_query_count, task_expiration, report_expiry_age, min_batch_size, time_precision, tolerable_clock_skew, collector_hpke_config, vdaf_verify_key, - aggregator_auth_token_type, aggregator_auth_token, collector_auth_token_type, - collector_auth_token + aggregator_auth_token_type, aggregator_auth_token, aggregator_auth_token_hash, + collector_auth_token_type, collector_auth_token FROM tasks WHERE task_id = $1", ) .await?; @@ -713,8 +721,8 @@ impl Transaction<'_, C> { "SELECT task_id, aggregator_role, peer_aggregator_endpoint, query_type, vdaf, max_batch_query_count, task_expiration, report_expiry_age, min_batch_size, time_precision, tolerable_clock_skew, collector_hpke_config, vdaf_verify_key, - aggregator_auth_token_type, aggregator_auth_token, collector_auth_token_type, - collector_auth_token + aggregator_auth_token_type, aggregator_auth_token, aggregator_auth_token_hash, + collector_auth_token_type, collector_auth_token FROM tasks", ) .await?; @@ -799,9 +807,12 @@ impl Transaction<'_, C> { ) .map(SecretBytes::new)?; + let aggregator_auth_token_type: Option = + row.try_get("aggregator_auth_token_type")?; + let aggregator_auth_token = row .try_get::<_, Option>>("aggregator_auth_token")? - .zip(row.try_get::<_, Option>("aggregator_auth_token_type")?) + .zip(aggregator_auth_token_type) .map(|(encrypted_token, token_type)| { token_type.as_authentication(&self.crypter.decrypt( "tasks", @@ -812,6 +823,12 @@ impl Transaction<'_, C> { }) .transpose()?; + let aggregator_auth_token_hash = row + .try_get::<_, Option>>("aggregator_auth_token_hash")? + .zip(aggregator_auth_token_type) + .map(|(token_hash, token_type)| token_type.as_authentication_token_hash(&token_hash)) + .transpose()?; + let collector_auth_token = row .try_get::<_, Option>>("collector_auth_token")? .zip(row.try_get::<_, Option>("collector_auth_token_type")?) @@ -849,12 +866,14 @@ impl Transaction<'_, C> { let aggregator_parameters = match ( aggregator_role, aggregator_auth_token, + aggregator_auth_token_hash, collector_auth_token, collector_hpke_config, ) { ( AggregatorRole::Leader, Some(aggregator_auth_token), + None, Some(collector_auth_token), Some(collector_hpke_config), ) => AggregatorTaskParameters::Leader { @@ -864,14 +883,17 @@ impl Transaction<'_, C> { }, ( AggregatorRole::Helper, - Some(aggregator_auth_token), + None, + Some(aggregator_auth_token_hash), None, Some(collector_hpke_config), ) => AggregatorTaskParameters::Helper { - aggregator_auth_token, + aggregator_auth_token_hash, collector_hpke_config, }, - (AggregatorRole::Helper, None, None, None) => AggregatorTaskParameters::TaskprovHelper, + (AggregatorRole::Helper, None, None, None, None) => { + AggregatorTaskParameters::TaskprovHelper + } values => { return Err(Error::DbState(format!( "found task row with unexpected combination of values {values:?}", diff --git a/aggregator_core/src/datastore/models.rs b/aggregator_core/src/datastore/models.rs index db2b709b9..6403a1603 100644 --- a/aggregator_core/src/datastore/models.rs +++ b/aggregator_core/src/datastore/models.rs @@ -5,7 +5,7 @@ use base64::{display::Base64Display, engine::general_purpose::URL_SAFE_NO_PAD}; use chrono::NaiveDateTime; use derivative::Derivative; use janus_core::{ - auth_tokens::AuthenticationToken, + auth_tokens::{AuthenticationToken, AuthenticationTokenHash}, hpke::HpkeKeypair, report_id::ReportIdChecksumExt, time::{DurationExt, IntervalExt, TimeExt}, @@ -59,6 +59,19 @@ impl AuthenticationTokenType { } .map_err(|e| Error::DbState(format!("invalid DAP auth token in database: {e:?}"))) } + + pub fn as_authentication_token_hash( + &self, + hash_bytes: &[u8], + ) -> Result { + let hash_array = hash_bytes + .try_into() + .map_err(|e| Error::DbState(format!("invalid auth token hash in database: {e:?}")))?; + Ok(match self { + Self::DapAuthToken => AuthenticationTokenHash::DapAuth(hash_array), + Self::AuthorizationBearerToken => AuthenticationTokenHash::Bearer(hash_array), + }) + } } impl From<&AuthenticationToken> for AuthenticationTokenType { @@ -71,6 +84,16 @@ impl From<&AuthenticationToken> for AuthenticationTokenType { } } +impl From<&AuthenticationTokenHash> for AuthenticationTokenType { + fn from(value: &AuthenticationTokenHash) -> Self { + match value { + AuthenticationTokenHash::DapAuth(_) => Self::DapAuthToken, + AuthenticationTokenHash::Bearer(_) => Self::AuthorizationBearerToken, + _ => unreachable!(), + } + } +} + /// Represents a report as it is stored in the leader's database, corresponding to a row in /// `client_reports`, where `leader_input_share` and `helper_encrypted_input_share` are required /// to be populated. diff --git a/aggregator_core/src/task.rs b/aggregator_core/src/task.rs index e44521ee7..293215c57 100644 --- a/aggregator_core/src/task.rs +++ b/aggregator_core/src/task.rs @@ -415,12 +415,19 @@ impl AggregatorTask { } } - /// Returns the aggregator authentication token for this task, or `None` for taskprov tasks. - /// TODO(#1509): add `fn aggregator_auth_token_hash(&self) -> Option<&AuthenticationTokenHash>` + /// Returns the aggregator [`AuthenticationToken`] for this task, used by the leader to + /// authenticate aggregation sub-protocol requests sent to the helper, or `None` for the helper. pub fn aggregator_auth_token(&self) -> Option<&AuthenticationToken> { self.aggregator_parameters.aggregator_auth_token() } + /// Returns the aggregator [`AuthenticationTokenHash`] for this task, used by the helper to + /// authenticate aggregation sub-protocol requests received from the leader, or `None` for the + /// leader. + pub fn aggregator_auth_token_hash(&self) -> Option<&AuthenticationTokenHash> { + self.aggregator_parameters.aggregator_auth_token_hash() + } + /// Returns the collector HPKE configuration for this task, or `None` for taskprov tasks. pub fn collector_hpke_config(&self) -> Option<&HpkeConfig> { self.aggregator_parameters.collector_hpke_config() @@ -453,11 +460,7 @@ impl AggregatorTask { &self, incoming_auth_token: Option<&AuthenticationToken>, ) -> bool { - // TODO(#1509): leader should hold only an AuthenticationToken and refuse to use it for - // incoming token validation. Helper should hold only an AuthenticationTokenHash, making the - // AuthenticationTokenHash::from call here unnecessary. - self.aggregator_auth_token() - .map(AuthenticationTokenHash::from) + self.aggregator_auth_token_hash() .zip(incoming_auth_token) .map(|(own_token_hash, incoming_token)| own_token_hash.validate(incoming_token)) .unwrap_or(false) @@ -490,17 +493,15 @@ pub enum AggregatorTaskParameters { aggregator_auth_token: AuthenticationToken, /// Authentication token used to validate requests from the collector during the collection /// sub-protocol. - /// TODO(#1509): make this an AuthenticationTokenHash collector_auth_token: AuthenticationToken, /// HPKE configuration for the collector. collector_hpke_config: HpkeConfig, }, /// Task parameters held exclusively by the DAP helper. Helper { - /// Authentication token used to validate requests from the leader during the aggregation - /// sub-protocol. - /// TODO(#1509): make this an AuthenticationTokenHash - aggregator_auth_token: AuthenticationToken, + /// Authentication token hash used to validate requests from the leader during the + /// aggregation sub-protocol. + aggregator_auth_token_hash: AuthenticationTokenHash, /// HPKE configuration for the collector. collector_hpke_config: HpkeConfig, }, @@ -518,22 +519,31 @@ impl AggregatorTaskParameters { } } - /// Returns the aggregator authentication token for this task, or `None` for taskprov tasks. - /// TODO(#1509): add `fn aggregator_auth_token_hash(&self) -> Option<&AuthenticationTokenHash>` + /// Returns the aggregator [`AuthenticationToken`] for this task, used by the leader to + /// authenticate aggregation sub-protocol requests sent to the helper, or `None` for the helper. fn aggregator_auth_token(&self) -> Option<&AuthenticationToken> { match self { Self::Leader { aggregator_auth_token, .. - } - | Self::Helper { - aggregator_auth_token, - .. } => Some(aggregator_auth_token), _ => None, } } + /// Returns the aggregator [`AuthenticationTokenHash`] for this task, used by the helper to + /// authenticate aggregation sub-protocol requests received from the leader, or `None` for the + /// leader. + fn aggregator_auth_token_hash(&self) -> Option<&AuthenticationTokenHash> { + match self { + Self::Helper { + aggregator_auth_token_hash, + .. + } => Some(aggregator_auth_token_hash), + _ => None, + } + } + /// Returns the collector HPKE configuration for this task, or `None` for taskprov tasks. fn collector_hpke_config(&self) -> Option<&HpkeConfig> { match self { @@ -580,6 +590,7 @@ pub struct SerializedAggregatorTask { tolerable_clock_skew: Duration, collector_hpke_config: HpkeConfig, aggregator_auth_token: Option, + aggregator_auth_token_hash: Option, collector_auth_token: Option, hpke_keys: Vec, // uses unpadded base64url } @@ -595,8 +606,7 @@ impl SerializedAggregatorTask { /// /// - Task ID /// - VDAF verify key - /// - Aggregator authentication token - /// - Collector authentication token (only if the task's role is leader) + /// - Aggregator authentication token (only if the task's role is helper) /// - The aggregator's HPKE keypair (only one keypair is generated) pub fn generate_missing_fields(&mut self) { if self.task_id.is_none() { @@ -615,14 +625,10 @@ impl SerializedAggregatorTask { self.vdaf_verify_key = Some(URL_SAFE_NO_PAD.encode(vdaf_verify_key.as_ref())); } - if self.aggregator_auth_token.is_none() { + if self.aggregator_auth_token.is_none() && self.role == Role::Helper { self.aggregator_auth_token = Some(random()); } - if self.collector_auth_token.is_none() && self.role == Role::Leader { - self.collector_auth_token = Some(random()); - } - if self.hpke_keys.is_empty() { // Unwrap safety: we always use a supported KEM. let hpke_keypair = generate_hpke_config_and_private_key( @@ -661,6 +667,10 @@ impl Serialize for AggregatorTask { .expect("serializable tasks must have collector_hpke_config") .clone(), aggregator_auth_token: self.aggregator_parameters.aggregator_auth_token().cloned(), + aggregator_auth_token_hash: self + .aggregator_parameters + .aggregator_auth_token_hash() + .cloned(), collector_auth_token: self.aggregator_parameters.collector_auth_token().cloned(), hpke_keys, } @@ -693,9 +703,9 @@ impl TryFrom for AggregatorTask { collector_hpke_config: serialized_task.collector_hpke_config, }, Role::Helper => AggregatorTaskParameters::Helper { - aggregator_auth_token: serialized_task - .aggregator_auth_token - .ok_or(Error::InvalidParameter("missing aggregator auth token"))?, + aggregator_auth_token_hash: serialized_task.aggregator_auth_token_hash.ok_or( + Error::InvalidParameter("missing aggregator auth token hash"), + )?, collector_hpke_config: serialized_task.collector_hpke_config, }, _ => return Err(Error::InvalidParameter("unexpected role")), @@ -739,7 +749,7 @@ pub mod test_util { }; use derivative::Derivative; use janus_core::{ - auth_tokens::AuthenticationToken, + auth_tokens::{AuthenticationToken, AuthenticationTokenHash}, hpke::{ test_util::{ generate_test_hpke_config_and_private_key, @@ -1008,7 +1018,9 @@ pub mod test_util { self.leader_aggregator_endpoint.clone(), self.helper_hpke_keys.values().cloned().collect::>(), AggregatorTaskParameters::Helper { - aggregator_auth_token: self.aggregator_auth_token.clone(), + aggregator_auth_token_hash: AuthenticationTokenHash::from( + &self.aggregator_auth_token, + ), collector_hpke_config: self.collector_hpke_keypair.config().clone(), }, ) @@ -1286,7 +1298,7 @@ mod tests { }; use assert_matches::assert_matches; use janus_core::{ - auth_tokens::AuthenticationToken, + auth_tokens::{AuthenticationToken, AuthenticationTokenHash}, hpke::{HpkeKeypair, HpkePrivateKey}, test_util::roundtrip_encoding, time::DurationExt, @@ -1375,6 +1387,34 @@ mod tests { } } + #[test] + fn request_authentication() { + let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Prio3Count).build(); + + let leader_task = task.leader_view().unwrap(); + let helper_task = task.helper_view().unwrap(); + + let incorrect_auth_token = random(); + + // Helper should accept valid aggregator auth token + assert!(helper_task.check_aggregator_auth_token(Some(task.aggregator_auth_token()))); + // Leader should accept valid collector auth token + assert!(leader_task.check_collector_auth_token(Some(task.collector_auth_token()))); + + // Leader should reject absent collector auth token + assert!(!leader_task.check_collector_auth_token(None)); + // Helper should reject absent aggregator auth token + assert!(!helper_task.check_aggregator_auth_token(None)); + // Leader should not be able to validate aggregation sub protocol requests + assert!(!leader_task.check_aggregator_auth_token(Some(task.aggregator_auth_token()))); + // Helper should not be able to validate collection sub protocol requests + assert!(!helper_task.check_collector_auth_token(Some(task.collector_auth_token()))); + // Incorrect collector token should be rejected by leader + assert!(!leader_task.check_collector_auth_token(Some(&incorrect_auth_token))); + // Incorrect aggregator token should be rejected by helper + assert!(!helper_task.check_aggregator_auth_token(Some(&incorrect_auth_token))); + } + #[test] fn aggregator_task_serde() { assert_tokens( @@ -1422,7 +1462,7 @@ mod tests { &[ Token::Struct { name: "SerializedAggregatorTask", - len: 16, + len: 17, }, Token::Str("task_id"), Token::Some, @@ -1503,6 +1543,8 @@ mod tests { Token::Str("token"), Token::Str("YWdncmVnYXRvciB0b2tlbg"), Token::StructEnd, + Token::Str("aggregator_auth_token_hash"), + Token::None, Token::Str("collector_auth_token"), Token::Some, Token::Struct { @@ -1589,10 +1631,12 @@ mod tests { HpkePrivateKey::new(b"helper hpke private key".to_vec()), )], AggregatorTaskParameters::Helper { - aggregator_auth_token: AuthenticationToken::new_bearer_token_from_string( - "YWdncmVnYXRvciB0b2tlbg", - ) - .unwrap(), + aggregator_auth_token_hash: AuthenticationTokenHash::from( + &AuthenticationToken::new_bearer_token_from_string( + "YWdncmVnYXRvciB0b2tlbg", + ) + .unwrap(), + ), collector_hpke_config: HpkeConfig::new( HpkeConfigId::from(8), HpkeKemId::X25519HkdfSha256, @@ -1606,7 +1650,7 @@ mod tests { &[ Token::Struct { name: "SerializedAggregatorTask", - len: 16, + len: 17, }, Token::Str("task_id"), Token::Some, @@ -1688,18 +1732,20 @@ mod tests { Token::Str("Y29sbGVjdG9yIGhwa2UgcHVibGljIGtleQ"), Token::StructEnd, Token::Str("aggregator_auth_token"), + Token::None, + Token::Str("aggregator_auth_token_hash"), Token::Some, Token::Struct { - name: "AuthenticationToken", + name: "AuthenticationTokenHash", len: 2, }, Token::Str("type"), Token::UnitVariant { - name: "AuthenticationToken", + name: "AuthenticationTokenHash", variant: "Bearer", }, - Token::Str("token"), - Token::Str("YWdncmVnYXRvciB0b2tlbg"), + Token::Str("hash"), + Token::Str("MJOoBO_ysLEuG_lv2C37eEOf1Ngetsr-Ers0ZYj4vdQ"), Token::StructEnd, Token::Str("collector_auth_token"), Token::None, diff --git a/client/Cargo.toml b/client/Cargo.toml index 530ca7ac2..2048bfe9c 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -25,7 +25,7 @@ tracing = "0.1.37" url = "2.4.1" [dev-dependencies] -assert_matches = "1" +assert_matches.workspace = true hex-literal = "0.4.1" janus_core = { workspace = true, features = ["test-util"]} mockito = "1.2.0" diff --git a/collector/Cargo.toml b/collector/Cargo.toml index f32faa65a..8deee0a84 100644 --- a/collector/Cargo.toml +++ b/collector/Cargo.toml @@ -35,7 +35,7 @@ tracing = "0.1.37" url = "2.4.1" [dev-dependencies] -assert_matches = "1" +assert_matches.workspace = true janus_collector = { path = ".", features = ["fpvec_bounded_l2", "test-util"] } janus_core = { workspace = true, features = ["fpvec_bounded_l2", "test-util"] } mockito = "1.2.0" diff --git a/db/00000000000001_initial_schema.up.sql b/db/00000000000001_initial_schema.up.sql index dad14e3b4..110b911a0 100644 --- a/db/00000000000001_initial_schema.up.sql +++ b/db/00000000000001_initial_schema.up.sql @@ -93,9 +93,16 @@ CREATE TABLE tasks( -- Authentication token used to authenticate messages to/from the other aggregator. -- These columns are NULL if the task was provisioned by taskprov. aggregator_auth_token_type AUTH_TOKEN_TYPE, -- the type of the authentication token - aggregator_auth_token BYTEA, -- encrypted bearer token - -- The aggregator_auth_token columns must either both be NULL or both be non-NULL - CONSTRAINT aggregator_auth_token_null CHECK ((aggregator_auth_token_type IS NULL) = (aggregator_auth_token IS NULL)), + aggregator_auth_token BYTEA, -- encrypted bearer token (only set for leader) + aggregator_auth_token_hash BYTEA, -- hash of the token (only set for helper) + CONSTRAINT aggregator_auth_token_null CHECK ( + -- If aggregator_auth_token_type is not NULL, then exactly one of aggregator_auth_token or + -- aggregator_auth_token_hash must be not NULL. + ((aggregator_auth_token_type IS NOT NULL) AND (aggregator_auth_token IS NULL) != (aggregator_auth_token_hash IS NULL)) + -- If aggregator_auth_token_type is NULL, then both aggregator_auth_token and + -- aggregator_auth_token_hash must be NULL + OR ((aggregator_auth_token_type IS NULL) AND (aggregator_auth_token IS NULL) AND (aggregator_auth_token_hash IS NULL)) + ), -- Authentication token used to authenticate messages to the leader from the collector. These -- columns are NULL if the task was provisioned by taskprov or if the task's role is helper. diff --git a/docs/samples/tasks.yaml b/docs/samples/tasks.yaml index 44fe5ebff..2c003e628 100644 --- a/docs/samples/tasks.yaml +++ b/docs/samples/tasks.yaml @@ -58,11 +58,8 @@ aead_id: Aes128Gcm public_key: 4qiv6IY5jrjCV3xbaQXULmPIpvoIml1oJmeXm-yOuAo - # Authentication token shared beteween the aggregators, and used to - # authenticate leader-to-helper requests. In the case of a leader-role task, - # the leader will include the token in a header when making requests to the - # helper. In the case of a helper-role task, the helper will accept requests - # requests with authentication tokens. + # Authentication token hash used by the leader to authenticate requests made + # to the helper. This value should only be included in leader-role tasks. # # Each token's `type` governs how it is inserted into HTTP requests if used by # the leader to authenticate a request to the helper. @@ -118,9 +115,18 @@ kdf_id: HkdfSha256 aead_id: Aes128Gcm public_key: KHRLcWgfWxli8cdOLPsgsZPttHXh0ho3vLVLrW-63lE - aggregator_auth_token: + # Authentication token hash used by the helper to authenticate requests + # received from the leader. This value should only be included in helper-role + # tasks. + # + # The `type` corresponds to the `type` of an `aggregator_auth_token` stanza, + # and the helper will only accept the auth token if it is presented in a + # request in the indicated manner. + # + # `hash` is the SHA-256 hash of the token value, encoded in unpadded base64url. + aggregator_auth_token_hash: type: "Bearer" - token: "YWdncmVnYXRvci1jZmE4NDMyZjdkMzllMjZiYjU3OGUzMzY5Mzk1MWQzNQ" + hash: "MJOoBO_ysLEuG_lv2C37eEOf1Ngetsr-Ers0ZYj4vdQ" # Note that this task does not have a collector authentication token, since # it is a helper role task. collector_auth_token: diff --git a/interop_binaries/src/bin/janus_interop_aggregator.rs b/interop_binaries/src/bin/janus_interop_aggregator.rs index a9353798e..97bbbe13d 100644 --- a/interop_binaries/src/bin/janus_interop_aggregator.rs +++ b/interop_binaries/src/bin/janus_interop_aggregator.rs @@ -11,7 +11,10 @@ use janus_aggregator_core::{ task::{self, AggregatorTask, AggregatorTaskParameters}, SecretBytes, }; -use janus_core::{auth_tokens::AuthenticationToken, time::RealClock}; +use janus_core::{ + auth_tokens::{AuthenticationToken, AuthenticationTokenHash}, + time::RealClock, +}; use janus_interop_binaries::{ status::{ERROR, SUCCESS}, AddTaskResponse, AggregatorAddTaskRequest, AggregatorRole, HpkeConfigRegistry, Keyring, @@ -73,7 +76,7 @@ async fn handle_add_task( } } (AggregatorRole::Helper, _) => AggregatorTaskParameters::Helper { - aggregator_auth_token: leader_authentication_token, + aggregator_auth_token_hash: AuthenticationTokenHash::from(&leader_authentication_token), collector_hpke_config, }, }; diff --git a/messages/Cargo.toml b/messages/Cargo.toml index 1a25acebe..59804677f 100644 --- a/messages/Cargo.toml +++ b/messages/Cargo.toml @@ -27,5 +27,5 @@ thiserror.workspace = true url = "2.4.1" [dev-dependencies] -assert_matches = "1" +assert_matches.workspace = true serde_test.workspace = true diff --git a/tools/Cargo.toml b/tools/Cargo.toml index a80c291b2..84152a6a0 100644 --- a/tools/Cargo.toml +++ b/tools/Cargo.toml @@ -31,7 +31,7 @@ tracing-subscriber = { version = "0.3", features = ["std", "env-filter", "fmt"] url = "2.4.1" [dev-dependencies] -assert_matches = "1" +assert_matches.workspace = true cfg-if = "1.0.0" janus_core = { workspace = true, features = ["test-util"] } rand = "0.8"