Skip to content

Commit

Permalink
feat(sns): Add simple global lock for the periodic task that relates …
Browse files Browse the repository at this point in the history
…to upgrades (#2193)

## What

This PR adds a lock around checking the upgrade status and refreshing
the cached upgrade steps. This ensures that they do not interleave with
one another (or themselves).

## Why

This is useful because it simplifies our reasoning about any logic that
may be impacted by such an interleaving.

## Additional Notes

This implementation is not ideal, because we attempt to acquire the lock
unconditionally, which means that in cases where the lock was acquired
it's impossible to say why. And in the unlikely event that the lock is
not released, it would be useful to know what actual work necessitated
it be acquired. #2124 builds upon this PR to create a more robust
locking mechanism.
  • Loading branch information
anchpop authored and nmattia committed Oct 25, 2024
1 parent e8be4eb commit c1b8010
Showing 1 changed file with 105 additions and 6 deletions.
111 changes: 105 additions & 6 deletions rs/sns/governance/src/governance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,13 @@ pub fn log_prefix() -> String {
/// The static MEMO used when calculating the SNS Treasury subaccount.
pub const TREASURY_SUBACCOUNT_NONCE: u64 = 0;

// How frequently the canister should attempt to refresh the cached_upgrade_steps
/// How frequently the canister should attempt to refresh the cached_upgrade_steps
pub const UPGRADE_STEPS_INTERVAL_REFRESH_BACKOFF_SECONDS: u64 = 60 * 60; // 1 hour

/// Upper bound, in seconds, on how long the upgrade periodic task lock may be held.
/// A lock that was acquired longer ago than this is treated as expired and can be
/// re-acquired by a later attempt.
const UPGRADE_PERIODIC_TASK_LOCK_TIMEOUT_SECONDS: u64 = 10 * 60; // 10 minutes

/// Converts bytes to a subaccount
pub fn bytes_to_subaccount(
bytes: &[u8],
Expand Down Expand Up @@ -696,6 +700,12 @@ pub struct Governance {
/// The number of proposals after the last time "garbage collection" was run.
pub latest_gc_num_proposals: usize,

/// Global lock for all periodic tasks that relate to upgrades - this is used to
/// guarantee that they don't interleave with one another outside of rare circumstances (e.g. timeouts).
/// `None` means that the lock is not currently held by any task.
/// `Some(x)` means that a task has been holding the lock since timestamp `x`.
pub upgrade_periodic_task_lock: Option<u64>,

/// Whether test features are enabled.
/// Test features should not be exposed in production. But, code that should
/// not run in production can be gated behind a check for this flag as an
Expand Down Expand Up @@ -777,6 +787,7 @@ impl Governance {
closest_proposal_deadline_timestamp_seconds: 0,
latest_gc_timestamp_seconds: 0,
latest_gc_num_proposals: 0,
upgrade_periodic_task_lock: None,
test_features_enabled: false,
};

Expand Down Expand Up @@ -4623,8 +4634,24 @@ impl Governance {

self.process_proposals();

if self.should_check_upgrade_status() {
self.check_upgrade_status().await;
// None of the upgrade-related tasks should interleave with one another or themselves, so we acquire a global
// lock for the duration of their execution. This will return `false` if the lock has already been acquired less
// than 10 minutes ago by a previous invocation of `run_periodic_tasks`, in which case we skip the
// upgrade-related tasks.
if self.acquire_upgrade_periodic_task_lock() {
// We only want to check the upgrade status if we are currently executing an upgrade.
if self.should_check_upgrade_status() {
self.check_upgrade_status().await;
}

if self.should_refresh_cached_upgrade_steps() {
// We only want to refresh the cached_upgrade_steps every UPGRADE_STEPS_INTERVAL_REFRESH_BACKOFF_SECONDS
// seconds, so we first lock the refresh operation (which will automatically unlock after that interval)
self.temporarily_lock_refresh_cached_upgrade_steps();
self.refresh_cached_upgrade_steps().await;
}

self.release_upgrade_periodic_task_lock();
}

let should_distribute_rewards = self.should_distribute_rewards();
Expand Down Expand Up @@ -4656,13 +4683,32 @@ impl Governance {
self.maybe_move_staked_maturity();

self.maybe_gc();
}

if self.should_refresh_cached_upgrade_steps() {
self.temporarily_lock_refresh_cached_upgrade_steps();
self.refresh_cached_upgrade_steps().await;
/// Tries to take the "upgrade periodic task lock", which is shared by all periodic
/// tasks that relate to upgrades.
///
/// The attempt succeeds when the lock is free, or when it was taken more than
/// `UPGRADE_PERIODIC_TASK_LOCK_TIMEOUT_SECONDS` ago (i.e. the previous holder timed out).
/// On success the lock records the current time as its acquisition timestamp.
///
/// Returns `true` if the lock was acquired by this call, `false` if it is still held.
fn acquire_upgrade_periodic_task_lock(&mut self) -> bool {
    let now = self.env.now();

    // Guard clause: a lock that is held and has not yet expired blocks acquisition.
    if let Some(held_since) = self.upgrade_periodic_task_lock {
        // Saturating so that a timestamp near `u64::MAX` cannot wrap around.
        let expires_at = held_since.saturating_add(UPGRADE_PERIODIC_TASK_LOCK_TIMEOUT_SECONDS);
        if now <= expires_at {
            return false;
        }
    }

    // Either the lock was free or the previous holder timed out: take it as of `now`.
    self.upgrade_periodic_task_lock = Some(now);
    true
}

/// Releases the "upgrade periodic task lock" by resetting it to `None`.
///
/// No check is made that the lock was actually held, so calling this on an
/// already-released lock simply leaves it released.
fn release_upgrade_periodic_task_lock(&mut self) {
    // `take` leaves `None` behind; the previous acquisition timestamp is discarded.
    self.upgrade_periodic_task_lock.take();
}

pub fn temporarily_lock_refresh_cached_upgrade_steps(&mut self) {
if let Some(ref mut cached_upgrade_steps) = self.proto.cached_upgrade_steps {
cached_upgrade_steps.requested_timestamp_seconds = Some(self.env.now());
Expand Down Expand Up @@ -8239,6 +8285,59 @@ mod tests {
);
}

#[test]
fn test_upgrade_periodic_task_lock() {
    let environment = NativeEnvironment::new(Some(*TEST_GOVERNANCE_CANISTER_ID));
    let mut governance = Governance::new(
        basic_governance_proto().try_into().unwrap(),
        Box::new(environment),
        Box::new(DoNothingLedger {}),
        Box::new(DoNothingLedger {}),
        Box::new(FakeCmc::new()),
    );

    // Nothing holds the lock before the first acquisition.
    assert!(governance.upgrade_periodic_task_lock.is_none());

    // The first attempt wins the lock and leaves it engaged.
    assert!(governance.acquire_upgrade_periodic_task_lock());
    assert!(governance.upgrade_periodic_task_lock.is_some());

    // Any further attempt made while the lock is held is rejected,
    // and the lock stays engaged afterwards.
    for _ in 0..2 {
        assert!(!governance.acquire_upgrade_periodic_task_lock());
    }
    assert!(governance.upgrade_periodic_task_lock.is_some());

    // Releasing clears the lock; releasing a second time is harmless.
    for _ in 0..2 {
        governance.release_upgrade_periodic_task_lock();
        assert!(governance.upgrade_periodic_task_lock.is_none());
    }
}

#[test]
fn test_upgrade_periodic_task_lock_times_out() {
    let environment = NativeEnvironment::new(Some(*TEST_GOVERNANCE_CANISTER_ID));
    let mut governance = Governance::new(
        basic_governance_proto().try_into().unwrap(),
        Box::new(environment),
        Box::new(DoNothingLedger {}),
        Box::new(DoNothingLedger {}),
        Box::new(FakeCmc::new()),
    );

    // Take the lock and confirm that it blocks a second attempt.
    assert!(governance.acquire_upgrade_periodic_task_lock());
    assert!(!governance.acquire_upgrade_periodic_task_lock());
    assert!(governance.upgrade_periodic_task_lock.is_some());

    // Move the clock to one second past the lock timeout.
    let just_past_timeout = TimeWarp {
        delta_s: UPGRADE_PERIODIC_TASK_LOCK_TIMEOUT_SECONDS as i64 + 1,
    };
    governance.env.set_time_warp(just_past_timeout);

    // The stale lock can now be taken over by a new acquisition...
    assert!(governance.acquire_upgrade_periodic_task_lock());
    // ...which in turn blocks subsequent attempts again.
    assert!(!governance.acquire_upgrade_periodic_task_lock());
}

#[test]
fn test_check_upgrade_can_succeed_if_archives_out_of_sync() {
let root_canister_id = *TEST_ROOT_CANISTER_ID;
Expand Down

0 comments on commit c1b8010

Please sign in to comment.