Skip to content

Commit

Permalink
chore(boundary): add periodic shared salt fetching in ic-boundary (#…
Browse files Browse the repository at this point in the history
…3887)

- Added periodic shared salt fetching for `ic-boundary` from the
canister
- Added `system-test` asserting API boundary nodes retrieve the salt
from the canister after its installation

---------

Co-authored-by: IDX GitLab Automation <[email protected]>
  • Loading branch information
nikolay-komarevskiy and sa-idx-admin authored Feb 13, 2025
1 parent 7e5f934 commit a130103
Show file tree
Hide file tree
Showing 12 changed files with 411 additions and 30 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rs/boundary_node/ic_boundary/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package(default_visibility = ["//visibility:public"])
DEPENDENCIES = [
# Keep sorted.
"//packages/ic-ed25519",
"//rs/boundary_node/anonymization/client",
"//rs/boundary_node/rate_limits/api:rate_limits_api",
"//rs/boundary_node/salt_sharing/api:salt_sharing_api",
"//rs/canister_client",
"//rs/canister_client/sender",
"//rs/certification/test-utils",
Expand Down
2 changes: 1 addition & 1 deletion rs/boundary_node/ic_boundary/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ tls = []

[dependencies]
anyhow = { workspace = true }
anonymization-client = { path = "../anonymization/client" }
arc-swap = "1.7.1"
async-scoped = { version = "0.9", features = ["use-tokio"] }
async-trait = { workspace = true }
Expand Down Expand Up @@ -72,6 +71,7 @@ prometheus = { workspace = true }
rand = { workspace = true }
ratelimit = "0.9.1"
rate-limits-api = { path = "../rate_limits/api" }
salt-sharing-api = { path = "../salt_sharing/api" }
rcgen = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
Expand Down
4 changes: 4 additions & 0 deletions rs/boundary_node/ic_boundary/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ pub struct Observability {
/// Log Anonymization Canister ID
#[clap(env, long)]
pub obs_log_anonymization_canister_id: Option<Principal>,

/// Frequency to poll the canister for the anonymization salt
#[clap(env, long, default_value = "60s", value_parser = parse_duration)]
pub obs_log_anonymization_poll_interval: Duration,
}

#[derive(Args)]
Expand Down
44 changes: 19 additions & 25 deletions rs/boundary_node/ic_boundary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@ use std::{
time::{Duration, Instant},
};

use anonymization_client::{
Canister as AnonymizationCanister,
CanisterMethodsBuilder as AnonymizationCanisterMethodsBuilder, Track,
Tracker as AnonymizationTracker,
};
use anyhow::{anyhow, Context, Error};
use arc_swap::ArcSwapOption;
use async_scoped::TokioScope;
Expand Down Expand Up @@ -49,7 +44,6 @@ use ic_registry_replicator::RegistryReplicator;
use ic_types::{crypto::threshold_sig::ThresholdSigPublicKey, messages::MessageId};
use nix::unistd::{getpgid, setpgid, Pid};
use prometheus::Registry;
use rand::rngs::OsRng;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tower::{limit::ConcurrencyLimitLayer, util::MapResponseLayer, ServiceBuilder};
Expand All @@ -73,6 +67,7 @@ use crate::{
rate_limiting::{generic, RateLimit},
retry::{retry_request, RetryParams},
routes::{self, ErrorCause, Health, Lookup, Proxy, ProxyRouter, RootKey},
salt_fetcher::AnonymizationSaltFetcher,
snapshot::{
generate_stub_snapshot, generate_stub_subnet, RegistrySnapshot, SnapshotPersister,
Snapshotter,
Expand Down Expand Up @@ -447,15 +442,24 @@ pub async fn main(cli: Cli) -> Result<(), Error> {
}

// HTTP Logs Anonymization
let tracker = if let Some(v) = cli.obs.obs_log_anonymization_canister_id {
let canister = AnonymizationCanister::new(agent.clone().unwrap(), v);
let cm = AnonymizationCanisterMethodsBuilder::new(canister)
.with_metrics(&metrics_registry)
.build();
Some(AnonymizationTracker::new(Box::new(OsRng), cm)?)
} else {
None
};
let salt_fetcher = cli
.obs
.obs_log_anonymization_canister_id
.and_then(|canister_id| {
agent.as_ref().map(|agent| {
Arc::new(AnonymizationSaltFetcher::new(
agent.clone(),
canister_id,
cli.obs.obs_log_anonymization_poll_interval,
anonymization_salt,
&metrics_registry,
))
})
});

if let Some(fetcher) = salt_fetcher {
runners.push(Box::new(fetcher));
}

TokioScope::scope_and_block(move |s| {
if let Some(v) = registry_replicator {
Expand All @@ -470,16 +474,6 @@ pub async fn main(cli: Cli) -> Result<(), Error> {
});
}

// Anonymization Tracker
if let Some(mut t) = tracker {
s.spawn(async move {
t.track(|value| {
anonymization_salt.store(Some(Arc::new(value)));
})
.await
});
}

// HTTP servers
s.spawn(async move {
metrics_server
Expand Down
1 change: 1 addition & 0 deletions rs/boundary_node/ic_boundary/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod persist;
mod rate_limiting;
mod retry;
mod routes;
mod salt_fetcher;
mod snapshot;
#[cfg(any(test, feature = "bench"))]
pub mod test_utils;
Expand Down
1 change: 1 addition & 0 deletions rs/boundary_node/ic_boundary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod persist;
mod rate_limiting;
mod retry;
mod routes;
mod salt_fetcher;
mod snapshot;
#[cfg(any(test, feature = "bench"))]
pub mod test_utils;
Expand Down
155 changes: 155 additions & 0 deletions rs/boundary_node/ic_boundary/src/salt_fetcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
use crate::core::Run;
use anyhow::Error;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use candid::Principal;
use candid::{Decode, Encode};
use ic_canister_client::Agent;
use ic_types::CanisterId;
use prometheus::{
register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec,
IntGauge, Registry,
};
use salt_sharing_api::GetSaltResponse;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use tracing::warn;

const SERVICE: &str = "AnonymizationSaltFetcher";
const METRIC_PREFIX: &str = "anonymization_salt";

fn nonce() -> Vec<u8> {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos()
.to_le_bytes()
.to_vec()
}

struct Metrics {
last_successful_fetch: IntGauge,
fetches: IntCounterVec,
last_salt_id: IntGauge,
}

impl Metrics {
fn new(registry: &Registry) -> Self {
Self {
last_successful_fetch: register_int_gauge_with_registry!(
format!("{METRIC_PREFIX}_last_successful_fetch"),
format!("The Unix timestamp of the last successful salt fetch"),
registry
)
.unwrap(),

last_salt_id: register_int_gauge_with_registry!(
format!("{METRIC_PREFIX}_last_salt_id"),
format!("ID of the latest fetched salt"),
registry,
)
.unwrap(),

fetches: register_int_counter_vec_with_registry!(
format!("{METRIC_PREFIX}_fetches"),
format!("Count of salt fetches and their outcome"),
&["result"],
registry
)
.unwrap(),
}
}
}

pub struct AnonymizationSaltFetcher {
agent: Agent,
canister_id: CanisterId,
polling_interval: Duration,
anonymization_salt: Arc<ArcSwapOption<Vec<u8>>>,
metrics: Metrics,
}

impl AnonymizationSaltFetcher {
pub fn new(
agent: Agent,
canister_id: Principal,
polling_interval: Duration,
anonymization_salt: Arc<ArcSwapOption<Vec<u8>>>,
registry: &Registry,
) -> Self {
Self {
agent,
canister_id: CanisterId::try_from_principal_id(canister_id.into()).unwrap(),
anonymization_salt,
polling_interval,
metrics: Metrics::new(registry),
}
}
}

#[async_trait]
impl Run for Arc<AnonymizationSaltFetcher> {
async fn run(&mut self) -> Result<(), Error> {
loop {
let query_response = match self
.agent
.execute_update(
&self.canister_id,
&self.canister_id,
"get_salt",
Encode!().unwrap(),
nonce(),
)
.await
{
Ok(response) => match response {
Some(response) => response,
None => {
warn!("{SERVICE}: got empty response from the canister");
continue;
}
},
Err(err) => {
warn!("{SERVICE}: failed to get salt from the canister: {err:#}");
continue;
}
};

let salt_response = match Decode!(&query_response, GetSaltResponse) {
Ok(response) => response,
Err(err) => {
warn!("{SERVICE}: failed to decode candid response: {err:?}");
continue;
}
};

let status = if salt_response.is_ok() {
"success"
} else {
"failure"
};
self.metrics.fetches.with_label_values(&[status]).inc();

match salt_response {
Ok(resp) => {
// Overwrite salt used for hashing sensitive data
self.anonymization_salt.store(Some(Arc::new(resp.salt)));
// Update metrics
self.metrics.last_salt_id.set(resp.salt_id as i64);
self.metrics.last_successful_fetch.set(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
);
}
Err(err) => {
warn!("{SERVICE}: get_salt failed: {err:?}");
}
}

sleep(self.polling_interval).await;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type SaltGenerationStrategy = variant {
};

// Initialization arguments used when installing/upgrading/reinstalling the canister
type InitArgs = record {
type InitArg = record {
// If true salt is regenerated immediately and subsequently based on the chosen strategy
regenerate_now: bool;
// Strategy defining salt generation
Expand All @@ -56,7 +56,7 @@ type InitArgs = record {
registry_polling_interval_secs: nat64;
};

service : (InitArgs) -> {
service : (InitArg) -> {
// Fetches the current salt (randomly generated value to be added to data before hashing)
get_salt: () -> (GetSaltResponse) query;
// Canister metrics (Http Interface)
Expand Down
33 changes: 33 additions & 0 deletions rs/tests/boundary_nodes/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,39 @@ system_test_nns(
],
)

system_test_nns(
name = "salt_sharing_canister_test",
env = {
"SALT_SHARING_CANISTER_WASM_PATH": "$(rootpath //rs/boundary_node/salt_sharing:salt_sharing_canister)",
},
proc_macro_deps = [],
tags = [
"k8s",
],
target_compatible_with = ["@platforms//os:linux"], # requires libssh that does not build on Mac OS
runtime_deps = GUESTOS_RUNTIME_DEPS + [
"//rs/boundary_node/salt_sharing:salt_sharing_canister",
],
deps = [
# Keep sorted.
"//rs/boundary_node/salt_sharing/api:salt_sharing_api",
"//rs/nns/constants",
"//rs/nns/test_utils",
"//rs/registry/subnet_type",
"//rs/rust_canisters/canister_test",
"//rs/tests/boundary_nodes/utils",
"//rs/tests/driver:ic-system-test-driver",
"//rs/types/base_types",
"//rs/types/types",
"@crate_index//:anyhow",
"@crate_index//:candid",
"@crate_index//:ic-agent",
"@crate_index//:slog",
"@crate_index//:tokio",
"@crate_index//:wat",
],
)

system_test_nns(
name = "rate_limit_canister_test",
env = {
Expand Down
5 changes: 5 additions & 0 deletions rs/tests/boundary_nodes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ k256 = { workspace = true }
rand = { workspace = true }
rand_chacha = { workspace = true }
rate-limits-api = { path = "../../boundary_node/rate_limits/api" }
salt-sharing-api = { path = "../../boundary_node/salt_sharing/api" }
regex = { workspace = true }
registry-canister = { path = "../../registry/canister" }
slog = { workspace = true }
Expand Down Expand Up @@ -68,3 +69,7 @@ path = "bn_update_workload_test.rs"
[[bin]]
name = "ic-systest-rate-limit-canister"
path = "rate_limit_canister_test.rs"

[[bin]]
name = "ic-systest-salt-sharing-canister"
path = "salt_sharing_canister_test.rs"
Loading

0 comments on commit a130103

Please sign in to comment.