Skip to content

Commit

Permalink
chore: Refactor Network Subcommands (#108)
Browse files Browse the repository at this point in the history
### Description

Refactors the network subcommands to make them more explicit.

Now there is just

Discovery subcommand
```
hera disc
```

Gossip subcommand
```
hera gossip
```

---------

Co-authored-by: nicolas <[email protected]>
  • Loading branch information
refcell and merklefruit authored Oct 24, 2024
1 parent 9c1af7a commit 5aec5a5
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 58 deletions.
45 changes: 45 additions & 0 deletions bin/hera/src/disc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//! Discovery subcommand for Hera.
use crate::globals::GlobalArgs;
use clap::Args;
use eyre::Result;
use op_net::discovery::builder::DiscoveryBuilder;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// The Hera discovery subcommand.
#[derive(Debug, Clone, Args)]
#[non_exhaustive]
pub struct DiscCommand {
/// Port to listen for gossip on.
#[clap(long, short = 'l', default_value = "9099", help = "Port to listen for gossip on")]
pub gossip_port: u16,
/// Interval to send discovery packets.
#[clap(long, short = 'i', default_value = "1", help = "Interval to send discovery packets")]
pub interval: u64,
}

impl DiscCommand {
/// Run the discovery subcommand.
pub async fn run(self, args: &GlobalArgs) -> Result<()> {
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.gossip_port);
tracing::info!("Starting discovery service on {:?}", socket);

let mut discovery_builder =
DiscoveryBuilder::new().with_address(socket).with_chain_id(args.l2_chain_id);
let mut discovery = discovery_builder.build()?;
discovery.interval = std::time::Duration::from_secs(self.interval);
let mut peer_recv = discovery.start()?;
tracing::info!("Discovery service started, receiving peers.");

loop {
match peer_recv.recv().await {
Some(peer) => {
tracing::info!("Received peer: {:?}", peer);
}
None => {
tracing::warn!("Failed to receive peer");
}
}
}
}
}
52 changes: 15 additions & 37 deletions bin/hera/src/network.rs → bin/hera/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,27 @@
//! Networking subcommand for Hera.
//! Gossip subcommand for Hera.
use crate::globals::GlobalArgs;
use clap::Args;
use eyre::Result;
use op_net::{discovery::builder::DiscoveryBuilder, driver::NetworkDriver};
use op_net::driver::NetworkDriver;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use superchain::ROLLUP_CONFIGS;

/// The Hera network subcommand.
/// The Hera gossip subcommand.
#[derive(Debug, Clone, Args)]
#[non_exhaustive]
pub struct NetworkCommand {
/// Run the peer discovery service.
#[clap(long, short = 'p', help = "Runs only peer discovery")]
pub only_disc: bool,
pub struct GossipCommand {
/// Port to listen for gossip on.
#[clap(long, short = 'l', default_value = "9099", help = "Port to listen for gossip on")]
pub gossip_port: u16,
/// Interval to send discovery packets.
#[clap(long, short = 'i', default_value = "1", help = "Interval to send discovery packets")]
pub interval: u64,
}

impl NetworkCommand {
/// Run the network subcommand.
impl GossipCommand {
/// Run the gossip subcommand.
pub async fn run(self, args: &GlobalArgs) -> Result<()> {
if self.only_disc {
self.run_discovery(args).await
} else {
self.run_network(args)
}
}

/// Runs the full network.
pub fn run_network(&self, args: &GlobalArgs) -> Result<()> {
let signer = ROLLUP_CONFIGS
.get(&args.l2_chain_id)
.ok_or(eyre::eyre!("No rollup config found for chain ID"))?
Expand All @@ -39,15 +30,21 @@ impl NetworkCommand {
.as_ref()
.ok_or(eyre::eyre!("No system config found for chain ID"))?
.batcher_address;
tracing::info!("Gossip configured with signer: {:?}", signer);

let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.gossip_port);
tracing::info!("Starting gossip driver on {:?}", socket);

let mut driver = NetworkDriver::builder()
.with_chain_id(args.l2_chain_id)
.with_unsafe_block_signer(signer)
.with_gossip_addr(socket)
.with_interval(std::time::Duration::from_secs(self.interval))
.build()?;
let recv =
driver.take_unsafe_block_recv().ok_or(eyre::eyre!("No unsafe block receiver"))?;
driver.start()?;
tracing::info!("Gossip driver started, receiving blocks.");
loop {
match recv.recv() {
Ok(block) => {
Expand All @@ -59,23 +56,4 @@ impl NetworkCommand {
}
}
}

/// Runs only the discovery service.
pub async fn run_discovery(&self, args: &GlobalArgs) -> Result<()> {
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), self.gossip_port);
let mut discovery_builder =
DiscoveryBuilder::new().with_address(socket).with_chain_id(args.l2_chain_id);
let discovery = discovery_builder.build()?;
let mut peer_recv = discovery.start()?;
loop {
match peer_recv.recv().await {
Some(peer) => {
tracing::info!("Received peer: {:?}", peer);
}
None => {
tracing::warn!("Failed to receive peer");
}
}
}
}
}
12 changes: 8 additions & 4 deletions bin/hera/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
use clap::{Parser, Subcommand};
use eyre::Result;

mod disc;
mod globals;
mod network;
mod gossip;
mod node;

/// The Hera CLI Arguments.
Expand All @@ -30,8 +31,10 @@ pub(crate) struct HeraArgs {
pub(crate) enum HeraSubcommand {
/// Run the standalone Hera node.
Node(node::NodeCommand),
/// Networking utility commands.
Network(network::NetworkCommand),
/// Discovery service command.
Disc(disc::DiscCommand),
/// Gossip service command.
Gossip(gossip::GossipCommand),
}

#[tokio::main]
Expand All @@ -45,6 +48,7 @@ async fn main() -> Result<()> {
// Dispatch on subcommand.
match args.subcommand {
HeraSubcommand::Node(node) => node.run(&args.global).await,
HeraSubcommand::Network(network) => network.run(&args.global).await,
HeraSubcommand::Disc(disc) => disc.run(&args.global).await,
HeraSubcommand::Gossip(gossip) => gossip.run(&args.global).await,
}
}
15 changes: 8 additions & 7 deletions crates/net/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ Contains a gossipsub driver to run discv5 peer discovery and block gossip.

### Example

> **Warning**
>
> Notice, the socket address uses `0.0.0.0`.
> If you are experiencing issues connecting to peers for discovery,
> check to make sure you are not using the loopback address,
> `127.0.0.1` aka "localhost", which can prevent outward facing connections.
```rust,no_run
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use alloy_primitives::address;
Expand All @@ -25,15 +32,9 @@ driver.start().expect("Failed to start network driver");
println!("NetworkDriver started.");
```

> [!WARNING]
>
> Notice, the socket address uses `0.0.0.0`.
> If you are experiencing issues connecting to peers for discovery,
> check to make sure you are not using the loopback address,
> `127.0.0.1` aka "localhost", which can prevent outward facing connections.
[!WARNING]: ###example

### Acknowledgements

Largely based off [magi](https://github.com/a16z/magi)'s [p2p module](https://github.com/a16z/magi/tree/master/src/network).

12 changes: 10 additions & 2 deletions crates/net/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ pub struct NetworkDriverBuilder {
pub discovery_addr: Option<ListenConfig>,
/// The [GossipConfig] constructs the config for `gossipsub`.
pub gossip_config: Option<GossipConfig>,

/// The interval to discovery random nodes.
pub interval: Option<Duration>,
/// The [Config] constructs the config for `discv5`.
pub discovery_config: Option<Config>,
/// The [Keypair] for the node.
Expand Down Expand Up @@ -67,6 +68,12 @@ impl NetworkDriverBuilder {
self
}

/// Specifies the interval to discovery random nodes.
pub fn with_interval(&mut self, interval: Duration) -> &mut Self {
self.interval = Some(interval);
self
}

/// Specifies the socket address that the gossip service is listening on.
pub fn with_gossip_addr(&mut self, socket: SocketAddr) -> &mut Self {
self.gossip_addr = Some(socket);
Expand Down Expand Up @@ -278,7 +285,8 @@ impl NetworkDriverBuilder {
discovery_builder = discovery_builder.with_discovery_config(discovery_config);
}

let discovery = discovery_builder.build()?;
let mut discovery = discovery_builder.build()?;
discovery.interval = self.interval.unwrap_or(Duration::from_secs(10));

Ok(NetworkDriver {
discovery,
Expand Down
6 changes: 4 additions & 2 deletions crates/net/src/discovery/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub struct DiscoveryDriver {
pub disc: Discv5,
/// The chain ID of the network.
pub chain_id: u64,
/// The interval to discovery random nodes.
pub interval: Duration,
}

impl DiscoveryDriver {
Expand All @@ -34,7 +36,7 @@ impl DiscoveryDriver {

/// Instantiates a new [DiscoveryDriver].
pub fn new(disc: Discv5, chain_id: u64) -> Self {
Self { disc, chain_id }
Self { disc, chain_id, interval: Duration::from_secs(10) }
}

/// Spawns a new [Discv5] discovery service in a new tokio task.
Expand Down Expand Up @@ -108,7 +110,7 @@ impl DiscoveryDriver {
}
}

sleep(Duration::from_secs(10)).await;
sleep(self.interval).await;
}
});

Expand Down
4 changes: 3 additions & 1 deletion crates/net/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ impl NetworkDriver {
loop {
select! {
peer = peer_recv.recv() => {
self.gossip.dial_opt(peer).await;
self.gossip.dial_opt(peer.clone()).await;
tracing::info!("Received peer: {:?} | Connected peers: {:?}", peer, self.gossip.connected_peers());
},
event = self.gossip.select_next_some() => {
tracing::debug!("Received event: {:?}", event);
self.gossip.handle_event(event);
},
}
Expand Down
10 changes: 8 additions & 2 deletions crates/net/src/gossip/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,19 @@ impl GossipDriver {
self.swarm.select_next_some().await
}

/// Returns the number of connected peers.
pub fn connected_peers(&self) -> usize {
self.swarm.connected_peers().count()
}

/// Dials the given [`Option<Multiaddr>`].
pub async fn dial_opt(&mut self, peer: Option<impl Into<Multiaddr>>) {
let Some(addr) = peer else {
return;
};
if let Err(e) = self.dial(addr).await {
error!("Failed to dial peer: {:?}", e);
match self.dial(addr).await {
Ok(_) => info!("Dialed peer"),
Err(e) => error!("Failed to dial peer: {:?}", e),
}
}

Expand Down
10 changes: 7 additions & 3 deletions crates/net/src/gossip/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ impl Handler for BlockHandler {
tracing::debug!("received block");

let decoded = if msg.topic == self.blocks_v1_topic.hash() {
tracing::debug!("received v1 block");
OpNetworkPayloadEnvelope::decode_v1(&msg.data)
} else if msg.topic == self.blocks_v2_topic.hash() {
tracing::debug!("received v2 block");
OpNetworkPayloadEnvelope::decode_v2(&msg.data)
} else if msg.topic == self.blocks_v3_topic.hash() {
tracing::debug!("received v3 block");
OpNetworkPayloadEnvelope::decode_v3(&msg.data)
} else {
return MessageAcceptance::Reject;
Expand Down Expand Up @@ -115,11 +118,12 @@ impl BlockHandler {

let msg = envelope.payload_hash.signature_message(self.chain_id);
let block_signer = *self.unsafe_signer_recv.borrow();
let Ok(msg_signer) = envelope.signature.recover_address_from_msg(msg) else {
// TODO: add telemetry here if this happens.
let Ok(msg_signer) = envelope.signature.recover_address_from_prehash(&msg) else {
tracing::warn!("Failed to recover address from message");
return false;
};

time_valid && msg_signer == block_signer
let signer_valid = msg_signer == block_signer;
time_valid && signer_valid
}
}

0 comments on commit 5aec5a5

Please sign in to comment.