Skip to content

Commit

Permalink
Only store aggregator auth token hash in helper (#2062)
Browse files Browse the repository at this point in the history
When acting as the helper, Janus only needs the aggregator auth token
hash to validate requests. It never makes aggregation sub-protocol
requests and thus doesn't need the actual token.

In this commit we change the schema of the `tasks` table so that helpers
may store an auth token hash instead of an auth token. These changes are
reflected in `janus_aggregator_core::task::AggregatorTask`. Finally we
make changes to the aggregator API to accommodate this: the helper
reports the aggregator auth token it generates during task provisioning,
but doesn't store that token and cannot subsequently provide it when
responding to `GET /tasks/:task_id` requests. Fortunately that field in
`TaskResp` was already optional, and divviup-api already ignored it
except in the creation case.

Part of #1509, 1524
  • Loading branch information
tgeoghegan authored Oct 3, 2023
1 parent a87421f commit e1739b3
Show file tree
Hide file tree
Showing 20 changed files with 247 additions and 114 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ version = "0.6.0-prerelease-2"

[workspace.dependencies]
anyhow = "1"
assert_matches = "1"
base64 = "0.21.3"
# Disable default features to disable compatibility with the old `time` crate, and we also don't
# (yet) need other default features.
Expand Down
2 changes: 1 addition & 1 deletion aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ url = { version = "2.4.1", features = ["serde"] }
uuid = { version = "1.4.1", features = ["v4"] }

[dev-dependencies]
assert_matches = "1"
assert_matches.workspace = true
hyper = "0.14.27"
janus_aggregator = { path = ".", features = ["fpvec_bounded_l2", "test-util"] }
janus_aggregator_core = { workspace = true, features = ["test-util"] }
Expand Down
6 changes: 3 additions & 3 deletions aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,15 +354,15 @@ impl<C: Clock> Aggregator<C> {
) -> Result<AggregationJobResp, Error> {
let task_aggregator = match self.task_aggregator_for(task_id).await? {
Some(task_aggregator) => {
if task_aggregator.task.role() != &Role::Helper {
return Err(Error::UnrecognizedTask(*task_id));
}
if !task_aggregator
.task
.check_aggregator_auth_token(auth_token.as_ref())
{
return Err(Error::UnauthorizedRequest(*task_id));
}
if task_aggregator.task.role() != &Role::Helper {
return Err(Error::UnrecognizedTask(*task_id));
}
task_aggregator
}
None if self.cfg.taskprov_config.enabled && taskprov_task_config.is_some() => {
Expand Down
8 changes: 7 additions & 1 deletion aggregator/src/bin/janus_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,11 @@ mod tests {
aead_id: Aes128Gcm
public_key: 8lAqZ7OfNV2Gi_9cNE6J9WRmPbO-k1UPtu2Bztd0-yc
aggregator_auth_token:
type: Bearer
token: Y29sbGVjdG9yLWFiZjU0MDhlMmIxNjAxODMxNjI1YWYzOTU5MTA2NDU4
collector_auth_token:
type: Bearer
token: Y29sbGVjdG9yLWFiZjU0MDhlMmIxNjAxODMxNjI1YWYzOTU5MTA2NDU4
hpke_keys: []
- peer_aggregator_endpoint: https://leader
query_type: TimeInterval
Expand All @@ -748,7 +752,9 @@ mod tests {
kdf_id: HkdfSha256
aead_id: Aes128Gcm
public_key: 8lAqZ7OfNV2Gi_9cNE6J9WRmPbO-k1UPtu2Bztd0-yc
aggregator_auth_token:
aggregator_auth_token_hash:
type: Bearer
hash: MJOoBO_ysLEuG_lv2C37eEOf1Ngetsr-Ers0ZYj4vdQ
collector_auth_token:
hpke_keys: []
"#;
Expand Down
2 changes: 2 additions & 0 deletions aggregator_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ trillium-router.workspace = true
url = { version = "2.4.1", features = ["serde"] }

[dev-dependencies]
assert_matches.workspace = true
futures = "0.3.28"
janus_aggregator_core = { workspace = true, features = ["test-util"] }
rstest.workspace = true
tokio.workspace = true
trillium-testing = { workspace = true, features = ["tokio"] }
10 changes: 4 additions & 6 deletions aggregator_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,9 @@ pub(crate) struct TaskResp {
/// How much clock skew to allow between client and aggregator. Reports from
/// farther than this duration into the future will be rejected.
pub(crate) tolerable_clock_skew: Duration,
/// The authentication token for inter-aggregator communication in this task. If `role` is
/// Helper, this token is used by the aggregator to authenticate requests from the Leader. Not
/// set if `role` is Leader..
// TODO(#1509): This field will have to change as Janus helpers will only store a salted
// hash of aggregator auth tokens.
/// The authentication token for inter-aggregator communication in this task. Only set in the
/// initial response to a task creation request and only when the role is helper. Subsequent
/// `TaskResp`s obtained from `GET /tasks/:task_id` will not contain the authentication token.
pub(crate) aggregator_auth_token: Option<AuthenticationToken>,
/// The authentication token used by the task's Collector to authenticate to the Leader.
/// `Some` if `role` is Leader, `None` otherwise.
Expand Down Expand Up @@ -149,7 +147,7 @@ impl TryFrom<&AggregatorTask> for TaskResp {
min_batch_size: task.min_batch_size(),
time_precision: *task.time_precision(),
tolerable_clock_skew: *task.tolerable_clock_skew(),
aggregator_auth_token: task.aggregator_auth_token().cloned(),
aggregator_auth_token: None,
collector_auth_token: task.collector_auth_token().cloned(),
collector_hpke_config: task
.collector_hpke_config()
Expand Down
44 changes: 30 additions & 14 deletions aggregator_api/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use janus_aggregator_core::{
taskprov::PeerAggregator,
SecretBytes,
};
use janus_core::{hpke::generate_hpke_config_and_private_key, time::Clock};
use janus_core::{
auth_tokens::AuthenticationTokenHash, hpke::generate_hpke_config_and_private_key, time::Clock,
};
use janus_messages::HpkeConfigId;
use janus_messages::{
query_type::Code as SupportedQueryType, Duration, HpkeAeadId, HpkeKdfId, HpkeKemId, Role,
Expand Down Expand Up @@ -103,19 +105,22 @@ pub(super) async fn post_task<C: Clock>(

let vdaf_verify_key = SecretBytes::new(vdaf_verify_key_bytes);

let aggregator_parameters = match req.role {
let (aggregator_auth_token, aggregator_parameters) = match req.role {
Role::Leader => {
let aggregator_auth_token = req.aggregator_auth_token.ok_or_else(|| {
Error::BadRequest(
"aggregator acting in leader role must be provided an aggregator auth token"
.to_string(),
)
})?;
AggregatorTaskParameters::Leader {
aggregator_auth_token,
collector_auth_token: random(),
collector_hpke_config: req.collector_hpke_config,
}
(
None,
AggregatorTaskParameters::Leader {
aggregator_auth_token,
collector_auth_token: random(),
collector_hpke_config: req.collector_hpke_config,
},
)
}

Role::Helper => {
Expand All @@ -126,10 +131,15 @@ pub(super) async fn post_task<C: Clock>(
));
}

AggregatorTaskParameters::Helper {
aggregator_auth_token: random(),
collector_hpke_config: req.collector_hpke_config,
}
let aggregator_auth_token = random();
let aggregator_auth_token_hash = AuthenticationTokenHash::from(&aggregator_auth_token);
(
Some(aggregator_auth_token),
AggregatorTaskParameters::Helper {
aggregator_auth_token_hash,
collector_hpke_config: req.collector_hpke_config,
},
)
}

_ => unreachable!(),
Expand Down Expand Up @@ -194,9 +204,15 @@ pub(super) async fn post_task<C: Clock>(
})
.await?;

Ok(Json(
TaskResp::try_from(task.as_ref()).map_err(|err| Error::Internal(err.to_string()))?,
))
let mut task_resp =
TaskResp::try_from(task.as_ref()).map_err(|err| Error::Internal(err.to_string()))?;

// When creating a new task in the helper, we must put the unhashed aggregator auth token in the
// response so that divviup-api can later provide it to the leader, but the helper doesn't store
// the unhashed token and can't later provide it.
task_resp.aggregator_auth_token = aggregator_auth_token;

Ok(Json(task_resp))
}

pub(super) async fn get_task<C: Clock>(
Expand Down
37 changes: 19 additions & 18 deletions aggregator_api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
},
Config, CONTENT_TYPE,
};
use assert_matches::assert_matches;
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use futures::future::try_join_all;
use janus_aggregator_core::{
Expand Down Expand Up @@ -305,7 +306,7 @@ async fn post_task_helper_no_optional_fields() {
.run_async(&handler)
.await;
assert_status!(conn, Status::Ok);
let got_task_resp: TaskResp = serde_json::from_slice(
let mut got_task_resp: TaskResp = serde_json::from_slice(
&conn
.take_response_body()
.unwrap()
Expand All @@ -315,6 +316,13 @@ async fn post_task_helper_no_optional_fields() {
)
.unwrap();

// Task creation response will include the aggregator auth token, but it won't be in the
// datastore or subsequent TaskResps. The token should be a Bearer token.
assert_matches!(
got_task_resp.aggregator_auth_token,
Some(AuthenticationToken::Bearer(_))
);

let got_task = ds
.run_tx(|tx| {
let got_task_resp = got_task_resp.clone();
Expand All @@ -336,14 +344,16 @@ async fn post_task_helper_no_optional_fields() {
assert_eq!(req.task_expiration.as_ref(), got_task.task_expiration());
assert_eq!(req.min_batch_size, got_task.min_batch_size());
assert_eq!(&req.time_precision, got_task.time_precision());
assert!(got_task.aggregator_auth_token().is_some());
assert!(got_task.aggregator_auth_token().is_none());
assert!(got_task.collector_auth_token().is_none());
assert_eq!(
&req.collector_hpke_config,
got_task.collector_hpke_config().unwrap()
);

// ...and the response.
// ...and the response. Clear the aggregator auth token from got_task_resp or it won't match
// what's in the datastore, as the helper only stores the auth token _hash_.
got_task_resp.aggregator_auth_token = None;
assert_eq!(got_task_resp, TaskResp::try_from(&got_task).unwrap());
}

Expand Down Expand Up @@ -598,14 +608,17 @@ async fn post_task_leader_no_aggregator_auth_token() {
);
}

#[rstest::rstest]
#[case::leader(Role::Leader)]
#[case::helper(Role::Helper)]
#[tokio::test]
async fn get_task() {
async fn get_task(#[case] role: Role) {
// Setup: write a task to the datastore.
let (handler, _ephemeral_datastore, ds) = setup_api_test().await;

let task = TaskBuilder::new(QueryType::TimeInterval, VdafInstance::Fake)
.build()
.leader_view()
.view_for_role(role)
.unwrap();

ds.run_tx(|tx| {
Expand Down Expand Up @@ -1849,19 +1862,7 @@ fn task_resp_serialization() {
Token::NewtypeStruct { name: "Duration" },
Token::U64(60),
Token::Str("aggregator_auth_token"),
Token::Some,
Token::Struct {
name: "AuthenticationToken",
len: 2,
},
Token::Str("type"),
Token::UnitVariant {
name: "AuthenticationToken",
variant: "DapAuth",
},
Token::Str("token"),
Token::Str("Y29sbGVjdG9yLWFiY2RlZjAw"),
Token::StructEnd,
Token::None,
Token::Str("collector_auth_token"),
Token::Some,
Token::Struct {
Expand Down
2 changes: 1 addition & 1 deletion aggregator_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ url = { version = "2.4.1", features = ["serde"] }
uuid = { version = "1.4.1", features = ["v4"] }

[dev-dependencies]
assert_matches = "1"
assert_matches.workspace = true
hyper = "0.14.27"
janus_aggregator_core = { path = ".", features = ["test-util"] }
janus_core = { workspace = true, features = ["test-util"] }
Expand Down
Loading

0 comments on commit e1739b3

Please sign in to comment.