diff --git a/Cargo.lock b/Cargo.lock index 3d3a216a2d7..d267bad5748 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6005,7 +6005,6 @@ dependencies = [ name = "ic-boundary" version = "0.9.0" dependencies = [ - "anonymization-client", "anyhow", "arc-swap", "async-scoped", @@ -6069,6 +6068,7 @@ dependencies = [ "reqwest 0.12.12", "rustls 0.23.21", "rustls-pemfile 2.2.0", + "salt-sharing-api", "serde", "serde_bytes", "serde_cbor", @@ -14140,6 +14140,7 @@ dependencies = [ "rate-limits-api", "regex", "registry-canister", + "salt-sharing-api", "slog", "tokio", "wat", diff --git a/rs/boundary_node/ic_boundary/BUILD.bazel b/rs/boundary_node/ic_boundary/BUILD.bazel index a8556797ba3..02aeebd1837 100644 --- a/rs/boundary_node/ic_boundary/BUILD.bazel +++ b/rs/boundary_node/ic_boundary/BUILD.bazel @@ -6,8 +6,8 @@ package(default_visibility = ["//visibility:public"]) DEPENDENCIES = [ # Keep sorted. "//packages/ic-ed25519", - "//rs/boundary_node/anonymization/client", "//rs/boundary_node/rate_limits/api:rate_limits_api", + "//rs/boundary_node/salt_sharing/api:salt_sharing_api", "//rs/canister_client", "//rs/canister_client/sender", "//rs/certification/test-utils", diff --git a/rs/boundary_node/ic_boundary/Cargo.toml b/rs/boundary_node/ic_boundary/Cargo.toml index c290fef3c9a..689a0a216ca 100644 --- a/rs/boundary_node/ic_boundary/Cargo.toml +++ b/rs/boundary_node/ic_boundary/Cargo.toml @@ -16,7 +16,6 @@ tls = [] [dependencies] anyhow = { workspace = true } -anonymization-client = { path = "../anonymization/client" } arc-swap = "1.7.1" async-scoped = { version = "0.9", features = ["use-tokio"] } async-trait = { workspace = true } @@ -72,6 +71,7 @@ prometheus = { workspace = true } rand = { workspace = true } ratelimit = "0.9.1" rate-limits-api = { path = "../rate_limits/api" } +salt-sharing-api = { path = "../salt_sharing/api" } rcgen = { workspace = true } regex = { workspace = true } reqwest = { workspace = true } diff --git a/rs/boundary_node/ic_boundary/src/cli.rs b/rs/boundary_node/ic_boundary/src/cli.rs index 8ca8c993d02..9dd64cd29da 100644 --- a/rs/boundary_node/ic_boundary/src/cli.rs +++ b/rs/boundary_node/ic_boundary/src/cli.rs @@ -241,6 +241,10 @@ pub struct Observability { /// Log Anonymization Canister ID #[clap(env, long)] pub obs_log_anonymization_canister_id: Option, + + /// Frequency to poll the canister for the anonymization salt + #[clap(env, long, default_value = "60s", value_parser = parse_duration)] + pub obs_log_anonymization_poll_interval: Duration, } #[derive(Args)] diff --git a/rs/boundary_node/ic_boundary/src/core.rs b/rs/boundary_node/ic_boundary/src/core.rs index 8c06873aa21..a2d96354941 100644 --- a/rs/boundary_node/ic_boundary/src/core.rs +++ b/rs/boundary_node/ic_boundary/src/core.rs @@ -6,11 +6,6 @@ use std::{ time::{Duration, Instant}, }; -use anonymization_client::{ - Canister as AnonymizationCanister, - CanisterMethodsBuilder as AnonymizationCanisterMethodsBuilder, Track, - Tracker as AnonymizationTracker, -}; use anyhow::{anyhow, Context, Error}; use arc_swap::ArcSwapOption; use async_scoped::TokioScope; @@ -49,7 +44,6 @@ use ic_registry_replicator::RegistryReplicator; use ic_types::{crypto::threshold_sig::ThresholdSigPublicKey, messages::MessageId}; use nix::unistd::{getpgid, setpgid, Pid}; use prometheus::Registry; -use rand::rngs::OsRng; use tokio::sync::watch; use tokio_util::sync::CancellationToken; use tower::{limit::ConcurrencyLimitLayer, util::MapResponseLayer, ServiceBuilder}; @@ -73,6 +67,7 @@ use crate::{ rate_limiting::{generic, RateLimit}, retry::{retry_request, RetryParams}, routes::{self, ErrorCause, Health, Lookup, Proxy, ProxyRouter, RootKey}, + salt_fetcher::AnonymizationSaltFetcher, snapshot::{ generate_stub_snapshot, generate_stub_subnet, RegistrySnapshot, SnapshotPersister, Snapshotter, @@ -447,15 +442,24 @@ pub async fn main(cli: Cli) -> Result<(), Error> { } // HTTP Logs Anonymization - let tracker = if let Some(v) = cli.obs.obs_log_anonymization_canister_id { - let canister = AnonymizationCanister::new(agent.clone().unwrap(), v); - let cm = AnonymizationCanisterMethodsBuilder::new(canister) - .with_metrics(&metrics_registry) - .build(); - Some(AnonymizationTracker::new(Box::new(OsRng), cm)?) - } else { - None - }; + let salt_fetcher = cli + .obs + .obs_log_anonymization_canister_id + .and_then(|canister_id| { + agent.as_ref().map(|agent| { + Arc::new(AnonymizationSaltFetcher::new( + agent.clone(), + canister_id, + cli.obs.obs_log_anonymization_poll_interval, + anonymization_salt, + &metrics_registry, + )) + }) + }); + + if let Some(fetcher) = salt_fetcher { + runners.push(Box::new(fetcher)); + } TokioScope::scope_and_block(move |s| { if let Some(v) = registry_replicator { @@ -470,16 +474,6 @@ pub async fn main(cli: Cli) -> Result<(), Error> { }); } - // Anonymization Tracker - if let Some(mut t) = tracker { - s.spawn(async move { - t.track(|value| { - anonymization_salt.store(Some(Arc::new(value))); - }) - .await - }); - } - // HTTP servers s.spawn(async move { metrics_server diff --git a/rs/boundary_node/ic_boundary/src/lib.rs b/rs/boundary_node/ic_boundary/src/lib.rs index 768c8119df5..789c3023745 100644 --- a/rs/boundary_node/ic_boundary/src/lib.rs +++ b/rs/boundary_node/ic_boundary/src/lib.rs @@ -12,6 +12,7 @@ mod persist; mod rate_limiting; mod retry; mod routes; +mod salt_fetcher; mod snapshot; #[cfg(any(test, feature = "bench"))] pub mod test_utils; diff --git a/rs/boundary_node/ic_boundary/src/main.rs b/rs/boundary_node/ic_boundary/src/main.rs index e12c4d5b2dd..dc98bdc8aa2 100644 --- a/rs/boundary_node/ic_boundary/src/main.rs +++ b/rs/boundary_node/ic_boundary/src/main.rs @@ -19,6 +19,7 @@ mod persist; mod rate_limiting; mod retry; mod routes; +mod salt_fetcher; mod snapshot; #[cfg(any(test, feature = "bench"))] pub mod test_utils; diff --git a/rs/boundary_node/ic_boundary/src/salt_fetcher.rs b/rs/boundary_node/ic_boundary/src/salt_fetcher.rs new file mode 100644 index 00000000000..53db6b80117 --- /dev/null +++ b/rs/boundary_node/ic_boundary/src/salt_fetcher.rs @@ -0,0 +1,155 @@ +use crate::core::Run; +use anyhow::Error; +use arc_swap::ArcSwapOption; +use async_trait::async_trait; +use candid::Principal; +use candid::{Decode, Encode}; +use ic_canister_client::Agent; +use ic_types::CanisterId; +use prometheus::{ + register_int_counter_vec_with_registry, register_int_gauge_with_registry, IntCounterVec, + IntGauge, Registry, +}; +use salt_sharing_api::GetSaltResponse; +use std::time::{SystemTime, UNIX_EPOCH}; +use std::{sync::Arc, time::Duration}; +use tokio::time::sleep; +use tracing::warn; + +const SERVICE: &str = "AnonymizationSaltFetcher"; +const METRIC_PREFIX: &str = "anonymization_salt"; + +fn nonce() -> Vec { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() + .to_le_bytes() + .to_vec() +} + +struct Metrics { + last_successful_fetch: IntGauge, + fetches: IntCounterVec, + last_salt_id: IntGauge, +} + +impl Metrics { + fn new(registry: &Registry) -> Self { + Self { + last_successful_fetch: register_int_gauge_with_registry!( + format!("{METRIC_PREFIX}_last_successful_fetch"), + format!("The Unix timestamp of the last successful salt fetch"), + registry + ) + .unwrap(), + + last_salt_id: register_int_gauge_with_registry!( + format!("{METRIC_PREFIX}_last_salt_id"), + format!("ID of the latest fetched salt"), + registry, + ) + .unwrap(), + + fetches: register_int_counter_vec_with_registry!( + format!("{METRIC_PREFIX}_fetches"), + format!("Count of salt fetches and their outcome"), + &["result"], + registry + ) + .unwrap(), + } + } +} + +pub struct AnonymizationSaltFetcher { + agent: Agent, + canister_id: CanisterId, + polling_interval: Duration, + anonymization_salt: Arc>>, + metrics: Metrics, +} + +impl AnonymizationSaltFetcher { + pub fn new( + agent: Agent, + canister_id: Principal, + polling_interval: Duration, + anonymization_salt: Arc>>, + registry: &Registry, + ) -> Self { + Self { + agent, + canister_id: CanisterId::try_from_principal_id(canister_id.into()).unwrap(), + anonymization_salt, + polling_interval, + metrics: Metrics::new(registry), + } + } +} + +#[async_trait] +impl Run for Arc { + async fn run(&mut self) -> Result<(), Error> { + loop { + let query_response = match self + .agent + .execute_update( + &self.canister_id, + &self.canister_id, + "get_salt", + Encode!().unwrap(), + nonce(), + ) + .await + { + Ok(response) => match response { + Some(response) => response, + None => { + warn!("{SERVICE}: got empty response from the canister"); + continue; + } + }, + Err(err) => { + warn!("{SERVICE}: failed to get salt from the canister: {err:#}"); + continue; + } + }; + + let salt_response = match Decode!(&query_response, GetSaltResponse) { + Ok(response) => response, + Err(err) => { + warn!("{SERVICE}: failed to decode candid response: {err:?}"); + continue; + } + }; + + let status = if salt_response.is_ok() { + "success" + } else { + "failure" + }; + self.metrics.fetches.with_label_values(&[status]).inc(); + + match salt_response { + Ok(resp) => { + // Overwrite salt used for hashing sensitive data + self.anonymization_salt.store(Some(Arc::new(resp.salt))); + // Update metrics + self.metrics.last_salt_id.set(resp.salt_id as i64); + self.metrics.last_successful_fetch.set( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() as i64, + ); + } + Err(err) => { + warn!("{SERVICE}: get_salt failed: {err:?}"); + } + } + + sleep(self.polling_interval).await; + } + } +} diff --git a/rs/boundary_node/salt_sharing/canister/salt_sharing_canister.did b/rs/boundary_node/salt_sharing/canister/salt_sharing_canister.did index 4063072950e..43937702712 100644 --- a/rs/boundary_node/salt_sharing/canister/salt_sharing_canister.did +++ b/rs/boundary_node/salt_sharing/canister/salt_sharing_canister.did @@ -46,7 +46,7 @@ type SaltGenerationStrategy = variant { }; // Initialization arguments used when installing/upgrading/reinstalling the canister -type InitArgs = record { +type InitArg = record { // If true salt is regenerated immediately and subsequently based on the chosen strategy regenerate_now: bool; // Strategy defining salt generation @@ -56,7 +56,7 @@ type InitArgs = record { registry_polling_interval_secs: nat64; }; -service : (InitArgs) -> { +service : (InitArg) -> { // Fetches the current salt (randomly generated value to be added to data before hashing) get_salt: () -> (GetSaltResponse) query; // Canister metrics (Http Interface) diff --git a/rs/tests/boundary_nodes/BUILD.bazel b/rs/tests/boundary_nodes/BUILD.bazel index 058bb5ed0a2..cb946b31537 100644 --- a/rs/tests/boundary_nodes/BUILD.bazel +++ b/rs/tests/boundary_nodes/BUILD.bazel @@ -125,6 +125,39 @@ system_test_nns( ], ) +system_test_nns( + name = "salt_sharing_canister_test", + env = { + "SALT_SHARING_CANISTER_WASM_PATH": "$(rootpath //rs/boundary_node/salt_sharing:salt_sharing_canister)", + }, + proc_macro_deps = [], + tags = [ + "k8s", + ], + target_compatible_with = ["@platforms//os:linux"], # requires libssh that does not build on Mac OS + runtime_deps = GUESTOS_RUNTIME_DEPS + [ + "//rs/boundary_node/salt_sharing:salt_sharing_canister", + ], + deps = [ + # Keep sorted. + "//rs/boundary_node/salt_sharing/api:salt_sharing_api", + "//rs/nns/constants", + "//rs/nns/test_utils", + "//rs/registry/subnet_type", + "//rs/rust_canisters/canister_test", + "//rs/tests/boundary_nodes/utils", + "//rs/tests/driver:ic-system-test-driver", + "//rs/types/base_types", + "//rs/types/types", + "@crate_index//:anyhow", + "@crate_index//:candid", + "@crate_index//:ic-agent", + "@crate_index//:slog", + "@crate_index//:tokio", + "@crate_index//:wat", + ], +) + system_test_nns( name = "rate_limit_canister_test", env = { diff --git a/rs/tests/boundary_nodes/Cargo.toml b/rs/tests/boundary_nodes/Cargo.toml index 354485874b3..90e5f98883f 100644 --- a/rs/tests/boundary_nodes/Cargo.toml +++ b/rs/tests/boundary_nodes/Cargo.toml @@ -31,6 +31,7 @@ k256 = { workspace = true } rand = { workspace = true } rand_chacha = { workspace = true } rate-limits-api = { path = "../../boundary_node/rate_limits/api" } +salt-sharing-api = { path = "../../boundary_node/salt_sharing/api" } regex = { workspace = true } registry-canister = { path = "../../registry/canister" } slog = { workspace = true } @@ -68,3 +69,7 @@ path = "bn_update_workload_test.rs" [[bin]] name = "ic-systest-rate-limit-canister" path = "rate_limit_canister_test.rs" + +[[bin]] +name = "ic-systest-salt-sharing-canister" +path = "salt_sharing_canister_test.rs" \ No newline at end of file diff --git a/rs/tests/boundary_nodes/salt_sharing_canister_test.rs b/rs/tests/boundary_nodes/salt_sharing_canister_test.rs new file mode 100644 index 00000000000..507685b1b40 --- /dev/null +++ b/rs/tests/boundary_nodes/salt_sharing_canister_test.rs @@ -0,0 +1,187 @@ +/* tag::catalog[] +Title:: Salt-sharing canister integration with API boundary nodes + +Goal:: Ensure API boundary nodes can retrieve shared salt generated by the canister. + +Runbook: +1. Set up an Internet Computer (IC) with a system-subnet and an API boundary node +2. Install the salt-sharing canister at a specified mainnet ID +3. Retrieve the `last_salt_id` metric from the ic-boundary and assert that it is greater than zero + +end::catalog[] */ + +use anyhow::{bail, Result}; +use candid::{Encode, Principal}; +use ic_base_types::PrincipalId; +use ic_nns_test_utils::itest_helpers::install_rust_canister_from_path; +use salt_sharing_api::InitArg; +use slog::info; +use std::{env, net::SocketAddr, str::FromStr, time::Duration}; +use tokio::runtime::Runtime; + +use ic_agent::export::reqwest::Client; +use ic_registry_subnet_type::SubnetType; +use ic_system_test_driver::{ + driver::test_env_api::NnsInstallationBuilder, + driver::{ + group::SystemTestGroup, + ic::InternetComputer, + test_env::TestEnv, + test_env_api::{ + get_dependency_path, GetFirstHealthyNodeSnapshot, HasPublicApiUrl, HasTopologySnapshot, + IcNodeContainer, + }, + }, + retry_with_msg_async, systest, + util::runtime_from_url, +}; + +const SALT_SHARING_CANISTER_ID: &str = "uz2z3-qyaaa-aaaaq-qaacq-cai"; +const IC_BOUNDARY_SALT_METRIC: &str = "ic_boundary_anonymization_salt_last_salt_id"; + +pub fn setup(env: TestEnv) { + info!( + &env.logger(), + "Step 1. Set up an Internet Computer (IC) with a system-subnet and an API boundary node" + ); + InternetComputer::new() + .add_fast_single_node_subnet(SubnetType::System) + .use_specified_ids_allocation_range() + .with_api_boundary_nodes(1) + .setup_and_start(&env) + .expect("failed to setup IC under test"); + let nns_node = env + .topology_snapshot() + .root_subnet() + .nodes() + .next() + .unwrap(); + NnsInstallationBuilder::new() + .at_ids() + .install(&nns_node, &env) + .expect("could not install NNS canisters"); + info!(&env.logger(), "Checking readiness of all replica nodes ..."); + env.topology_snapshot().subnets().for_each(|subnet| { + subnet + .nodes() + .for_each(|node| node.await_status_is_healthy().unwrap()) + }); +} + +async fn test_async(env: TestEnv) { + let logger = env.logger(); + + let nns_node = env.get_first_healthy_system_node_snapshot(); + let nns = runtime_from_url(nns_node.get_public_url(), nns_node.effective_canister_id()); + + let salt_sharing_canister_id = Principal::from_text(SALT_SHARING_CANISTER_ID).unwrap(); + + info!( + &logger, + "Step 2. Install the salt-sharing canister at a specified mainnet ID {salt_sharing_canister_id}" + ); + + let mut salt_sharing_canister = nns + .create_canister_at_id(PrincipalId(salt_sharing_canister_id)) + .await + .unwrap(); + + let path_to_wasm = get_dependency_path( + env::var("SALT_SHARING_CANISTER_WASM_PATH") + .expect("SALT_SHARING_CANISTER_WASM_PATH not set"), + ); + + let args = Encode!(&InitArg { + regenerate_now: true, + salt_generation_strategy: salt_sharing_api::SaltGenerationStrategy::StartOfMonth, + registry_polling_interval_secs: 5, + }) + .unwrap(); + + info!(&logger, "Installing salt-sharing canister wasm ..."); + + install_rust_canister_from_path(&mut salt_sharing_canister, path_to_wasm, Some(args)).await; + + info!( + &logger, + "Salt-sharing canister with id={salt_sharing_canister_id} installed successfully" + ); + + let url = { + let api_bn = env.topology_snapshot().api_boundary_nodes().next().unwrap(); + let ipv6 = SocketAddr::new(api_bn.get_ip_addr(), 9324); + format!("http://{ipv6}/metrics") + }; + + info!(&logger, "Step 3. Verify that API boundary node retrieves the salt and exposes the corresponding metric"); + + let metrics_extractor = MetricsExtractor { url }; + + retry_with_msg_async!( + "check_last_salt_id_metric_is_set".to_string(), + &logger, + Duration::from_secs(180), + Duration::from_secs(5), + || async { + match metrics_extractor + .try_get_metric::(IC_BOUNDARY_SALT_METRIC) + .await + { + Ok(last_salt_id) if last_salt_id > 0 => Ok(()), + _ => { + bail!("last_salt_id hasn't been set yet"); + } + } + } + ) + .await + .expect("shared salt was not received by API boundary node"); +} + +fn test(env: TestEnv) { + let rt = Runtime::new().expect("Could not create tokio runtime"); + rt.block_on(test_async(env)); +} + +fn main() -> Result<()> { + SystemTestGroup::new() + .with_setup(setup) + .add_test(systest!(test)) + .execute_from_args()?; + Ok(()) +} + +struct MetricsExtractor { + url: String, +} + +impl MetricsExtractor { + pub async fn try_get_metric(&self, pattern: &str) -> Result + where + T: FromStr, + { + let client = Client::builder() + .danger_accept_invalid_certs(true) + .build() + .map_err(|err| format!("failed to build a client: {err:?}"))?; + + let response = client + .get(self.url.clone()) + .send() + .await + .map_err(|err| format!("http request failed: {err:?}"))?; + + let body = response + .text() + .await + .map_err(|err| format!("failed to decode response body: {err:?}"))?; + + let metrics = body.lines().map(str::to_string).collect::>(); + + metrics + .into_iter() + .find(|metric| metric.starts_with(pattern)) + .and_then(|metric| metric.split_whitespace().nth(1)?.parse().ok()) + .ok_or_else(|| "metric not found".to_string()) + } +}