Skip to content

Commit

Permalink
Test framework for restartability
Browse files Browse the repository at this point in the history
Set up some Rust automation for tests that spin up a sequencer
network and restart various combinations of nodes, checking that we
recover liveness. Instantiate the framework with several combinations
of nodes as outlined in https://www.notion.so/espressosys/Persistence-catchup-and-restartability-cf4ddb79df2e41a993e60e3beaa28992.

As expected, the tests where we restart >f nodes do not pass yet,
and are ignored. The others pass locally.

There are many things left to test here, including:
* Testing with a valid libp2p setup
* Testing with _only_ libp2p and no CDN
* Checking integrity of the DA/query service during and after restart

Still, this is a pretty good starting point.

I considered doing this with something more dynamic like Bash or
Python scripting, leaning on our existing docker-compose or process-compose
infrastructure to spin up a network. I avoided this for a few reasons:
* process-compose is annoying to script and in particular has limited
  capabilities for shutting down and starting up processes
* both docker-compose and process-compose make it hard to dynamically
  choose the network topology
* once the basic test infrastructure is out of the way, Rust is far
  easier to work with for writing new checks and assertions. For
  example, checking for progress is way easier when we can plug
  directly into the HotShot event stream, vs subscribing to some
  stream via HTTP and parsing responses with jq
  • Loading branch information
jbearer committed Jun 17, 2024
1 parent 7336345 commit 6fa3af4
Show file tree
Hide file tree
Showing 9 changed files with 696 additions and 79 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ rand_chacha = "0.3"
rand_distr = "0.4"
reqwest = "0.12"
serde = { version = "1.0.195", features = ["derive"] }
tempfile = "3.9"
toml = "0.8"
url = "2.3"
vbs = "0.1"
Expand Down
5 changes: 3 additions & 2 deletions sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
testing = ["hotshot-testing"]
testing = ["hotshot-testing", "tempfile"]
libp2p = []

[[bin]]
Expand All @@ -22,7 +22,7 @@ hotshot-testing = { workspace = true }
pretty_assertions = { workspace = true }
rand = "0.8.5"
reqwest = { workspace = true }
tempfile = "3.9.0"
tempfile = { workspace = true }

[build-dependencies]
anyhow = { workspace = true }
Expand Down Expand Up @@ -80,6 +80,7 @@ itertools = { workspace = true }
jf-crhf = { workspace = true }
jf-merkle-tree = { workspace = true }
jf-rescue = { workspace = true }
tempfile = { workspace = true, optional = true }

jf-signature = { workspace = true, features = ["bls", "schnorr"] }
jf-utils = { workspace = true } # TODO temporary: used only for test_rng()
Expand Down
122 changes: 60 additions & 62 deletions sequencer/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,6 @@ pub mod test_helpers {
#[cfg(test)]
#[espresso_macros::generic_tests]
mod api_tests {
use self::options::HotshotEvents;

use super::*;
use crate::{
persistence::no_storage,
Expand Down Expand Up @@ -827,66 +825,6 @@ mod api_tests {
let storage = D::create_storage().await;
catchup_test_helper(|opt| D::options(&storage, opt)).await
}

/// Checks that events can be read from the `hotshot-events/events` socket.
///
/// The test starts a `TestNetwork` with the HotShot events option enabled, subscribes a client
/// to the events socket, and reads events until more than `total_count` have arrived.
///
/// Note: the type parameter `D` is not referenced anywhere in the body of this function.
#[async_std::test]
pub(crate) async fn test_hotshot_event_streaming<D: TestableSequencerDataSource>() {
use hotshot_events_service::events_source::BuilderEvent;
use HotshotEvents;

setup_logging();

setup_backtrace();

// Pick separate free ports for the events service and the query service.
let hotshot_event_streaming_port =
pick_unused_port().expect("No ports free for hotshot event streaming");
let query_service_port = pick_unused_port().expect("No ports free for query service");

// The client URL targets the same port that is handed to `HotshotEvents` below.
let url = format!("http://localhost:{hotshot_event_streaming_port}")
.parse()
.unwrap();

let hotshot_events = HotshotEvents {
events_service_port: hotshot_event_streaming_port,
};

let client: Client<ServerError, SequencerVersion> = Client::new(url);

let options = Options::with_port(query_service_port).hotshot_events(hotshot_events);

// Spawn Anvil and pass its endpoint to the test network as the L1 URL.
let anvil = Anvil::new().spawn();
let l1 = anvil.endpoint().parse().unwrap();
// Binding to `_network` keeps the value alive until the end of the test.
let _network = TestNetwork::new(
options,
[no_storage::Options; TestConfig::NUM_NODES],
l1,
None,
)
.await;

let mut subscribed_events = client
.socket("hotshot-events/events")
.subscribe::<BuilderEvent<SeqTypes>>()
.await
.unwrap();

let total_count = 5;
// Read events from the subscription until more than `total_count` have been received.
let mut receive_count = 0;
loop {
// A closed stream (`None`) panics here and fails the test.
let event = subscribed_events.next().await.unwrap();
tracing::info!(
"Received event in hotshot event streaming Client 1: {:?}",
event
);
receive_count += 1;
if receive_count > total_count {
tracing::info!("Client Received atleast desired events, exiting loop");
break;
}
}
// Offset 1 is due to the startup event info.
// NOTE(review): the loop only exits once `receive_count` first exceeds `total_count`, counting
// up by one from zero, so this equality always holds when this line is reached.
assert_eq!(receive_count, total_count + 1);
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1281,4 +1219,64 @@ mod test {
.unwrap();
assert_eq!(chain, new_chain);
}

/// Checks that events can be read from the `hotshot-events/events` socket.
///
/// The test starts a `TestNetwork` with the HotShot events option enabled, subscribes a client
/// to the events socket, and reads events until more than `total_count` have arrived.
#[async_std::test]
async fn test_hotshot_event_streaming() {
use hotshot_events_service::events_source::BuilderEvent;
use options::HotshotEvents;

setup_logging();

setup_backtrace();

// Pick separate free ports for the events service and the query service.
let hotshot_event_streaming_port =
pick_unused_port().expect("No ports free for hotshot event streaming");
let query_service_port = pick_unused_port().expect("No ports free for query service");

// The client URL targets the same port that is handed to `HotshotEvents` below.
let url = format!("http://localhost:{hotshot_event_streaming_port}")
.parse()
.unwrap();

let hotshot_events = HotshotEvents {
events_service_port: hotshot_event_streaming_port,
};

let client: Client<ServerError, SequencerVersion> = Client::new(url);

let options = Options::with_port(query_service_port).hotshot_events(hotshot_events);

// Spawn Anvil and pass its endpoint to the test network as the L1 URL.
let anvil = Anvil::new().spawn();
let l1 = anvil.endpoint().parse().unwrap();
// Binding to `_network` keeps the value alive until the end of the test.
let _network = TestNetwork::new(
options,
[no_storage::Options; TestConfig::NUM_NODES],
l1,
None,
)
.await;

let mut subscribed_events = client
.socket("hotshot-events/events")
.subscribe::<BuilderEvent<SeqTypes>>()
.await
.unwrap();

let total_count = 5;
// Read events from the subscription until more than `total_count` have been received.
let mut receive_count = 0;
loop {
// A closed stream (`None`) panics here and fails the test.
let event = subscribed_events.next().await.unwrap();
tracing::info!(
"Received event in hotshot event streaming Client 1: {:?}",
event
);
receive_count += 1;
if receive_count > total_count {
tracing::info!("Client Received atleast desired events, exiting loop");
break;
}
}
// Offset 1 is due to the startup event info.
// NOTE(review): the loop only exits once `receive_count` first exceeds `total_count`, counting
// up by one from zero, so this equality always holds when this line is reached.
assert_eq!(receive_count, total_count + 1);
}
}
6 changes: 3 additions & 3 deletions sequencer/src/api/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,13 @@ impl From<HotShotConfig<PubKey>> for PublicHotShotConfig {
}
}

#[cfg(test)]
pub(crate) mod testing {
#[cfg(any(test, feature = "testing"))]
pub mod testing {
use super::super::Options;
use super::*;

#[async_trait]
pub(crate) trait TestableSequencerDataSource: SequencerDataSource {
pub trait TestableSequencerDataSource: SequencerDataSource {
type Storage: Sync;

async fn create_storage() -> Self::Storage;
Expand Down
2 changes: 1 addition & 1 deletion sequencer/src/api/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl SequencerDataSource for DataSource {

impl CatchupDataSource for DataSource {}

#[cfg(test)]
#[cfg(any(test, feature = "testing"))]
mod impl_testable_data_source {
use super::*;
use crate::api::{self, data_source::testing::TestableSequencerDataSource};
Expand Down
2 changes: 1 addition & 1 deletion sequencer/src/api/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl CatchupDataSource for DataSource {
}
}

#[cfg(test)]
#[cfg(any(test, feature = "testing"))]
mod impl_testable_data_source {
use super::*;
use crate::api::{self, data_source::testing::TestableSequencerDataSource};
Expand Down
4 changes: 4 additions & 0 deletions sequencer/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ impl<N: network::Type, P: SequencerPersistence, Ver: StaticVersionType + 'static
self.handle.read().await.decided_state().await
}

/// Returns the `node_id` stored in this context's node state.
pub fn node_id(&self) -> u64 {
self.node_state.node_id
}

pub fn node_state(&self) -> NodeState {
self.node_state.clone()
}
Expand Down
37 changes: 27 additions & 10 deletions sequencer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use futures::future::FutureExt;
use hotshot_types::traits::metrics::NoMetrics;
use sequencer::{
api::{self, data_source::DataSourceOptions},
init_node,
context::SequencerContext,
init_node, network,
options::{Modules, Options},
persistence, Genesis, L1Params, NetworkParams,
};
Expand All @@ -24,12 +25,12 @@ async fn main() -> anyhow::Result<()> {
tracing::warn!("modules: {:?}", modules);

if let Some(storage) = modules.storage_fs.take() {
init_with_storage(modules, opt, storage, SEQUENCER_VERSION).await
run_with_storage(modules, opt, storage, SEQUENCER_VERSION).await
} else if let Some(storage) = modules.storage_sql.take() {
init_with_storage(modules, opt, storage, SEQUENCER_VERSION).await
run_with_storage(modules, opt, storage, SEQUENCER_VERSION).await
} else {
// Persistence is required. If none is provided, just use the local file system.
init_with_storage(
run_with_storage(
modules,
opt,
persistence::fs::Options::default(),
Expand All @@ -39,12 +40,30 @@ async fn main() -> anyhow::Result<()> {
}
}

async fn init_with_storage<S, Ver: StaticVersionType + 'static>(
/// Initializes a sequencer with the given storage options and runs it.
///
/// Builds the context by calling `init_with_storage` with the same arguments, then starts
/// consensus and awaits `ctx.join()`.
///
/// # Errors
///
/// Returns the error from `init_with_storage` if initialization fails. Otherwise returns
/// `Ok(())` once `ctx.join()` has completed.
async fn run_with_storage<S, Ver: StaticVersionType + 'static>(
modules: Modules,
opt: Options,
storage_opt: S,
bind_version: Ver,
) -> anyhow::Result<()>
where
S: DataSourceOptions,
{
// Initialization is kept separate from running: `init_with_storage` only returns the context.
let ctx = init_with_storage(modules, opt, storage_opt, bind_version).await?;

// Start doing consensus.
ctx.start_consensus().await;
ctx.join().await;

Ok(())
}

async fn init_with_storage<S, Ver: StaticVersionType + 'static>(
modules: Modules,
opt: Options,
storage_opt: S,
bind_version: Ver,
) -> anyhow::Result<SequencerContext<network::Production, S::Persistence, Ver>>
where
S: DataSourceOptions,
{
Expand Down Expand Up @@ -152,13 +171,11 @@ where
}
};

// Start doing consensus.
ctx.start_consensus().await;
ctx.join().await;

Ok(())
Ok(ctx)
}

mod restart_tests;

#[cfg(test)]
mod test {
use super::*;
Expand Down
Loading

0 comments on commit 6fa3af4

Please sign in to comment.