Skip to content

Commit

Permalink
Revert "add heartbeat semaphore"
Browse files Browse the repository at this point in the history
This reverts commit 183cafb.
  • Loading branch information
rob-maron committed Mar 6, 2024
1 parent 183cafb commit 59abe2f
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 15 deletions.
14 changes: 1 addition & 13 deletions proto/src/discovery/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
//! 3. Brokers to verify permits
//! 4. Brokers for peer discovery
use std::{collections::HashSet, sync::Arc, time::Duration};
use std::{collections::HashSet, time::Duration};

use async_trait::async_trait;
use rand::{rngs::StdRng, RngCore, SeedableRng};
use redis::aio::ConnectionManager;
use tokio::sync::Semaphore;

use crate::{
bail,
Expand All @@ -25,8 +24,6 @@ use super::{BrokerIdentifier, DiscoveryClient};
pub struct Redis {
/// The underlying `Redis` connection. Is managed, so we don't have to worry about reconnections
underlying_connection: ConnectionManager,
/// The semaphore we need to ensure that heartbeat transactions are atomic
heartbeat_semaphore: Arc<Semaphore>,
/// Our operator identifier (in practice, will be something like a concat of advertise addresses)
identifier: BrokerIdentifier,
}
Expand Down Expand Up @@ -64,7 +61,6 @@ impl DiscoveryClient for Redis {
"failed to create Redis connection manager"
),
identifier,
heartbeat_semaphore: Arc::from(Semaphore::const_new(1)),
})
}

Expand All @@ -81,12 +77,6 @@ impl DiscoveryClient for Redis {
num_connections: u64,
heartbeat_expiry: Duration,
) -> Result<()> {
// Acquire permit to perform the heartbeat so we don't interleave requests˜
let heartbeat_permit = bail!(
self.heartbeat_semaphore.acquire().await,
Async,
"failed to acquire semaphore"
);
// Set up atomic transaction
// TODO: macro this bail to something like bail_redis
bail!(
Expand Down Expand Up @@ -145,8 +135,6 @@ impl DiscoveryClient for Redis {
"failed to connect to Redis"
);

// Drop the heartbeat permit, allowing others to perform
drop(heartbeat_permit);
Ok(())
}

Expand Down
2 changes: 0 additions & 2 deletions proto/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ pub enum Error {
Time(String),
/// An error that is used to specify that a required task has exited.
Exited(String),
/// An error related to a general asynchronicity/lock issue.
Async(String),
}

#[macro_export]
Expand Down

0 comments on commit 59abe2f

Please sign in to comment.