Skip to content

Commit

Permalink
synchronous lock
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 25, 2024
1 parent 3db169d commit d36b4e6
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ derive_builder.workspace = true
rkyv.workspace = true
derivative = "2.2.0"
dashmap = "5.5.3"
parking_lot = "0.12.1"

2 changes: 1 addition & 1 deletion broker/src/connections/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::{
hash::Hash,
};

use parking_lot::RwLock;
use proto::{connection::Bytes, discovery::BrokerIdentifier, message::Topic};
use tokio::sync::RwLock;

/// Our broadcast map is just two associative (bidirectional, multi) maps:
/// one for brokers and one for users.
Expand Down
58 changes: 20 additions & 38 deletions broker/src/connections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{collections::HashSet, sync::Arc};

use dashmap::DashMap;
pub use direct::DirectMap;
use parking_lot::RwLock;
use proto::{
connection::{
protocols::{Protocol, Sender},
Expand All @@ -14,7 +15,7 @@ use proto::{
message::Topic,
mnemonic,
};
use tokio::{spawn, sync::RwLock};
use tokio::spawn;
use tracing::{error, warn};

use self::{broadcast::BroadcastMap, versioned::VersionedMap};
Expand Down Expand Up @@ -63,40 +64,40 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
/// Get the full versioned vector map of user -> broker.
/// We send this to other brokers so they can merge it.
pub async fn get_full_user_sync(self: &Arc<Self>) -> DirectMap {
self.direct_map.read().await.get_full()
self.direct_map.read().get_full()
}

/// Get the differences in the versioned vector map of user -> broker
/// We send this to other brokers so they can merge it.
pub async fn get_partial_user_sync(self: &Arc<Self>) -> DirectMap {
self.direct_map.write().await.diff()
self.direct_map.write().diff()
}

/// Apply a received user sync map. Overwrites our values if they are old.
/// Kicks off users that are now connected elsewhere.
pub async fn apply_user_sync(self: &Arc<Self>, map: DirectMap) {
// Merge the maps, returning the difference
let users_to_remove = self.direct_map.write().await.merge(map);
let users_to_remove = self.direct_map.write().merge(map);

// We should remove the users that are different, if they exist locally.
for user in users_to_remove {
self.remove_user(user).await;
self.remove_user(user);
}
}

/// Get the full list of topics that we are interested in.
/// We send this to new brokers when they start.
pub async fn get_full_topic_sync(self: &Arc<Self>) -> Vec<Topic> {
self.broadcast_map.users.read().await.get_values()
pub fn get_full_topic_sync(self: &Arc<Self>) -> Vec<Topic> {
self.broadcast_map.users.read().get_values()
}

/// Get the partial list of topics that we are interested in. Returns the
/// additions and removals as a tuple `(a, r)` in that order. We send this
/// to other brokers whenever there are changes.
pub async fn get_partial_topic_sync(self: &Arc<Self>) -> (Vec<Topic>, Vec<Topic>) {
pub fn get_partial_topic_sync(self: &Arc<Self>) -> (Vec<Topic>, Vec<Topic>) {
// Lock the maps
let mut previous = self.broadcast_map.previous_subscribed_topics.write().await;
let now = HashSet::from_iter(self.broadcast_map.users.read().await.get_values());
let mut previous = self.broadcast_map.previous_subscribed_topics.write();
let now = HashSet::from_iter(self.broadcast_map.users.read().get_values());

// Calculate additions and removals
let added = now.difference(&previous);
Expand Down Expand Up @@ -145,43 +146,39 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
// Insert into our direct map
self.direct_map
.write()
.await
.insert(user_public_key, self.identity.clone());
}

/// Remove a broker from our map by their identifier. Also removes them
/// from our broadcast map, in case they were subscribed to any topics.
pub async fn remove_broker(self: &Arc<Self>, broker_identifier: &BrokerIdentifier) {
pub fn remove_broker(self: &Arc<Self>, broker_identifier: &BrokerIdentifier) {
// Remove from broker list
self.brokers.remove(broker_identifier);

// Remove from all topics
self.broadcast_map
.brokers
.write()
.await
.remove_key(broker_identifier);
}

/// Remove a user from our map by their public key. Also removes them
/// from our broadcast map, in case they were subscribed to any topics, and
/// the versioned vector map. This is so other brokers don't keep trying
/// to send us messages for a disconnected user.
pub async fn remove_user(self: &Arc<Self>, user_public_key: Bytes) {
pub fn remove_user(self: &Arc<Self>, user_public_key: Bytes) {
// Remove from user list
self.users.remove(&user_public_key);

// Remove from user topics
self.broadcast_map
.users
.write()
.await
.remove_key(&user_public_key);

// Remove from direct map if they're connected to us
self.direct_map
.write()
.await
.remove_if_equals(user_public_key, self.identity.clone());
}

Expand All @@ -194,7 +191,6 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
self.broadcast_map
.brokers
.write()
.await
.associate_key_with_values(broker_identifier, topics);
}

Expand All @@ -203,7 +199,6 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
self.broadcast_map
.users
.write()
.await
.associate_key_with_values(user_public_key, topics);
}

Expand All @@ -216,7 +211,6 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
self.broadcast_map
.brokers
.write()
.await
.dissociate_keys_from_value(broker_identifier, &topics);
}

Expand All @@ -225,7 +219,6 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
self.broadcast_map
.users
.write()
.await
.dissociate_keys_from_value(user_public_key, &topics);
}

Expand All @@ -248,7 +241,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
if let Err(err) = connection.send_message_raw(message).await {
// If we fail, remove the broker from our map.
error!("broker send failed: {err}");
inner.remove_broker(&broker_identifier).await;
inner.remove_broker(&broker_identifier);
};
});
}
Expand All @@ -268,7 +261,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
if let Err(err) = connection.send_message_raw(message).await {
// Remove them if we failed to send it
error!("broker send failed: {err}");
inner.remove_broker(&broker_identifier).await;
inner.remove_broker(&broker_identifier);
};
});
}
Expand All @@ -287,7 +280,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
spawn(async move {
if connection.send_message_raw(message).await.is_err() {
// If we fail to send the message, remove the user.
inner.remove_user(user_public_key).await;
inner.remove_user(user_public_key);
};
});
}
Expand All @@ -303,7 +296,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
to_user_only: bool,
) {
// Look up from our map
if let Some(broker_identifier) = self.direct_map.read().await.get(&user_public_key) {
if let Some(broker_identifier) = self.direct_map.read().get(&user_public_key) {
if *broker_identifier == self.identity {
// We own the user, send it this way
self.send_to_user(user_public_key, message);
Expand Down Expand Up @@ -341,21 +334,10 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
for topic in topics {
// If we can send to brokers, we should do it
if !to_users_only {
broker_recipients.extend(
self.broadcast_map
.brokers
.read()
.await
.get_keys_by_value(&topic),
);
broker_recipients
.extend(self.broadcast_map.brokers.read().get_keys_by_value(&topic));
}
user_recipients.extend(
self.broadcast_map
.users
.read()
.await
.get_keys_by_value(&topic),
);
user_recipients.extend(self.broadcast_map.users.read().get_keys_by_value(&topic));
}

// If we can send to brokers, do so
Expand Down
6 changes: 3 additions & 3 deletions broker/src/handlers/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ impl<
// Send a full user sync
if let Err(err) = self.full_user_sync(&broker_identifier).await {
error!("failed to perform full user sync: {err}");
self.connections.remove_broker(&broker_identifier).await;
self.connections.remove_broker(&broker_identifier);
return;
};

// Send a full topic sync
// TODO: macro removals or something
if let Err(err) = self.full_topic_sync(&broker_identifier).await {
error!("failed to perform full topic sync: {err}");
self.connections.remove_broker(&broker_identifier).await;
self.connections.remove_broker(&broker_identifier);
return;
};

Expand Down Expand Up @@ -95,7 +95,7 @@ impl<

// Remove from the connected broker identities so that we may
// try to reconnect inthe future.
self.connections.remove_broker(&broker_identifier).await;
self.connections.remove_broker(&broker_identifier);
}

pub async fn broker_receive_loop(
Expand Down
2 changes: 1 addition & 1 deletion broker/src/handlers/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl<
metrics::NUM_USERS_CONNECTED.dec();

// Once the main loop ends, we remove the connection
self.connections.remove_user(public_key).await;
self.connections.remove_user(public_key);
}

/// This is the main loop where we deal with user connectins. On exit, the calling function
Expand Down
4 changes: 2 additions & 2 deletions broker/src/tasks/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl<
/// - if we fail to serialize the message
pub async fn full_topic_sync(self: &Arc<Self>, broker: &BrokerIdentifier) -> Result<()> {
// Get full list of topics
let topics = self.connections.get_full_topic_sync().await;
let topics = self.connections.get_full_topic_sync();

// Serialize the message
let raw_message = Arc::from(bail!(
Expand All @@ -110,7 +110,7 @@ impl<
/// - If we fail to serialize the message
pub async fn partial_topic_sync(self: &Arc<Self>) -> Result<()> {
// Get partial list of topics
let (additions, removals) = self.connections.get_partial_topic_sync().await;
let (additions, removals) = self.connections.get_partial_topic_sync();

// If we have some additions,
if !additions.is_empty() {
Expand Down

0 comments on commit d36b4e6

Please sign in to comment.