Skip to content

Commit

Permalink
only store collector auth token hash in leader
Browse files Browse the repository at this point in the history
The leader only needs the hash of the collector auth token in order to
authenticate collection sub-protocol requests and thus we can avoid the
risk of storing the unhashed token.

This commit changes the `tasks` table to store a hash instead of the
token, and makes the corresponding change to
`janus_aggregator_core::task::AggregatorTask`. There are corresponding
changes to the aggregator API: the leader must now be provided the
collector auth token hash during task creation, and `TaskResp` now only
contains a collector auth token hash.

Part of #1509, #1524
  • Loading branch information
tgeoghegan committed Oct 2, 2023
1 parent 772549c commit 80dfa34
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 112 deletions.
8 changes: 4 additions & 4 deletions aggregator/src/bin/janus_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,9 +730,9 @@ mod tests {
aggregator_auth_token:
type: Bearer
token: Y29sbGVjdG9yLWFiZjU0MDhlMmIxNjAxODMxNjI1YWYzOTU5MTA2NDU4
collector_auth_token:
collector_auth_token_hash:
type: Bearer
token: Y29sbGVjdG9yLWFiZjU0MDhlMmIxNjAxODMxNjI1YWYzOTU5MTA2NDU4
hash: MJOoBO_ysLEuG_lv2C37eEOf1Ngetsr-Ers0ZYj4vdQ
hpke_keys: []
- peer_aggregator_endpoint: https://leader
query_type: TimeInterval
Expand Down Expand Up @@ -801,8 +801,8 @@ mod tests {

for task in &got_tasks {
match task.role() {
Role::Leader => assert!(task.collector_auth_token().is_some()),
Role::Helper => assert!(task.collector_auth_token().is_none()),
Role::Leader => assert!(task.collector_auth_token_hash().is_some()),
Role::Helper => assert!(task.collector_auth_token_hash().is_none()),
role => panic!("unexpected role {role}"),
}
}
Expand Down
16 changes: 9 additions & 7 deletions aggregator_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use janus_aggregator_core::{
task::{AggregatorTask, QueryType},
taskprov::{PeerAggregator, VerifyKeyInit},
};
use janus_core::{auth_tokens::AuthenticationToken, vdaf::VdafInstance};
use janus_core::{
auth_tokens::{AuthenticationToken, AuthenticationTokenHash},
vdaf::VdafInstance,
};
use janus_messages::{
query_type::Code as SupportedQueryType, Duration, HpkeAeadId, HpkeConfig, HpkeKdfId, HpkeKemId,
Role, TaskId, Time,
Expand All @@ -28,6 +31,7 @@ pub(crate) struct AggregatorApiConfig {
pub role: AggregatorRole,
pub vdafs: Vec<SupportedVdaf>,
pub query_types: Vec<SupportedQueryType>,
pub features: &'static [&'static str],
}

#[allow(clippy::enum_variant_names)]
Expand Down Expand Up @@ -76,6 +80,10 @@ pub(crate) struct PostTaskReq {
/// If this aggregator is the leader, this is the token to use to authenticate requests to
/// the helper. If this aggregator is the helper, the value is `None`.
pub(crate) aggregator_auth_token: Option<AuthenticationToken>,
/// If this aggregator is the leader, this is the token hash used to authenticate collection
/// sub-protocol requests received from the helper. If this aggregator is the helper, the value
/// is `None`.
pub(crate) collector_auth_token_hash: Option<AuthenticationTokenHash>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -112,11 +120,6 @@ pub(crate) struct TaskResp {
/// initial response to a task creation request and only when the role is helper. Subsequent
/// `TaskResp`s obtained from `GET /tasks/:task_id` will not contain the authentication token.
pub(crate) aggregator_auth_token: Option<AuthenticationToken>,
/// The authentication token used by the task's Collector to authenticate to the Leader.
/// `Some` if `role` is Leader, `None` otherwise.
// TODO(#1509) This field will have to change as Janus leaders will only store a salted hash
// of collector auth tokens.
pub(crate) collector_auth_token: Option<AuthenticationToken>,
/// HPKE configuration used by the collector to decrypt aggregate shares.
pub(crate) collector_hpke_config: HpkeConfig,
/// HPKE configuration(s) used by this aggregator to decrypt report shares.
Expand Down Expand Up @@ -148,7 +151,6 @@ impl TryFrom<&AggregatorTask> for TaskResp {
time_precision: *task.time_precision(),
tolerable_clock_skew: *task.tolerable_clock_skew(),
aggregator_auth_token: None,
collector_auth_token: task.collector_auth_token().cloned(),
collector_hpke_config: task
.collector_hpke_config()
.ok_or("collector_hpke_config is required")?
Expand Down
10 changes: 9 additions & 1 deletion aggregator_api/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub(super) async fn get_config(
SupportedQueryType::TimeInterval,
SupportedQueryType::FixedSize,
],
// Unconditionally indicate to divviup-api that we support collector auth token hashes
features: &["token-hash"],
})
}

Expand Down Expand Up @@ -113,11 +115,17 @@ pub(super) async fn post_task<C: Clock>(
.to_string(),
)
})?;
let collector_auth_token_hash = req.collector_auth_token_hash.ok_or_else(|| {
Error::BadRequest(
"aggregator acting in leader role must be provided a collector auth token"
.to_string(),
)
})?;
(
None,
AggregatorTaskParameters::Leader {
aggregator_auth_token,
collector_auth_token: random(),
collector_auth_token_hash,
collector_hpke_config: req.collector_hpke_config,
},
)
Expand Down
70 changes: 44 additions & 26 deletions aggregator_api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use janus_aggregator_core::{
SecretBytes,
};
use janus_core::{
auth_tokens::AuthenticationToken,
auth_tokens::{AuthenticationToken, AuthenticationTokenHash},
hpke::{
generate_hpke_config_and_private_key,
test_util::{
Expand Down Expand Up @@ -86,7 +86,8 @@ async fn get_config() {
concat!(
r#"{"protocol":"DAP-07","dap_url":"https://dap.url/","role":"Either","vdafs":"#,
r#"["Prio3Count","Prio3Sum","Prio3Histogram","Prio3CountVec","Prio3SumVec"],"#,
r#""query_types":["TimeInterval","FixedSize"]}"#
r#""query_types":["TimeInterval","FixedSize"],"#,
r#""features":["token-hash"]}"#,
)
);
}
Expand Down Expand Up @@ -214,6 +215,7 @@ async fn post_task_bad_role() {
.config()
.clone(),
aggregator_auth_token: Some(aggregator_auth_token),
collector_auth_token_hash: Some(AuthenticationTokenHash::from(&random())),
};
assert_response!(
post("/tasks")
Expand Down Expand Up @@ -255,6 +257,7 @@ async fn post_task_unauthorized() {
.config()
.clone(),
aggregator_auth_token: Some(aggregator_auth_token),
collector_auth_token_hash: Some(AuthenticationTokenHash::from(&random())),
};
assert_response!(
post("/tasks")
Expand Down Expand Up @@ -297,6 +300,7 @@ async fn post_task_helper_no_optional_fields() {
.config()
.clone(),
aggregator_auth_token: None,
collector_auth_token_hash: None,
};
let mut conn = post("/tasks")
.with_request_body(serde_json::to_vec(&req).unwrap())
Expand Down Expand Up @@ -345,7 +349,7 @@ async fn post_task_helper_no_optional_fields() {
assert_eq!(req.min_batch_size, got_task.min_batch_size());
assert_eq!(&req.time_precision, got_task.time_precision());
assert!(got_task.aggregator_auth_token().is_none());
assert!(got_task.collector_auth_token().is_none());
assert!(got_task.collector_auth_token_hash().is_none());
assert_eq!(
&req.collector_hpke_config,
got_task.collector_hpke_config().unwrap()
Expand Down Expand Up @@ -386,6 +390,7 @@ async fn post_task_helper_with_aggregator_auth_token() {
.config()
.clone(),
aggregator_auth_token: Some(aggregator_auth_token),
collector_auth_token_hash: None,
};
assert_response!(
post("/tasks")
Expand Down Expand Up @@ -429,6 +434,8 @@ async fn post_task_idempotence() {
.config()
.clone(),
aggregator_auth_token: Some(aggregator_auth_token.clone()),

collector_auth_token_hash: Some(AuthenticationTokenHash::from(&random())),
};

let post_task = || async {
Expand Down Expand Up @@ -488,7 +495,7 @@ async fn post_task_leader_all_optional_fields() {

let vdaf_verify_key = SecretBytes::new(thread_rng().sample_iter(Standard).take(16).collect());
let aggregator_auth_token = AuthenticationToken::DapAuth(random());

let collector_auth_token_hash = AuthenticationTokenHash::from(&random());
// Verify: posting a task creates a new task which matches the request.
let req = PostTaskReq {
peer_aggregator_endpoint: "http://aggregator.endpoint".try_into().unwrap(),
Expand All @@ -510,6 +517,7 @@ async fn post_task_leader_all_optional_fields() {
.config()
.clone(),
aggregator_auth_token: Some(aggregator_auth_token.clone()),
collector_auth_token_hash: Some(collector_auth_token_hash.clone()),
};
let mut conn = post("/tasks")
.with_request_body(serde_json::to_vec(&req).unwrap())
Expand Down Expand Up @@ -559,7 +567,10 @@ async fn post_task_leader_all_optional_fields() {
aggregator_auth_token.as_ref(),
got_task.aggregator_auth_token().unwrap().as_ref()
);
assert!(got_task.collector_auth_token().is_some());
assert_eq!(
got_task.collector_auth_token_hash().unwrap(),
&collector_auth_token_hash
);

// ...and the response.
assert_eq!(got_task_resp, TaskResp::try_from(&got_task).unwrap());
Expand Down Expand Up @@ -594,6 +605,7 @@ async fn post_task_leader_no_aggregator_auth_token() {
.config()
.clone(),
aggregator_auth_token: None,
collector_auth_token_hash: Some(AuthenticationTokenHash::from(&random())),
};

assert_response!(
Expand Down Expand Up @@ -1555,11 +1567,12 @@ fn post_task_req_serialization() {
HpkePublicKey::from([0u8; 32].to_vec()),
),
aggregator_auth_token: None,
collector_auth_token_hash: None,
},
&[
Token::Struct {
name: "PostTaskReq",
len: 11,
len: 12,
},
Token::Str("peer_aggregator_endpoint"),
Token::Str("https://example.com/"),
Expand Down Expand Up @@ -1631,6 +1644,8 @@ fn post_task_req_serialization() {
Token::StructEnd,
Token::Str("aggregator_auth_token"),
Token::None,
Token::Str("collector_auth_token_hash"),
Token::None,
Token::StructEnd,
],
);
Expand Down Expand Up @@ -1663,11 +1678,14 @@ fn post_task_req_serialization() {
aggregator_auth_token: Some(
AuthenticationToken::new_dap_auth_token_from_string("ZW5jb2RlZA").unwrap(),
),
collector_auth_token_hash: Some(AuthenticationTokenHash::from(
&AuthenticationToken::new_dap_auth_token_from_string("ZW5jb2RlZA").unwrap(),
)),
},
&[
Token::Struct {
name: "PostTaskReq",
len: 11,
len: 12,
},
Token::Str("peer_aggregator_endpoint"),
Token::Str("https://example.com/"),
Expand Down Expand Up @@ -1753,6 +1771,20 @@ fn post_task_req_serialization() {
Token::Str("token"),
Token::Str("ZW5jb2RlZA"),
Token::StructEnd,
Token::Str("collector_auth_token_hash"),
Token::Some,
Token::Struct {
name: "AuthenticationTokenHash",
len: 2,
},
Token::Str("type"),
Token::UnitVariant {
name: "AuthenticationTokenHash",
variant: "DapAuth",
},
Token::Str("hash"),
Token::Str("hT_ixzv_X1CmJmHGT7jYSEBbdB-CN9H8WxAvjgv4rms"),
Token::StructEnd,
Token::StructEnd,
],
);
Expand Down Expand Up @@ -1793,10 +1825,10 @@ fn task_resp_serialization() {
"Y29sbGVjdG9yLWFiY2RlZjAw",
)
.unwrap(),
collector_auth_token: AuthenticationToken::new_dap_auth_token_from_string(
"Y29sbGVjdG9yLWFiY2RlZjAw",
)
.unwrap(),
collector_auth_token_hash: AuthenticationTokenHash::from(
&AuthenticationToken::new_dap_auth_token_from_string("Y29sbGVjdG9yLWFiY2RlZjAw")
.unwrap(),
),
collector_hpke_config: HpkeConfig::new(
HpkeConfigId::from(7),
HpkeKemId::X25519HkdfSha256,
Expand All @@ -1812,7 +1844,7 @@ fn task_resp_serialization() {
&[
Token::Struct {
name: "TaskResp",
len: 16,
len: 15,
},
Token::Str("task_id"),
Token::Str("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"),
Expand Down Expand Up @@ -1863,20 +1895,6 @@ fn task_resp_serialization() {
Token::U64(60),
Token::Str("aggregator_auth_token"),
Token::None,
Token::Str("collector_auth_token"),
Token::Some,
Token::Struct {
name: "AuthenticationToken",
len: 2,
},
Token::Str("type"),
Token::UnitVariant {
name: "AuthenticationToken",
variant: "DapAuth",
},
Token::Str("token"),
Token::Str("Y29sbGVjdG9yLWFiY2RlZjAw"),
Token::StructEnd,
Token::Str("collector_hpke_config"),
Token::Struct {
name: "HpkeConfig",
Expand Down
Loading

0 comments on commit 80dfa34

Please sign in to comment.