Skip to content

Commit

Permalink
add heartbeat semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Mar 6, 2024
1 parent f64f749 commit 183cafb
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
14 changes: 13 additions & 1 deletion proto/src/discovery/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
//! 3. Brokers to verify permits
//! 4. Brokers for peer discovery
use std::{collections::HashSet, time::Duration};
use std::{collections::HashSet, sync::Arc, time::Duration};

use async_trait::async_trait;
use rand::{rngs::StdRng, RngCore, SeedableRng};
use redis::aio::ConnectionManager;
use tokio::sync::Semaphore;

use crate::{
bail,
Expand All @@ -24,6 +25,8 @@ use super::{BrokerIdentifier, DiscoveryClient};
pub struct Redis {
/// The underlying `Redis` connection. Is managed, so we don't have to worry about reconnections
underlying_connection: ConnectionManager,
/// The semaphore we need to ensure that heartbeat transactions are atomic
heartbeat_semaphore: Arc<Semaphore>,
/// Our operator identifier (in practice, will be something like a concat of advertise addresses)
identifier: BrokerIdentifier,
}
Expand Down Expand Up @@ -61,6 +64,7 @@ impl DiscoveryClient for Redis {
"failed to create Redis connection manager"
),
identifier,
heartbeat_semaphore: Arc::from(Semaphore::const_new(1)),
})
}

Expand All @@ -77,6 +81,12 @@ impl DiscoveryClient for Redis {
num_connections: u64,
heartbeat_expiry: Duration,
) -> Result<()> {
// Acquire permit to perform the heartbeat so we don't interleave requests˜
let heartbeat_permit = bail!(
self.heartbeat_semaphore.acquire().await,
Async,
"failed to acquire semaphore"
);
// Set up atomic transaction
// TODO: macro this bail to something like bail_redis
bail!(
Expand Down Expand Up @@ -135,6 +145,8 @@ impl DiscoveryClient for Redis {
"failed to connect to Redis"
);

// Drop the heartbeat permit, allowing others to perform
drop(heartbeat_permit);
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions proto/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub enum Error {
Time(String),
/// An error that is used to specify that a required task has exited.
Exited(String),
/// An error related to a general asynchronicity/lock issue.
Async(String),
}

#[macro_export]
Expand Down

0 comments on commit 183cafb

Please sign in to comment.