feat: (re)genesis graceful shutdown (#1821)
closes: #1800 
closes: #1819
closes: #1820 

Resumability:
(Re)genesis can now be paused and resumed. To make that possible, the start of `FuelService` in `cli/run` was split into `starting` and `running` phases, both interruptible by signals, as sketched below.
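A self-contained sketch of that two-phase pattern; the `start_and_await`/`await_stop` bodies here are stand-ins, and the real wiring against `FuelService` is in the `cli/run.rs` diff below:

```rust
use std::time::Duration;

/// Stand-in for the richer shutdown handler in `cli/run`
/// (the real one also listens for SIGTERM on Unix).
async fn shutdown_signal() {
    let _ = tokio::signal::ctrl_c().await;
}

/// Stand-in for the `starting` phase, i.e. the potentially
/// long-running (re)genesis import.
async fn start_and_await() -> anyhow::Result<()> {
    tokio::time::sleep(Duration::from_secs(60)).await;
    Ok(())
}

/// Stand-in for the `running` phase.
async fn await_stop() -> anyhow::Result<()> {
    tokio::time::sleep(Duration::from_secs(3600)).await;
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Phase 1: `starting`. Racing the long import against the signal lets
    // a shutdown land mid-genesis, so the import can be resumed on the
    // next start instead of beginning from scratch.
    tokio::select! {
        result = start_and_await() => result?,
        _ = shutdown_signal() => return Ok(()),
    }

    // Phase 2: `running`. The same race keeps the live node interruptible.
    tokio::select! {
        result = await_stop() => result?,
        _ = shutdown_signal() => {}
    }

    Ok(())
}
```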

Progress:
Progress bars are now shown when `stderr` is a `tty`. Otherwise, log lines are printed with the current group, the total number of groups, the table name, and the ETA to completion.
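A minimal sketch of that decision using the newly added `indicatif` dependency; the template string, type names, and log wording are illustrative rather than the PR's exact output:

```rust
use std::io::IsTerminal;

use indicatif::{ProgressBar, ProgressStyle};

/// Reports import progress as a bar on a tty, or as log lines otherwise.
enum ProgressReporter {
    Bar(ProgressBar),
    Logs { table: String, num_groups: u64 },
}

impl ProgressReporter {
    fn new(table: &str, num_groups: u64) -> Self {
        // indicatif draws to stderr by default, so stderr is the stream to probe.
        if std::io::stderr().is_terminal() {
            let bar = ProgressBar::new(num_groups);
            bar.set_style(
                ProgressStyle::with_template("{msg} [{bar:40}] {pos}/{len} eta {eta}")
                    .expect("static template is valid"),
            );
            bar.set_message(table.to_owned());
            Self::Bar(bar)
        } else {
            Self::Logs {
                table: table.to_owned(),
                num_groups,
            }
        }
    }

    fn group_imported(&self, index: u64) {
        match self {
            Self::Bar(bar) => bar.inc(1),
            Self::Logs { table, num_groups } => {
                // The real log line also includes an ETA estimate.
                tracing::info!("{table}: imported group {index} of {num_groups}")
            }
        }
    }
}
```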

Bars:

![image](https://github.com/FuelLabs/fuel-core/assets/37515857/b3d9467d-85a9-42b0-aa8b-7118ba612ae2)

Logs:

![image](https://github.com/FuelLabs/fuel-core/assets/37515857/d985dd82-d87e-4558-8ca7-935ef8404dc0)


Empty tables:
@MujkicA found an edge case while e2e testing. Fixed here by allowing tables to be absent from the snapshot when using parquet.
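In spirit the fix amounts to the sketch below; the one-parquet-file-per-table layout and the `decode_groups` helper are assumptions for illustration, and the new `missing_tables_tolerated` test in the `state.rs` diff shows the behaviour end to end:

```rust
use std::path::{Path, PathBuf};

/// Read all groups of one table from a parquet snapshot directory,
/// treating a missing file as an empty table instead of an open error.
fn read_table_groups(snapshot_dir: &Path, table: &str) -> anyhow::Result<Vec<Vec<u8>>> {
    let file: PathBuf = snapshot_dir.join(format!("{table}.parquet"));
    if !file.exists() {
        // The e2e edge case: the table was never written, so there is
        // no file to open. Report zero groups instead of erroring.
        return Ok(Vec::new());
    }
    decode_groups(&file)
}

/// Placeholder for the real `Decoder` in
/// `crates/chain-config/src/config/state/parquet/decode.rs`.
fn decode_groups(_file: &Path) -> anyhow::Result<Vec<Vec<u8>>> {
    Ok(Vec::new())
}
```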

---------

Co-authored-by: Green Baneling <[email protected]>
Co-authored-by: Hannes Karppila <[email protected]>
3 people authored Apr 15, 2024
1 parent 2e71605 commit 030c67f
Showing 21 changed files with 532 additions and 225 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -10,10 +10,12 @@ Description of the upcoming release here.
 
 ### Fixed
 
+- [#1821](https://github.com/FuelLabs/fuel-core/pull/1821): Can handle missing tables in snapshot.
 - [#1814](https://github.com/FuelLabs/fuel-core/pull/1814): Bugfix: the `iter_all_by_prefix` was not working for all tables. The change adds a `Rust` level filtering.
 
 ### Added
 
+- [#1821](https://github.com/FuelLabs/fuel-core/pull/1821): Propagate shutdown signal to (re)genesis. Also add progress bar for (re)genesis.
 - [#1813](https://github.com/FuelLabs/fuel-core/pull/1813): Added back support for `/health` endpoint.
 - [#1799](https://github.com/FuelLabs/fuel-core/pull/1799): Snapshot creation is now concurrent.
33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -127,6 +127,7 @@ pin-project-lite = "0.2"
 axum = "0.5"
 once_cell = "1.16"
 prometheus-client = "0.22.0"
+indicatif = { version = "0.17", default-features = false }
 itertools = { version = "0.12", default-features = false }
 insta = "1.8"
 tempfile = "3.4"
25 changes: 21 additions & 4 deletions bin/fuel-core/src/cli/run.rs
@@ -14,7 +14,10 @@ use anyhow::Context;
 use clap::Parser;
 use fuel_core::{
     chain_config::default_consensus_dev_key,
-    combined_database::CombinedDatabaseConfig,
+    combined_database::{
+        CombinedDatabase,
+        CombinedDatabaseConfig,
+    },
     producer::Config as ProducerConfig,
     service::{
         config::Trigger,
@@ -377,16 +380,30 @@ pub async fn exec(command: Command) -> anyhow::Result<()> {
     info!("Fuel Core version v{}", env!("CARGO_PKG_VERSION"));
     trace!("Initializing in TRACE mode.");
     // initialize the server
-    let server = FuelService::new_node(config).await?;
+    let combined_database = CombinedDatabase::from_config(&config.combined_db_config)?;
+
+    let service = FuelService::new(combined_database, config)?;
+
+    // Genesis could take a long time depending on the snapshot size. Start needs to be
+    // interruptible by the shutdown_signal
+    tokio::select! {
+        result = service.start_and_await() => {
+            result?;
+        }
+        _ = shutdown_signal() => {
+            service.stop();
+        }
+    }
+
     // pause the main task while service is running
     tokio::select! {
-        result = server.await_stop() => {
+        result = service.await_stop() => {
             result?;
         }
         _ = shutdown_signal() => {}
     }
 
-    server.stop_and_await().await?;
+    service.stop_and_await().await?;
 
     Ok(())
 }
8 changes: 2 additions & 6 deletions bin/fuel-core/src/cli/snapshot.rs
@@ -323,7 +323,7 @@ mod tests {
         reader
             .read::<T>()
             .unwrap()
-            .map_ok(|group| group.data)
+            .into_iter()
            .flatten_ok()
            .try_collect()
            .unwrap()
@@ -784,11 +784,7 @@ mod tests {
         T::OwnedValue: serde::de::DeserializeOwned + core::fmt::Debug + PartialEq,
         StateConfig: AsTable<T>,
     {
-        let actual = reader
-            .read()
-            .unwrap()
-            .map(|group| group.unwrap().data)
-            .collect_vec();
+        let actual: Vec<_> = reader.read().unwrap().into_iter().try_collect().unwrap();
 
         let expected = expected_data
             .into_iter()
59 changes: 37 additions & 22 deletions crates/chain-config/src/config/state.rs
@@ -418,47 +418,47 @@ impl StateConfig {
 
         let coins = reader
             .read::<Coins>()?
-            .map_ok(|batch| batch.data)
+            .into_iter()
             .flatten_ok()
             .try_collect()?;
 
         builder.add(coins);
 
         let messages = reader
             .read::<Messages>()?
-            .map_ok(|batch| batch.data)
+            .into_iter()
             .flatten_ok()
             .try_collect()?;
 
         builder.add(messages);
 
         let contract_state = reader
             .read::<ContractsState>()?
-            .map_ok(|batch| batch.data)
+            .into_iter()
             .flatten_ok()
             .try_collect()?;
 
         builder.add(contract_state);
 
         let contract_balance = reader
             .read::<ContractsAssets>()?
-            .map_ok(|batch| batch.data)
+            .into_iter()
             .flatten_ok()
             .try_collect()?;
 
         builder.add(contract_balance);
 
         let contract_code = reader
             .read::<ContractsRawCode>()?
-            .map_ok(|batch| batch.data)
+            .into_iter()
             .flatten_ok()
             .try_collect()?;
 
         builder.add(contract_code);
 
         let contract_utxo = reader
             .read::<ContractsLatestUtxo>()?
-            .map_ok(|batch| batch.data)
+            .into_iter()
             .flatten_ok()
             .try_collect()?;
 
@@ -532,7 +532,8 @@ impl StateConfig {
 }
 
 pub use reader::{
-    IntoIter,
+    GroupIter,
+    Groups,
     SnapshotReader,
 };
 #[cfg(feature = "parquet")]
@@ -544,13 +545,6 @@ pub use writer::{
 };
 pub const MAX_GROUP_SIZE: usize = usize::MAX;
 
-#[derive(Debug, PartialEq, Eq, Clone)]
-pub struct Group<T> {
-    pub index: usize,
-    pub data: Vec<T>,
-}
-pub(crate) type GroupResult<T> = anyhow::Result<Group<T>>;
-
 #[cfg(test)]
 mod tests {
     use std::path::Path;
@@ -717,6 +711,25 @@ mod tests {
         pretty_assertions::assert_eq!(da_block_height, da_block_height_decoded);
     }
 
+    #[test_case::test_case(given_parquet_writer)]
+    #[test_case::test_case(given_json_writer)]
+    fn missing_tables_tolerated(writer: impl FnOnce(&Path) -> SnapshotWriter) {
+        // given
+        let temp_dir = tempfile::tempdir().unwrap();
+        let writer = writer(temp_dir.path());
+        let snapshot = writer
+            .close(13.into(), 14u64.into(), &ChainConfig::local_testnet())
+            .unwrap();
+
+        let reader = SnapshotReader::open(snapshot).unwrap();
+
+        // when
+        let coins = reader.read::<Coins>().unwrap();
+
+        // then
+        assert_eq!(coins.into_iter().count(), 0);
+    }
+
     fn assert_roundtrip<T>(
         writer: impl FnOnce(&Path) -> SnapshotWriter,
         reader: impl FnOnce(SnapshotMetadata, usize) -> SnapshotReader,
@@ -755,7 +768,11 @@ mod tests {
             .close(10.into(), DaBlockHeight(11), &ChainConfig::local_testnet())
             .unwrap();
 
-        let actual_groups = reader(snapshot, group_size).read().unwrap().collect_vec();
+        let actual_groups = reader(snapshot, group_size)
+            .read()
+            .unwrap()
+            .into_iter()
+            .collect_vec();
 
         // then
         assert_groups_identical(&expected_groups, actual_groups, skip_n_groups);
@@ -779,7 +796,7 @@
         fn write_groups<T>(
             &mut self,
             encoder: &mut SnapshotWriter,
-        ) -> Vec<Group<TableEntry<T>>>
+        ) -> Vec<Vec<TableEntry<T>>>
         where
             T: TableWithBlueprint,
             T::OwnedKey: serde::Serialize,
@@ -789,29 +806,27 @@
         {
             let groups = self.generate_groups();
             for group in &groups {
-                encoder.write(group.data.clone()).unwrap();
+                encoder.write(group.clone()).unwrap();
             }
             groups
         }
 
-        fn generate_groups<T>(&mut self) -> Vec<Group<T>>
+        fn generate_groups<T>(&mut self) -> Vec<Vec<T>>
         where
             T: Randomize,
         {
             ::std::iter::repeat_with(|| T::randomize(&mut self.rand))
                 .chunks(self.group_size)
                 .into_iter()
                 .map(|chunk| chunk.collect_vec())
-                .enumerate()
-                .map(|(index, data)| Group { index, data })
                 .take(self.num_groups)
                 .collect()
         }
     }
 
     fn assert_groups_identical<T>(
-        original: &[Group<T>],
-        read: impl IntoIterator<Item = Result<Group<T>, anyhow::Error>>,
+        original: &[Vec<T>],
+        read: impl IntoIterator<Item = Result<Vec<T>, anyhow::Error>>,
         skip: usize,
     ) where
         Vec<T>: PartialEq,
3 changes: 1 addition & 2 deletions crates/chain-config/src/config/state/parquet.rs
@@ -3,7 +3,6 @@ pub mod encode;
 
 #[cfg(test)]
 mod tests {
-    use crate::Group;
     use bytes::Bytes;
     use itertools::Itertools;
     use parquet::{
@@ -128,7 +127,7 @@ mod tests {
         let mut decoder = Decoder::new(bytes).unwrap();
 
         // when
-        let _: Group<_> = decoder.nth(1).unwrap().unwrap();
+        let _: Vec<_> = decoder.nth(1).unwrap().unwrap();
 
         // then
         let actually_read = bytes_read.load(std::sync::atomic::Ordering::SeqCst);
24 changes: 7 additions & 17 deletions crates/chain-config/src/config/state/parquet/decode.rs
@@ -14,27 +14,20 @@ use parquet::{
     record::RowAccessor,
 };
 
-use crate::config::state::{
-    Group,
-    GroupResult,
-};
-
 pub struct Decoder<R: ChunkReader> {
     data_source: SerializedFileReader<R>,
     group_index: usize,
 }
 
-pub trait Decode<T> {
-    fn decode(bytes: &[u8]) -> anyhow::Result<T>
-    where
-        Self: Sized;
-}
-
 impl<R> Decoder<R>
 where
     R: ChunkReader + 'static,
 {
-    fn current_group(&self) -> anyhow::Result<Group<Vec<u8>>> {
+    pub fn num_groups(&self) -> usize {
+        self.data_source.num_row_groups()
+    }
+
+    fn current_group(&self) -> anyhow::Result<Vec<Vec<u8>>> {
         let data = self
             .data_source
             .get_row_group(self.group_index)?
@@ -51,18 +44,15 @@ where
             })
             .collect::<Result<Vec<_>, _>>()?;
 
-        Ok(Group {
-            index: self.group_index,
-            data,
-        })
+        Ok(data)
     }
 }
 
 impl<R> Iterator for Decoder<R>
 where
     R: ChunkReader + 'static,
 {
-    type Item = GroupResult<Vec<u8>>;
+    type Item = anyhow::Result<Vec<Vec<u8>>>;
 
     fn next(&mut self) -> Option<Self::Item> {
         if self.group_index >= self.data_source.metadata().num_row_groups() {