Skip to content

Commit

Permalink
Merge pull request #14 from EspressoSystems/boooooo
Browse files Browse the repository at this point in the history
Readability and general improvements
  • Loading branch information
rob-maron authored Mar 7, 2024
2 parents 0465dcb + 7b114e7 commit fc35a26
Show file tree
Hide file tree
Showing 27 changed files with 107 additions and 117 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ lazy_static = "1.4.0"
derive_builder = "0.13.1"
async-std = { version = "1.12.0", features = ["tokio1", "attributes"] }
rkyv = { version = "0.7.44", features = ["validation"] }
derivative = "2.2.0"

# Dev dependencies (can't be defined explicitly in the workspace)
# TODO: figure out if this actually builds on non-test targets
Expand Down
2 changes: 1 addition & 1 deletion broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ prometheus = { workspace = true }
lazy_static = { workspace = true }
derive_builder.workspace = true
rkyv.workspace = true
derivative = "2.2.0"
derivative.workspace = true
dashmap = "5.5.3"
parking_lot = "0.12.1"

4 changes: 2 additions & 2 deletions broker/benches/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Benchmarks for allocating and sending broadcast messages.
//! If run with `--profile-time=N seconds`, it will output a flamegraph.
use proto::connection::protocols::Sender;
use std::{sync::Arc, time::Duration};
use proto::connection::{protocols::Sender, Bytes};
use std::time::Duration;

use broker::reexports::tests::{Run, RunDefinition};
use broker::{assert_received, send_message_as};
Expand Down
4 changes: 2 additions & 2 deletions broker/benches/direct.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! Benchmarks for allocating and sending direct messages.
//! If run with `--profile-time=N seconds`, it will output a flamegraph.
use proto::connection::protocols::Sender;
use std::{sync::Arc, time::Duration};
use proto::connection::{protocols::Sender, Bytes};
use std::time::Duration;

use broker::reexports::tests::{Run, RunDefinition};
use broker::{assert_received, send_message_as};
Expand Down
4 changes: 2 additions & 2 deletions broker/src/connections/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use std::{
};

use parking_lot::RwLock;
use proto::{connection::Bytes, discovery::BrokerIdentifier, message::Topic};
use proto::{connection::UserPublicKey, discovery::BrokerIdentifier, message::Topic};

/// Our broadcast map is just two associative (bidirectional, multi) maps:
/// one for brokers and one for users.
pub struct BroadcastMap {
pub users: RwLock<RelationalMap<Bytes, Topic>>,
pub users: RwLock<RelationalMap<UserPublicKey, Topic>>,
pub brokers: RwLock<RelationalMap<BrokerIdentifier, Topic>>,

pub previous_subscribed_topics: RwLock<HashSet<Topic>>,
Expand Down
4 changes: 2 additions & 2 deletions broker/src/connections/direct.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! This is where we define routing for direct messages.
// TODO: write tests for this

use proto::{connection::Bytes, discovery::BrokerIdentifier};
use proto::{connection::UserPublicKey, discovery::BrokerIdentifier};

use super::versioned::VersionedMap;

/// We define the direct map as just a type alias of a `VersionedMap`, which
// deals with version vectors.
pub type DirectMap = VersionedMap<Bytes, BrokerIdentifier, BrokerIdentifier>;
pub type DirectMap = VersionedMap<UserPublicKey, BrokerIdentifier, BrokerIdentifier>;
17 changes: 7 additions & 10 deletions broker/src/connections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use parking_lot::RwLock;
use proto::{
connection::{
protocols::{Protocol, Sender},
Bytes,
Bytes, UserPublicKey,
},
discovery::BrokerIdentifier,
message::Topic,
Expand All @@ -24,16 +24,13 @@ mod broadcast;
mod direct;
mod versioned;

/// Associated type for readability
type UserPublicKey = Bytes;

/// Stores information about all current connections.
pub struct Connections<BrokerProtocol: Protocol, UserProtocol: Protocol> {
// Our identity. Used for versioned vector conflict resolution.
identity: BrokerIdentifier,

// The current users connected to us
users: DashMap<Bytes, UserProtocol::Sender>,
users: DashMap<UserPublicKey, UserProtocol::Sender>,
// The current brokers connected to us
brokers: DashMap<BrokerIdentifier, BrokerProtocol::Sender>,

Expand Down Expand Up @@ -137,7 +134,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
/// keeps track of which users are connected where.
pub fn add_user(
self: &Arc<Self>,
user_public_key: Bytes,
user_public_key: UserPublicKey,
connection: <UserProtocol as Protocol>::Sender,
) {
// Add to our map
Expand Down Expand Up @@ -166,7 +163,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
/// from our broadcast map, in case they were subscribed to any topics, and
/// the versioned vector map. This is so other brokers don't keep trying
/// to send us messages for a disconnected user.
pub fn remove_user(self: &Arc<Self>, user_public_key: Bytes) {
pub fn remove_user(self: &Arc<Self>, user_public_key: UserPublicKey) {
// Remove from user list
self.users.remove(&user_public_key);

Expand All @@ -191,7 +188,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
}

/// Locally subscribe a user to some topics.
pub fn subscribe_user_to(&self, user_public_key: &Bytes, topics: Vec<Topic>) {
pub fn subscribe_user_to(&self, user_public_key: &UserPublicKey, topics: Vec<Topic>) {
self.broadcast_map
.users
.write()
Expand All @@ -207,7 +204,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco
}

/// Locally unsubscribe a broker from some topics.
pub fn unsubscribe_user_from(&self, user_public_key: &Bytes, topics: &[Topic]) {
pub fn unsubscribe_user_from(&self, user_public_key: &UserPublicKey, topics: &[Topic]) {
self.broadcast_map
.users
.write()
Expand Down Expand Up @@ -261,7 +258,7 @@ impl<BrokerProtocol: Protocol, UserProtocol: Protocol> Connections<BrokerProtoco

/// Send a message to a user connected to us.
/// If it fails, the user is removed from our map.
pub fn send_to_user(self: &Arc<Self>, user_public_key: Bytes, message: Bytes) {
pub fn send_to_user(self: &Arc<Self>, user_public_key: UserPublicKey, message: Bytes) {
// See if the user is connected
if let Some(connection) = self.users.get(&user_public_key) {
// If they are, clone things we will need
Expand Down
17 changes: 10 additions & 7 deletions broker/src/handlers/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use proto::{
connection::{
auth::broker::BrokerAuth,
protocols::{Protocol, Receiver},
Bytes,
UserPublicKey,
},
crypto::signature::SignatureScheme,
discovery::BrokerIdentifier,
Expand Down Expand Up @@ -103,22 +103,25 @@ impl<
broker_identifier: &BrokerIdentifier,
receiver: <BrokerProtocol as Protocol>::Receiver,
) -> Result<()> {
while let Ok(message) = receiver.recv_message().await {
while let Ok(raw_message) = receiver.recv_message_raw().await {
// Attempt to deserialize the message
// TODO: FIXED SIZE RECIPIENT FOR DESERIALIZATION
let message = Message::deserialize(&raw_message)?;

match message {
// If we receive a direct message from a broker, we want to send it to the user with that key
Message::Direct(ref direct) => {
let message = Bytes::from(message.serialize().expect("serialization failed"));
let user_public_key = Bytes::from(direct.recipient.clone());
let user_public_key = UserPublicKey::from(direct.recipient.clone());

self.connections.send_direct(user_public_key, message, true);
self.connections
.send_direct(user_public_key, raw_message, true);
}

// If we receive a broadcast message from a broker, we want to send it to all interested users
Message::Broadcast(ref broadcast) => {
let message = Bytes::from(message.serialize().expect("serialization failed"));
let topics = broadcast.topics.clone();

self.connections.send_broadcast(topics, &message, true);
self.connections.send_broadcast(topics, &raw_message, true);
}

// If we receive a subscribe message from a broker, we add them as "interested" locally.
Expand Down
28 changes: 16 additions & 12 deletions broker/src/handlers/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::sync::Arc;
#[cfg(feature = "strong_consistency")]
use proto::discovery::DiscoveryClient;

use proto::connection::Bytes;
use proto::connection::UserPublicKey;
use proto::error::{Error, Result};
use proto::{
connection::{
auth::broker::BrokerAuth,
Expand Down Expand Up @@ -44,7 +45,7 @@ impl<
};

// Create a human-readable user identifier (by public key)
let public_key = Bytes::from(public_key);
let public_key = UserPublicKey::from(public_key);
let user_identifier = mnemonic(&public_key);
info!("{user_identifier} connected");

Expand Down Expand Up @@ -81,7 +82,7 @@ impl<
metrics::NUM_USERS_CONNECTED.inc();

// This runs the main loop for receiving information from the user
let () = self.user_receive_loop(&public_key, receiver).await;
let _ = self.user_receive_loop(&public_key, receiver).await;

info!("{user_identifier} disconnected");

Expand All @@ -96,26 +97,28 @@ impl<
/// should remove the user from the map.
pub async fn user_receive_loop(
&self,
public_key: &Bytes,
public_key: &UserPublicKey,
receiver: <UserProtocol as Protocol>::Receiver,
) {
while let Ok(message) = receiver.recv_message().await {
) -> Result<()> {
while let Ok(raw_message) = receiver.recv_message_raw().await {
// Attempt to deserialize the message
// TODO: FIXED SIZE RECIPIENT FOR DESERIALIZATION
let message = Message::deserialize(&raw_message)?;

match message {
// If we get a direct message from a user, send it to both users and brokers.
Message::Direct(ref direct) => {
let message = Bytes::from(message.serialize().expect("serialization failed"));
let user_public_key = Bytes::from(direct.recipient.clone());
let user_public_key = UserPublicKey::from(direct.recipient.clone());

self.connections
.send_direct(user_public_key, message, false);
.send_direct(user_public_key, raw_message, false);
}

// If we get a broadcast message from a user, send it to both brokers and users.
Message::Broadcast(ref broadcast) => {
let message = Bytes::from(message.serialize().expect("serialization failed"));
let topics = broadcast.topics.clone();

self.connections.send_broadcast(topics, &message, false);
self.connections.send_broadcast(topics, &raw_message, false);
}

// Subscribe messages from users will just update the state locally
Expand All @@ -129,8 +132,9 @@ impl<
.unsubscribe_user_from(public_key, &unsubscribe);
}

_ => return,
_ => return Err(Error::Connection("connection closed".to_string())),
}
}
Err(Error::Connection("connection closed".to_string()))
}
}
34 changes: 16 additions & 18 deletions broker/src/tasks/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{sync::Arc, time::Duration};

use proto::{
bail,
connection::protocols::Protocol,
connection::{protocols::Protocol, Bytes},
crypto::signature::SignatureScheme,
discovery::BrokerIdentifier,
error::{Error, Result},
Expand All @@ -25,7 +25,7 @@ macro_rules! prepare_sync_message {
);

// Wrap the message in `UserSync` and serialize it
Arc::from(bail!(
Bytes::from(bail!(
Message::UserSync(message.to_vec()).serialize(),
Serialize,
"failed to serialize full user sync map"
Expand All @@ -49,11 +49,9 @@ impl<
// Get full user sync map
let full_sync_map = self.connections.get_full_user_sync();

// Serialize the message
let raw_message = prepare_sync_message!(full_sync_map);

// Send it to the broker
self.connections.send_to_broker(broker, raw_message);
// Serialize and send the message to the broker
self.connections
.send_to_broker(broker, prepare_sync_message!(full_sync_map));

Ok(())
}
Expand Down Expand Up @@ -90,15 +88,15 @@ impl<
// Get full list of topics
let topics = self.connections.get_full_topic_sync();

// Serialize the message
let raw_message = Arc::from(bail!(
Message::Subscribe(topics).serialize(),
Serialize,
"failed to serialize topics"
));

// Send to the specified broker
self.connections.send_to_broker(broker, raw_message);
// Serialize and send the message
self.connections.send_to_broker(
broker,
Bytes::from(bail!(
Message::Subscribe(topics).serialize(),
Serialize,
"failed to serialize topics"
)),
);

Ok(())
}
Expand All @@ -115,7 +113,7 @@ impl<
// If we have some additions,
if !additions.is_empty() {
// Serialize the subscribe message
let raw_subscribe_message = Arc::from(bail!(
let raw_subscribe_message = Bytes::from(bail!(
Message::Subscribe(additions).serialize(),
Serialize,
"failed to serialize topics"
Expand All @@ -126,7 +124,7 @@ impl<
// If we have some removals,
if !removals.is_empty() {
// Serialize the unsubscribe message
let raw_unsubscribe_message = Arc::from(bail!(
let raw_unsubscribe_message = Bytes::from(bail!(
Message::Unsubscribe(removals).serialize(),
Serialize,
"failed to serialize topics"
Expand Down
4 changes: 2 additions & 2 deletions broker/src/tests/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! Deterministic tests for sending and receiving broadcast messages.
//! Asserts they all go to the right place.
use std::{sync::Arc, time::Duration};
use std::time::Duration;

use super::RunDefinition;
use crate::{assert_received, send_message_as};
use proto::{
connection::protocols::Sender,
connection::{protocols::Sender, Bytes},
message::{Broadcast, Message, Topic},
};
use tokio::time::{sleep, timeout};
Expand Down
4 changes: 2 additions & 2 deletions broker/src/tests/direct.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! Deterministic tests for sending and receiving direct messages.
//! Asserts they all go to the right place.
use std::{sync::Arc, time::Duration};
use std::time::Duration;

use super::RunDefinition;
use crate::{assert_received, send_message_as};
use proto::{
connection::protocols::Sender,
connection::{protocols::Sender, Bytes},
message::{Direct, Message, Topic},
};
use tokio::time::{sleep, timeout};
Expand Down
2 changes: 1 addition & 1 deletion broker/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ macro_rules! assert_received {
// Assert the message is the correct one
assert!(
message
== Ok(Arc::from(
== Ok(Bytes::from(
$message
.serialize()
.expect("failed to re-serialize message")
Expand Down
Loading

0 comments on commit fc35a26

Please sign in to comment.