Skip to content

Commit

Permalink
redis atomic transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Mar 6, 2024
1 parent 13b34fd commit ca2667a
Showing 1 changed file with 9 additions and 39 deletions.
48 changes: 9 additions & 39 deletions proto/src/discovery/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,43 +77,22 @@ impl DiscoveryClient for Redis {
num_connections: u64,
heartbeat_expiry: Duration,
) -> Result<()> {
// Set up atomic transaction
// TODO: macro this bail to something like bail_redis
// Atomically execute the following commands
bail!(
redis::cmd("MULTI")
.query_async(&mut self.underlying_connection)
.await,
Connection,
"failed to connect to Redis"
);

// Add our identifier to the broker list (if not there already)
bail!(
redis::cmd("SADD")
redis::pipe()
.atomic()
// Add our identifier to the broker list (if not there already)
.cmd("SADD")
.arg(&["brokers", &self.identifier.to_string()])
.query_async(&mut self.underlying_connection)
.await,
Connection,
"failed to connect to Redis"
);

// Set our expiry
bail!(
redis::cmd("EXPIREMEMBER")
// Set our expiry
.cmd("EXPIREMEMBER")
.arg(&[
"brokers",
&self.identifier.to_string(),
&heartbeat_expiry.as_secs().to_string()
])
.query_async(&mut self.underlying_connection)
.await,
Connection,
"failed to connect to Redis"
);

// Set our number connections
bail!(
redis::cmd("SET")
// Set our number of connections
.cmd("SET")
.arg(&[
format!("{}/num_connections", self.identifier),
num_connections.to_string(),
Expand All @@ -126,15 +105,6 @@ impl DiscoveryClient for Redis {
"failed to connect to Redis"
);

// Atomically execute all transactions
bail!(
redis::cmd("EXEC")
.query_async(&mut self.underlying_connection)
.await,
Connection,
"failed to connect to Redis"
);

Ok(())
}

Expand Down

0 comments on commit ca2667a

Please sign in to comment.