Skip to content

Commit

Permalink
feat(sns): Port Swap's periodic tasks from heartbeats to timers (#1932)
Browse files Browse the repository at this point in the history
This PR ports Swap's periodic tasks from heartbeats to timers, making
use of the fact that Swap has recently been ported to IC-CDK. For Swap's
periodic tasks, timers are better than heartbeats for two reasons:

1. Swap can run them less frequently (I propose once a minute vs. once
every round), which should start saving resources right away.
2. Swap can cancel the scheduling of any periodic tasks after reaching
the terminal lifecycle state, which should drastically save subnet
resources long-term.
  • Loading branch information
aterga authored Oct 9, 2024
1 parent c65c725 commit 77bd8fc
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 39 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -2650,7 +2650,7 @@ pub mod sns {
status: SwapFinalizationStatus,
) -> Result<GetAutoFinalizationStatusResponse, String> {
let mut last_auto_finalization_status = None;
for _attempt_count in 1..=100 {
for _attempt_count in 1..=1000 {
pocket_ic.tick().await;
pocket_ic.advance_time(Duration::from_secs(1)).await;
let auto_finalization_status =
Expand Down
23 changes: 19 additions & 4 deletions rs/sns/integration_tests/src/initialization_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,9 @@ fn test_one_proposal_sns_initialization_success_with_neurons_fund_participation(
sns_initialization_flow_test.advance_time_to_open_swap(test_sns.swap_canister_id.unwrap());

// Make sure the opening can occur in the heartbeat
sns_initialization_flow_test
.state_machine
.advance_time(std::time::Duration::from_secs(100));
sns_initialization_flow_test.state_machine.tick();
let get_lifecycle_response = get_lifecycle(
&sns_initialization_flow_test.state_machine,
Expand Down Expand Up @@ -480,7 +483,10 @@ fn test_one_proposal_sns_initialization_success_with_neurons_fund_participation(
direct_participant_amounts.insert(participant, response.icp_accepted_participation_e8s);
}

// Assert the Swap lifecycle transitions to Committed in a heartbeat message
// Assert the Swap lifecycle transitions to Committed after the next periodic task ran.
sns_initialization_flow_test
.state_machine
.advance_time(std::time::Duration::from_secs(100));
sns_initialization_flow_test.state_machine.tick();
let get_lifecycle_response = get_lifecycle(
&sns_initialization_flow_test.state_machine,
Expand Down Expand Up @@ -750,6 +756,9 @@ fn test_one_proposal_sns_initialization_success_without_neurons_fund_participati
sns_initialization_flow_test.advance_time_to_open_swap(test_sns.swap_canister_id.unwrap());

// Make sure the opening can occur in the heartbeat
sns_initialization_flow_test
.state_machine
.advance_time(std::time::Duration::from_secs(100));
sns_initialization_flow_test.state_machine.tick();
let get_lifecycle_response = get_lifecycle(
&sns_initialization_flow_test.state_machine,
Expand Down Expand Up @@ -782,7 +791,10 @@ fn test_one_proposal_sns_initialization_success_without_neurons_fund_participati
direct_participant_amounts.insert(participant, response.icp_accepted_participation_e8s);
}

// Assert the Swap lifecycle transitions to Committed in a heartbeat message
// Assert the Swap lifecycle transitions to Committed after the next periodic task ran.
sns_initialization_flow_test
.state_machine
.advance_time(std::time::Duration::from_secs(100));
sns_initialization_flow_test.state_machine.tick();
let get_lifecycle_response = get_lifecycle(
&sns_initialization_flow_test.state_machine,
Expand Down Expand Up @@ -1113,7 +1125,10 @@ fn test_one_proposal_sns_initialization_failed_swap_returns_neurons_fund_and_dap
direct_participant_amounts.insert(participant, response.icp_accepted_participation_e8s);
}

// Assert the Swap lifecycle transitions to Committed in a heartbeat message
// Assert the Swap lifecycle transitions to Committed after the next periodic task ran.
sns_initialization_flow_test
.state_machine
.advance_time(std::time::Duration::from_secs(100));
sns_initialization_flow_test.state_machine.tick();
let get_lifecycle_response = get_lifecycle(
&sns_initialization_flow_test.state_machine,
Expand Down Expand Up @@ -1215,7 +1230,7 @@ fn test_one_proposal_sns_initialization_supports_multiple_open_swaps() {
// Step 3: Advance time to open the swap for participation
sns_initialization_flow_test.advance_time_to_open_swap(test_sns_1.swap_canister_id.unwrap());

// Make sure the opening can occur in the heartbeat
// Make sure the opening can occur after the next periodic task ran.
sns_initialization_flow_test.state_machine.tick();
let get_lifecycle_response = get_lifecycle(
&sns_initialization_flow_test.state_machine,
Expand Down
8 changes: 8 additions & 0 deletions rs/sns/integration_tests/src/payment_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use lazy_static::lazy_static;
use std::{
sync::{Arc, Mutex},
thread,
time::Duration,
};

lazy_static! {
Expand Down Expand Up @@ -115,6 +116,9 @@ impl PaymentProtocolTestSetup {
state_machine
.install_existing_canister(swap_id, wasm, args)
.unwrap();
// Make sure at least one Swap periodic tasks is executed.
state_machine.advance_time(Duration::from_secs(100));
state_machine.tick();
}

Self {
Expand Down Expand Up @@ -308,6 +312,10 @@ impl PaymentProtocolTestSetup {
fn test_get_open_ticket() {
let user0 = PrincipalId::new_user_test_id(0);
let payment_flow_protocol = PaymentProtocolTestSetup::default_setup();
assert_eq!(
payment_flow_protocol.get_lifecycle().lifecycle,
Some(Lifecycle::Open as i32)
);
assert_eq!(payment_flow_protocol.get_open_ticket(&user0).unwrap(), None);
}

Expand Down
4 changes: 4 additions & 0 deletions rs/sns/integration_tests/src/sns_treasury.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ fn new_treasury_scenario(
index_canister_id: _,
} = sns_test_canister_ids;

// Make sure at least one Swap periodic tasks is executed.
state_machine.advance_time(std::time::Duration::from_secs(100));
state_machine.tick();

participate_in_swap(
state_machine,
swap_canister_id,
Expand Down
1 change: 1 addition & 0 deletions rs/sns/swap/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ DEPENDENCIES = [
"@crate_index//:comparable",
"@crate_index//:hex",
"@crate_index//:ic-cdk",
"@crate_index//:ic-cdk-timers",
"@crate_index//:ic-metrics-encoder",
"@crate_index//:ic-stable-structures",
"@crate_index//:itertools",
Expand Down
1 change: 1 addition & 0 deletions rs/sns/swap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ ic-canisters-http-types = { path = "../../rust_canisters/http_types" }
ic-ledger-core = { path = "../../ledger_suite/common/ledger_core" }
ic-cdk = { workspace = true }
ic-cdk-macros = { workspace = true }
ic-cdk-timers = { workspace = true }
ic-metrics-encoder = "1"
ic-nervous-system-canisters = { path = "../../nervous_system/canisters" }
ic-nervous-system-clients = { path = "../../nervous_system/clients" }
Expand Down
50 changes: 40 additions & 10 deletions rs/sns/swap/canister/canister.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use ic_base_types::{CanisterId, PrincipalId};
use ic_canister_log::log;
use ic_canisters_http_types::{HttpRequest, HttpResponse, HttpResponseBuilder};
use ic_cdk::{api::time, caller, heartbeat, id, init, post_upgrade, pre_upgrade, query, update};
use ic_cdk::{api::time, caller, id, init, post_upgrade, pre_upgrade, query, update};
use ic_cdk_timers::TimerId;
use ic_nervous_system_canisters::ledger::IcpLedgerCanister;
use ic_nervous_system_clients::{
canister_id_record::CanisterIdRecord,
Expand Down Expand Up @@ -30,10 +31,13 @@ use ic_sns_swap::{
use ic_stable_structures::{writer::Writer, Memory};
use prost::Message;
use std::{
cell::RefCell,
str::FromStr,
time::{Duration, SystemTime},
};

const RUN_PERIODIC_TASKS_INTERVAL: Duration = Duration::from_secs(60);

// TODO(NNS1-1589): Unhack.
// use ic_sns_root::pb::v1::{SetDappControllersRequest, SetDappControllersResponse};

Expand All @@ -44,6 +48,10 @@ use std::{
/// The global state of the this canister.
static mut SWAP: Option<Swap> = None;

thread_local! {
static TIMER_ID: RefCell<TimerId> = RefCell::new(Default::default());
}

/// Returns an immutable reference to the global state.
///
/// This should only be called once the global state has been initialized, which
Expand Down Expand Up @@ -244,15 +252,6 @@ fn notify_payment_failure(_request: NotifyPaymentFailureRequest) -> NotifyPaymen
// === Canister helper & boilerplate methods ===
// =============================================================================

/// Tries to commit or abort the swap if the parameters have been satisfied.
#[heartbeat]
fn canister_heartbeat() {
let future = swap_mut().heartbeat(now_fn);

// The canister_heartbeat must be synchronous, so we cannot .await the future.
ic_cdk::spawn(future);
}

fn now_nanoseconds() -> u64 {
if cfg!(target_arch = "wasm32") {
time()
Expand All @@ -277,6 +276,34 @@ fn create_real_icp_ledger(id: CanisterId) -> IcpLedgerCanister<CdkRuntime> {
IcpLedgerCanister::<CdkRuntime>::new(id)
}

async fn run_periodic_tasks() {
swap_mut().run_periodic_tasks(now_fn).await;

if !swap().requires_periodic_tasks() {
log!(
INFO,
"All work that needs to be done in Swap's periodic tasks has been completed. \
Stop scheduling new periodic tasks."
);
TIMER_ID.with(|saved_timer_id| {
let timer_id = saved_timer_id.borrow();
ic_cdk_timers::clear_timer(*timer_id);
});
}
}

fn init_timers() {
if swap().requires_periodic_tasks() {
let timer_id = ic_cdk_timers::set_timer_interval(RUN_PERIODIC_TASKS_INTERVAL, || {
ic_cdk::spawn(run_periodic_tasks())
});
TIMER_ID.with(|saved_timer_id| {
let mut saved_timer_id = saved_timer_id.borrow_mut();
*saved_timer_id = timer_id;
});
}
}

/// In contrast to canister_init(), this method does not do deserialization.
#[init]
fn canister_init(init_payload: Init) {
Expand All @@ -288,6 +315,7 @@ fn canister_init(init_payload: Init) {
);
SWAP = Some(swap);
}
init_timers();
log!(INFO, "Initialized");
}

Expand Down Expand Up @@ -372,6 +400,8 @@ fn canister_post_upgrade() {
err
)
});

init_timers();
}

/// Serve an HttpRequest made to this canister
Expand Down
59 changes: 35 additions & 24 deletions rs/sns/swap/src/swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,15 @@ impl Swap {
self.lifecycle().is_terminal()
}

/// Determines if Swap still has work that might need to be done in periodic tasks.
///
/// See also: `Swap.run_periodic_tasks`.
pub fn requires_periodic_tasks(&self) -> bool {
// Practically, already_tried_to_auto_finalize should never be None, but we err towards
// caution, which in this case means to continue scheduling periodic tasks.
!self.lifecycle_is_terminal() || !self.already_tried_to_auto_finalize.unwrap_or(false)
}

//
// --- state transition functions ------------------------------------------
//
Expand All @@ -681,8 +690,8 @@ impl Swap {
if !self.can_open(now_seconds) {
return false;
}
// set the purge_old_ticket last principal so that the routine can start
// in the next heartbeat
// set the purge_old_ticket last principal so that the next periodic task can start
// the routine.
self.purge_old_tickets_next_principal = Some(FIRST_PRINCIPAL_BYTES.to_vec());
self.update_derived_fields();
self.set_lifecycle(Lifecycle::Open);
Expand Down Expand Up @@ -1003,15 +1012,14 @@ impl Swap {
// --- state modifying methods ---------------------------------------------
//

/// Runs those tasks that should be run on canister heartbeat.
/// Runs those tasks that should be run periodically.
///
/// The argument 'now_fn' is a function that returns the current time
/// for bookkeeping of transfers. For easier testing, it is given
/// an argument that is 'false' to get the timestamp when a
/// transfer is initiated and 'true' to get the timestamp when a
/// transfer is successful.
pub async fn heartbeat(&mut self, now_fn: fn(bool) -> u64) {
let heartbeat_start_seconds = now_fn(false);
/// The argument 'now_fn' is a function that returns the current time for bookkeeping
/// of transfers. For easier testing, it is given an argument that is 'false' to get
/// the timestamp when a transfer is initiated and 'true' to get the timestamp when a transfer
/// is successful.
pub async fn run_periodic_tasks(&mut self, now_fn: fn(bool) -> u64) {
let periodic_task_start_seconds = now_fn(false);

// Purge old tickets
const NUMBER_OF_TICKETS_THRESHOLD: u64 = 100_000_000; // 100M * ~size(ticket) = ~25GB
Expand All @@ -1025,32 +1033,36 @@ impl Swap {
MAX_NUMBER_OF_PRINCIPALS_TO_INSPECT,
);

// Automatically transition the state. Only one state transition per heartbeat.
// Automatically transition the state. Only one state transition per periodic task.

// Auto-open the swap
if self.try_open(heartbeat_start_seconds) {
log!(INFO, "Swap opened at timestamp {}", heartbeat_start_seconds);
if self.try_open(periodic_task_start_seconds) {
log!(
INFO,
"Swap opened at timestamp {}",
periodic_task_start_seconds
);
}
// Auto-commit the swap
else if self.try_commit(heartbeat_start_seconds) {
else if self.try_commit(periodic_task_start_seconds) {
log!(
INFO,
"Swap committed at timestamp {}",
heartbeat_start_seconds
periodic_task_start_seconds
);
}
// Auto-abort the swap
else if self.try_abort(heartbeat_start_seconds) {
else if self.try_abort(periodic_task_start_seconds) {
log!(
INFO,
"Swap aborted at timestamp {}",
heartbeat_start_seconds
periodic_task_start_seconds
);
}
// Auto-finalize the swap
// We discard the error, if there is one, because to log it would mean
// it would be logged every heartbeat where we fall through to this
// point (and we don't want to spam the logs).
// We discard the error, if there is one, because to log it would mean it would be logged
// every time a periodic task is executed where we fall through to this point (and we don't
// want to spam the logs).
else if self.can_auto_finalize().is_ok() {
// First, record when the finalization started, in case this function is
// refactored to `await` before this point.
Expand Down Expand Up @@ -2615,10 +2627,9 @@ impl Swap {
max_number_to_inspect,
) {
Some(new_next_principal) => {
// If a principal is returned then there are some principals
// that haven't been checked yet by purge_old_tickets. We record
// the next principal so that the next heartbeat can continue the
// work.
// If a principal is returned then there are some principals that haven't been
// checked yet by purge_old_tickets. We record the next principal so that
// the next periodic task can continue the work.
self.purge_old_tickets_next_principal = Some(new_next_principal);
Some(false)
}
Expand Down

0 comments on commit 77bd8fc

Please sign in to comment.