Skip to content

Commit

Permalink
externalize advertise addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Feb 13, 2024
1 parent 00d3987 commit 782be20
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 46 deletions.
34 changes: 17 additions & 17 deletions broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ use tokio::{select, spawn, sync::RwLock};
pub struct Config<BrokerSignatureScheme: Scheme> {
/// The user (public) advertise address: what the marshals send to users upon authentication.
/// Users connect to us with this address.
pub user_advertise_address: String,
/// The uaser (public) bind address: the public-facing address we bind to.
pub user_bind_address: String,
pub public_advertise_address: String,
/// The user (public) bind address: the public-facing address we bind to.
pub public_bind_address: String,

/// The broker (private) advertise address: what other brokers use to connect to us.
pub broker_advertise_address: String,
pub private_advertise_address: String,
/// The broker (private) bind address: the private-facing address we bind to.
pub broker_bind_address: String,
pub private_bind_address: String,

/// The discovery endpoint. We use this to maintain consistency between brokers and marshals.
pub discovery_endpoint: String,
Expand Down Expand Up @@ -112,11 +112,11 @@ where
pub async fn new(config: Config<BrokerSignatureScheme>) -> Result<Self> {
// Extrapolate values from the underlying broker configuration
let Config {
user_advertise_address,
user_bind_address,
public_advertise_address,
public_bind_address,

broker_advertise_address,
broker_bind_address,
private_advertise_address,
private_bind_address,

keypair,

Expand All @@ -127,8 +127,8 @@ where

// Create a unique broker identifier
let identity = BrokerIdentifier {
user_advertise_address,
broker_advertise_address,
public_advertise_address,
private_advertise_address,
};

// Create the `Discovery` client we will use to maintain consistency
Expand All @@ -139,34 +139,34 @@ where
);

// Create the user (public) listener
let user_bind_address = parse_socket_address!(user_bind_address);
let public_bind_address = parse_socket_address!(public_bind_address);
let user_listener = bail!(
<UserProtocol as Protocol>::bind(
user_bind_address,
public_bind_address,
maybe_tls_cert_path.clone(),
maybe_tls_key_path.clone(),
)
.await,
Connection,
format!(
"failed to bind to private (broker) bind address {}",
broker_bind_address
private_bind_address
)
);

// Create the broker (private) listener
let broker_bind_address = parse_socket_address!(broker_bind_address);
let private_bind_address = parse_socket_address!(private_bind_address);
let broker_listener = bail!(
<BrokerProtocol as Protocol>::bind(
broker_bind_address,
private_bind_address,
maybe_tls_cert_path,
maybe_tls_key_path,
)
.await,
Connection,
format!(
"failed to bind to public (user) bind address {}",
user_bind_address
public_bind_address
)
);

Expand Down
46 changes: 30 additions & 16 deletions broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
use broker::{Broker, Config};
use clap::Parser;
use jf_primitives::signatures::bls_over_bn254::BLSOverBN254CurveSignatureScheme as BLS;
use local_ip_address::local_ip;
use proto::{
bail,
crypto::{generate_random_keypair, DeterministicRng},
error::Result,
error::{Error, Result},
};

#[derive(Parser, Debug)]
Expand All @@ -18,12 +20,16 @@ struct Args {
discovery_endpoint: String,

/// The port to bind to for connections from users
#[arg(short, long, default_value_t = 1738)]
user_bind_port: u16,
#[arg(long, default_value = "127.0.0.1:1738")]
public_advertise_address: String,

/// The port to bind to for connections from other brokers
#[arg(short, long, default_value_t = 1739)]
broker_bind_port: u16,
/// The (public) port to bind to for connections from users
#[arg(long, default_value_t = 1738)]
public_bind_port: u16,

/// The (private) port to bind to for connections from other brokers
#[arg(long, default_value_t = 1739)]
private_bind_port: u16,
}

#[tokio::main]
Expand All @@ -34,21 +40,29 @@ async fn main() -> Result<()> {
// Initialize tracing
tracing_subscriber::fmt::init();

// TODO: local IP address depending on whether or not we specified
// Get our local IP address
let private_ip_address = bail!(local_ip(), Connection, "failed to get local IP address");

// Create deterministic keys for brokers (for now, obviously)
let (signing_key, verification_key) = generate_random_keypair::<BLS, _>(DeterministicRng(0))?;

// Create our config (TODO stubbed out for now, do clap)
let broker_config = Config {
// These are the same because we are testing locally
// TODO: if possible make this better. Make it so that we can just specify
// a port or something.
user_advertise_address: format!("127.0.0.1:{}", args.user_bind_port),
user_bind_address: format!("127.0.0.1:{}", args.user_bind_port),

broker_advertise_address: format!("127.0.0.1:{}", args.broker_bind_port),
broker_bind_address: format!("127.0.0.1:{}", args.broker_bind_port),
// Public addresses: explicitly defined advertise address, bind address is on every interface
// but with the specified port.
public_advertise_address: args.public_advertise_address,
public_bind_address: format!("0.0.0.0:{}", args.public_bind_port),

// Private addresses: bind to the local interface with the specified port
private_advertise_address: format!(
"{}:{}",
private_ip_address,
args.private_bind_port
),
private_bind_address: format!(
"{}:{}",
private_ip_address,
args.private_bind_port
),

discovery_endpoint: args.discovery_endpoint,

Expand Down
2 changes: 1 addition & 1 deletion broker/src/tasks/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where
{
// TODO: make this into a separate function
// Extrapolate the address to connect to
let to_connect_address = broker.broker_advertise_address.clone();
let to_connect_address = broker.private_advertise_address.clone();

// Clone the inner because we need it for the possible new broker task
let inner = self.clone();
Expand Down
4 changes: 2 additions & 2 deletions process-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ processes:
condition: process_started

broker_0:
command: cargo run --package broker --features insecure --no-default-features -- -b 8082 -u 8083 -d "redis://:[email protected]:6379"
command: cargo run --package broker --features insecure --no-default-features -- --public-advertise-address 127.0.0.1:8083 --private-bind-port 8082 --public-bind-port 8083 -d "redis://:[email protected]:6379"
depends_on:
redis:
condition: process_started

broker_1:
command: cargo run --package broker --features insecure --no-default-features -- -b 8084 -u 8085 -d "redis://:[email protected]:6379"
command: cargo run --package broker --features insecure --no-default-features -- --public-advertise-address 127.0.0.1:8085 --private-bind-port 8084 --public-bind-port 8085 -d "redis://:[email protected]:6379"
depends_on:
redis:
condition: process_started
Expand Down
2 changes: 1 addition & 1 deletion proto/src/connection/auth/marshal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ where
// Form a response message
let response_message = Message::AuthenticateResponse(AuthenticateResponse {
permit,
context: broker_with_least_connections.user_advertise_address,
context: broker_with_least_connections.public_advertise_address,
});

// Send the permit to the user, along with the public broker advertise address
Expand Down
4 changes: 2 additions & 2 deletions proto/src/discovery/embedded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ impl DiscoveryClient for Embedded {
// We only "need" the identifier if we want to register
let identifier = identity.map_or_else(
|| BrokerIdentifier {
user_advertise_address: String::new(),
broker_advertise_address: String::new(),
public_advertise_address: String::new(),
private_advertise_address: String::new(),
},
|identifier| identifier,
);
Expand Down
10 changes: 5 additions & 5 deletions proto/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ pub trait DiscoveryClient: Sized + Clone + Sync + Send + 'static {
#[derive(Eq, PartialEq, Hash, Clone, Debug)]
pub struct BrokerIdentifier {
/// The address that a broker advertises to publicly (to users)
pub user_advertise_address: String,
pub public_advertise_address: String,
/// The address that a broker advertises to privately (to other brokers)
pub broker_advertise_address: String,
pub private_advertise_address: String,
}

/// We need this to convert in the opposite direction: to create a `String`
Expand All @@ -73,7 +73,7 @@ impl std::fmt::Display for BrokerIdentifier {
write!(
f,
"{}/{}",
self.user_advertise_address, self.broker_advertise_address
self.public_advertise_address, self.private_advertise_address
)
}
}
Expand All @@ -88,13 +88,13 @@ impl TryFrom<String> for BrokerIdentifier {

// Create a new `Self` from the split string
Ok(Self {
user_advertise_address: split
public_advertise_address: split
.next()
.ok_or_else(|| {
Error::Parse("failed to parse public advertise address from string".to_string())
})?
.to_string(),
broker_advertise_address: split
private_advertise_address: split
.next()
.ok_or_else(|| {
Error::Parse(
Expand Down
4 changes: 2 additions & 2 deletions proto/src/discovery/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ impl DiscoveryClient for Redis {
// We only "need" the identifier if we want to register with Redis
let identifier = identity.map_or_else(
|| BrokerIdentifier {
user_advertise_address: String::new(),
broker_advertise_address: String::new(),
public_advertise_address: String::new(),
private_advertise_address: String::new(),
},
|identifier| identifier,
);
Expand Down

0 comments on commit 782be20

Please sign in to comment.