Skip to content

Commit

Permalink
Merge pull request #3 from EspressoSystems/rm/file-reorg
Browse files Browse the repository at this point in the history
file reorganization
  • Loading branch information
rob-maron authored Feb 12, 2024
2 parents 2b226a7 + d6582f7 commit df65606
Show file tree
Hide file tree
Showing 20 changed files with 754 additions and 715 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@ ark-serialize = "0.4.2"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
clap = { version = "4.4.18", features = ["derive"] }
async-trait = "0.1.77"
paste = "1.0.14"
3 changes: 2 additions & 1 deletion broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ clap.workspace = true
local-ip-address = "0.5.7"
slotmap = "1.0.7"
delegate = "0.12.0"
paste = "1.0.14"
async-trait.workspace = true
paste.workspace = true
232 changes: 146 additions & 86 deletions broker/src/handlers/broker.rs
Original file line number Diff line number Diff line change
@@ -1,97 +1,157 @@
//! This file defines the broker connection handler.
use std::{sync::Arc, time::Duration};

use jf_primitives::signatures::SignatureScheme as JfSignatureScheme;
// TODO: figure out if we should use Tokio's here
use proto::{
authenticate_with_broker,
connection::{auth::broker::BrokerAuth, batch::BatchedSender, protocols::Protocol},
crypto::Serializable,
connection::{
auth::broker::BrokerAuth,
batch::{BatchedSender, Position},
protocols::{Protocol, Receiver},
},
crypto::{Scheme, Serializable},
error::{Error, Result},
message::Message,
verify_broker,
};
use tracing::{error, info};

use crate::{get_lock, Inner};

/// This function is the callback for handling a broker (private) connection.
pub async fn handle_connection<
BrokerSignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
BrokerProtocolType: Protocol,
UserSignatureScheme: JfSignatureScheme<PublicParameter = (), MessageUnit = u8>,
UserProtocolType: Protocol,
>(
inner: Arc<
Inner<BrokerSignatureScheme, BrokerProtocolType, UserSignatureScheme, UserProtocolType>,
>,
mut connection: (BrokerProtocolType::Sender, BrokerProtocolType::Receiver),
is_outbound: bool,
) where
UserSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::Signature: Serializable,
use crate::{
get_lock, send_broadcast, send_direct, send_or_remove_many, state::ConnectionId, Inner,
};

impl<
BrokerSignatureScheme: Scheme,
BrokerProtocolType: Protocol,
UserSignatureScheme: Scheme,
UserProtocolType: Protocol,
> Inner<BrokerSignatureScheme, BrokerProtocolType, UserSignatureScheme, UserProtocolType>
where
BrokerSignatureScheme::VerificationKey: Serializable,
BrokerSignatureScheme::SigningKey: Serializable,
BrokerSignatureScheme::Signature: Serializable,
UserSignatureScheme::VerificationKey: Serializable,
UserSignatureScheme::Signature: Serializable,
{
// Depending on which way the direction came in, we will want to authenticate with a different
// flow.
let broker_address = if is_outbound {
// If we reached out to the other broker first, authenticate first.
let broker_address = authenticate_with_broker!(connection, inner);
verify_broker!(connection, inner);
broker_address
} else {
// If the other broker reached out to us first, authenticate second.
verify_broker!(connection, inner);
authenticate_with_broker!(connection, inner)
};

// Create new batch sender
let (sender, receiver) = connection;
// TODO: parameterize max interval and max size
let sender = Arc::from(BatchedSender::<BrokerProtocolType>::from(
sender,
Duration::from_millis(50),
1500,
));

// Add to our connected broker identities so we don't try to reconnect
get_lock!(inner.connected_broker_identities, write).insert(broker_address.clone());

// Freeze the sender before adding it to our connections so we don't receive messages out of order.
// This is to enforce message ordering
let _ = sender.freeze();

// Add our connection to the list of connections
let connection_id = inner
.broker_connection_lookup
.write()
.await
.add_connection(sender.clone());

// Get all brokers (excluding ourselves)
let all_brokers = get_lock!(inner.broker_connection_lookup, read).get_all_connections();

// Send all relevant updates to brokers, flushing our updates. Send the partial updates
// to everyone, and the full to the new broker.
let _ = inner
.send_updates_to_brokers(all_brokers, vec![(connection_id, sender.clone())])
.await;

// Unfreeze the sender, flushing the updates
let _ = sender.unfreeze();

info!("connected to broker {}", broker_address);

// If we error, come back to the callback so we can remove the connection from the list.
if let Err(err) = inner.broker_recv_loop(connection_id, receiver).await {
error!("broker disconnected with error: {err}");
};

info!("disconnected from broker {}", broker_address);

// Remove from the connected broker identities so that we may
// try to reconnect inthe future.
get_lock!(inner.connected_broker_identities, write).remove(&broker_address);

// Remove from our connections so that we don't send any more data
// their way.
get_lock!(inner.broker_connection_lookup, write).remove_connection(connection_id);
/// This function is the callback for handling a broker (private) connection.
pub async fn handle_broker_connection(
self: Arc<Self>,
mut connection: (BrokerProtocolType::Sender, BrokerProtocolType::Receiver),
is_outbound: bool,
) {
// Depending on which way the direction came in, we will want to authenticate with a different
// flow.
let broker_address = if is_outbound {
// If we reached out to the other broker first, authenticate first.
let broker_address = authenticate_with_broker!(connection, self);
verify_broker!(connection, self);
broker_address
} else {
// If the other broker reached out to us first, authenticate second.
verify_broker!(connection, self);
authenticate_with_broker!(connection, self)
};

// Create new batch sender
let (sender, receiver) = connection;
// TODO: parameterize max interval and max size
let sender = Arc::from(BatchedSender::<BrokerProtocolType>::from(
sender,
Duration::from_millis(50),
1500,
));

// Add to our connected broker identities so we don't try to reconnect
get_lock!(self.connected_broker_identities, write).insert(broker_address.clone());

// Freeze the sender before adding it to our connections so we don't receive messages out of order.
// This is to enforce message ordering
let _ = sender.freeze();

// Add our connection to the list of connections
let connection_id = self
.broker_connection_lookup
.write()
.await
.add_connection(sender.clone());

// Get all brokers (excluding ourselves)
let all_brokers = get_lock!(self.broker_connection_lookup, read).get_all_connections();

// Send all relevant updates to brokers, flushing our updates. Send the partial updates
// to everyone, and the full to the new broker.
let _ = self
.send_updates_to_brokers(all_brokers, vec![(connection_id, sender.clone())])
.await;

// Unfreeze the sender, flushing the updates
let _ = sender.unfreeze();

info!("connected to broker {}", broker_address);

// If we error, come back to the callback so we can remove the connection from the list.
if let Err(err) = self.broker_receive_loop(connection_id, receiver).await {
error!("broker disconnected with error: {err}");
};

info!("disconnected from broker {}", broker_address);

// Remove from the connected broker identities so that we may
// try to reconnect inthe future.
get_lock!(self.connected_broker_identities, write).remove(&broker_address);

// Remove from our connections so that we don't send any more data
// their way.
get_lock!(self.broker_connection_lookup, write).remove_connection(connection_id);
}

pub async fn broker_receive_loop(
&self,
connection_id: ConnectionId,
mut receiver: BrokerProtocolType::Receiver,
) -> Result<()> {
while let Ok(message) = receiver.recv_message().await {
match message {
// If we receive a direct message from a broker, we want to send it to all users with that key
Message::Direct(ref direct) => {
let message: Arc<Vec<u8>> =
Arc::from(message.serialize().expect("serialization failed"));

send_direct!(self.user_connection_lookup, direct.recipient, message);
}

// If we receive a broadcast message from a broker, we want to send it to all interested users
Message::Broadcast(ref broadcast) => {
let message: Arc<Vec<u8>> =
Arc::from(message.serialize().expect("serialization failed"));

send_broadcast!(self.user_connection_lookup, broadcast.topics, message);
}

// If we receive a subscribe message from a broker, we add them as "interested" locally.
Message::Subscribe(subscribe) => get_lock!(self.broker_connection_lookup, write)
.subscribe_connection_id_to_topics(connection_id, subscribe),

// If we receive a subscribe message from a broker, we remove them as "interested" locally.
Message::Unsubscribe(unsubscribe) => {
get_lock!(self.broker_connection_lookup, write)
.unsubscribe_connection_id_from_topics(connection_id, unsubscribe);
}

// If a broker has told us they have some users connected, we update our map as such
Message::UsersConnected(users) => get_lock!(self.broker_connection_lookup, write)
.subscribe_connection_id_to_keys(connection_id, users),

// If a broker has told us they have some users disconnected, we update our map as such
Message::UsersDisconnected(users) => {
get_lock!(self.broker_connection_lookup, write)
.unsubscribe_connection_id_from_keys(connection_id, users);
}

// Do nothing if we receive an unexpected message
_ => {}
}
}

Err(Error::Connection("connection closed".to_string()))
}
}
69 changes: 69 additions & 0 deletions broker/src/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,71 @@
//! This file defines the handler module, wherein we define connection handlers for
//! `Arc<Inner>`.
pub mod broker;
pub mod user;

/// This macro is a helper macro that lets us "send many messages", and remove
/// the actor from the local state if the message failed to send
#[macro_export]
macro_rules! send_or_remove_many {
($connections: expr, $lookup:expr, $message: expr, $position: expr) => {
// For each connection,
for connection in $connections {
// Queue a message back
if connection
.1
.queue_message($message.clone(), $position)
.is_err()
{
// If it fails, remove the connection.
get_lock!($lookup, write).remove_connection(connection.0);
};
}
};
}

/// We use this macro to help send direct messages. It just makes the code easier
/// to look at.
#[macro_export]
macro_rules! send_direct {
($lookup: expr, $key: expr, $message: expr) => {{
let connections = $lookup.read().await.get_connections_by_key(&$key).clone();
send_or_remove_many!(connections, $lookup, $message, Position::Back);
}};
}

/// We use this macro to help send broadcast messages. It just makes the code easier
/// to look at.
#[macro_export]
macro_rules! send_broadcast {
($lookup:expr, $topics: expr, $message: expr) => {{
let connections = $lookup
.read()
.await
.get_connections_by_topic($topics.clone())
.clone();
send_or_remove_many!(connections, $lookup, $message, Position::Back);
}};
}

/// This is a macro to acquire an async lock, which helps readability.
#[macro_export]
macro_rules! get_lock {
($lock :expr, $type: expr) => {
paste::item! {
$lock.$type().await
}
};
}

// Creates and serializes a new message of the specified type with the specified data.
#[macro_export]
macro_rules! new_serialized_message {
($type: ident, $data: expr) => {
Arc::<Vec<u8>>::from(bail!(
Message::$type($data).serialize(),
Connection,
"broker disconnected"
))
};
}
Loading

0 comments on commit df65606

Please sign in to comment.