Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Libp2p] Finish generic DHT persistence #2507

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ jobs:
# Build in release without `testing` feature, this should work without `hotshot_example` config.
run: |
cargo build --locked --release --workspace
- name: Build sequencer-sqlite
run: cargo build --locked --release --manifest-path ./sequencer-sqlite/Cargo.toml --target-dir ./target

- name: Build sequencer-sqlite
run: cargo build --release --manifest-path ./sequencer-sqlite/Cargo.toml --target-dir ./target

- name: Build Espresso Dev Node
# Espresso Dev Node currently requires testing feature, so it is built separately.
Expand Down Expand Up @@ -104,8 +104,8 @@ jobs:
run: |
cargo build --locked --release --workspace

- name: Build sequencer-sqlite
run: cargo build --locked --release --manifest-path ./sequencer-sqlite/Cargo.toml --target-dir ./target
- name: Build sequencer-sqlite
run: cargo build --release --manifest-path ./sequencer-sqlite/Cargo.toml --target-dir ./target

- name: Build Espresso Dev Node
# Espresso Dev Node currently requires testing feature, so it is built separately.
Expand Down Expand Up @@ -500,7 +500,7 @@ jobs:
docker pull ${{ needs.build-dockers.outputs.marketplace-builder-tag }}
docker pull ${{ needs.build-dockers.outputs.node-validator-tag }}
docker pull ${{ needs.build-dockers.outputs.dev-rollup-tag }}

- name: Tag new docker images
run: |
docker tag ${{ needs.build-dockers.outputs.sequencer-tag }} ghcr.io/espressosystems/espresso-sequencer/sequencer:main
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/build_static.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ jobs:
- name: Compile all executables
# timeout-minutes: 120
run: |
nix develop $DEVSHELL --accept-flake-config --option sandbox relaxed -c cargo build --locked --release
-c cargo build --locked --release --manifest-path ./sequencer-sqlite/Cargo.toml --target-dir 'echo $CARGO_TARGET_DIR'
nix develop $DEVSHELL --accept-flake-config --option sandbox relaxed -c cargo build --locked --release
-c cargo build --release --manifest-path ./sequencer-sqlite/Cargo.toml --target-dir 'echo $CARGO_TARGET_DIR'

- name: Upload artifacts
uses: actions/upload-artifact@v4
Expand All @@ -95,7 +95,7 @@ jobs:
${{ env.CARGO_TARGET_DIR }}/${{ env.TARGET_TRIPLET }}/release/marketplace-solver
${{ env.CARGO_TARGET_DIR }}/${{ env.TARGET_TRIPLET }}/release/marketplace-builder
${{ env.CARGO_TARGET_DIR }}/${{ env.TARGET_TRIPLET }}/release/node-metrics
${{ env.CARGO_TARGET_DIR }}/${{ env.TARGET_TRIPLET }}/release/dev-rollup
${{ env.CARGO_TARGET_DIR }}/${{ env.TARGET_TRIPLET }}/release/dev-rollup
${{ env.CARGO_TARGET_DIR }}/${{ env.TARGET_TRIPLET }}/release/sequencer-sqlite
static-dockers:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -223,7 +223,7 @@ jobs:
with:
images: ghcr.io/espressosystems/espresso-sequencer/node-validator
flavor: suffix=musl

- name: Generate dev-rollup metadata
uses: docker/metadata-action@v5
id: dev-rollup
Expand Down Expand Up @@ -379,4 +379,4 @@ jobs:
platforms: linux/amd64,linux/arm64
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.dev-rollup.outputs.tags }}
labels: ${{ steps.dev-rollup.outputs.labels }}
labels: ${{ steps.dev-rollup.outputs.labels }}
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions sequencer/api/migrations/postgres/V43__libp2p_dht.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Single-row table holding the bincode-serialized libp2p DHT records.
-- The application always upserts with id = 0, so at most one row exists;
-- `serialized_records` is the whole record set as one opaque blob.
CREATE TABLE libp2p_dht (
id INTEGER PRIMARY KEY,
serialized_records BYTEA NOT NULL
);
4 changes: 4 additions & 0 deletions sequencer/api/migrations/sqlite/V204__libp2p_dht.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Single-row table holding the bincode-serialized libp2p DHT records.
-- The application always upserts with id = 0, so at most one row exists;
-- `serialized_records` is the whole record set as one opaque blob.
CREATE TABLE libp2p_dht (
id INTEGER PRIMARY KEY,
serialized_records BLOB NOT NULL
);
3 changes: 1 addition & 2 deletions sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use proposal_fetcher::ProposalFetcherConfig;
use std::sync::Arc;
use tokio::select;
// Should move `STAKE_TABLE_CAPACITY` into the sequencer repo when we have variable stake table support
use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtNoPersistence;
use libp2p::Multiaddr;
use network::libp2p::split_off_peer_id;
use options::Identity;
Expand Down Expand Up @@ -496,7 +495,7 @@ pub async fn init_node<P: SequencerPersistence, V: Versions>(
let network = {
let p2p_network = Libp2pNetwork::from_config(
network_config.clone(),
DhtNoPersistence,
persistence.clone(),
Arc::new(async_lock::RwLock::new(membership.clone())),
gossip_config,
request_response_config,
Expand Down
65 changes: 65 additions & 0 deletions sequencer/src/persistence/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use espresso_types::{
v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence},
Leaf, Leaf2, NetworkConfig, Payload, SeqTypes,
};
use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
DhtPersistentStorage, SerializableRecord,
};
use hotshot_query_service::VidCommitment;
use hotshot_types::{
consensus::CommitmentMap,
Expand Down Expand Up @@ -174,6 +177,10 @@ impl Inner {
self.path.join("next_epoch_quorum_certificate")
}

fn libp2p_dht_path(&self) -> PathBuf {
self.path.join("libp2p_dht")
}

/// Overwrite a file if a condition is met.
///
/// The file at `path`, if it exists, is opened in read mode and passed to `pred`. If `pred`
Expand Down Expand Up @@ -905,6 +912,64 @@ impl SequencerPersistence for Persistence {
}
}

#[async_trait]
impl DhtPersistentStorage for Persistence {
    /// Save the DHT to the file on disk
    ///
    /// # Errors
    /// - If we fail to serialize the records
    /// - If we fail to write the serialized records to the file
    async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
        // Bincode-serialize the records
        let to_save =
            bincode::serialize(&records).with_context(|| "failed to serialize records")?;

        // Take the write lock up front: the original resolved the path under a
        // read lock and then re-acquired a write lock, leaving a window in
        // which another writer could interleave. One acquisition also avoids
        // locking twice.
        let mut inner = self.inner.write().await;

        // Get the path to save the file to
        let path = inner.libp2p_dht_path();

        // Create the parent directory if it doesn't exist
        fs::create_dir_all(path.parent().with_context(|| "directory had no parent")?)
            .with_context(|| "failed to create directory")?;

        // Save the file, replacing the previous one if it exists
        inner
            .replace(
                &path,
                |_| {
                    // Always overwrite the previous file
                    Ok(true)
                },
                |mut file| {
                    file.write_all(&to_save)
                        .with_context(|| "failed to write records to file")?;
                    Ok(())
                },
            )
            .with_context(|| "failed to save records to file")?;

        Ok(())
    }

    /// Load the DHT from the file on disk
    ///
    /// # Errors
    /// - If we fail to read the file
    /// - If we fail to deserialize the records
    async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
        // Resolve the path under a read lock, then read the file contents.
        // NOTE(review): this errors if no DHT file has been saved yet —
        // confirm callers treat a load error as "no persisted records".
        let path = self.inner.read().await.libp2p_dht_path();
        let contents =
            std::fs::read(&path).with_context(|| "Failed to read records from file")?;

        // Deserialize the contents
        let records: Vec<SerializableRecord> =
            bincode::deserialize(&contents).with_context(|| "Failed to deserialize records")?;

        Ok(records)
    }
}
/// Update a `NetworkConfig` that may have originally been persisted with an old version.
fn migrate_network_config(
mut network_config: serde_json::Value,
Expand Down
16 changes: 16 additions & 0 deletions sequencer/src/persistence/no_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use espresso_types::{
v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence},
Leaf, Leaf2, NetworkConfig,
};
use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
DhtPersistentStorage, SerializableRecord,
};
use hotshot_query_service::VidCommitment;
use hotshot_types::{
consensus::CommitmentMap,
Expand Down Expand Up @@ -198,3 +201,16 @@ impl SequencerPersistence for NoStorage {
Ok(None)
}
}

#[async_trait]
impl DhtPersistentStorage for NoStorage {
    /// No-op: `NoStorage` deliberately persists nothing, so the records are
    /// discarded and success is reported unconditionally.
    async fn save(&self, _records: Vec<SerializableRecord>) -> anyhow::Result<()> {
        Ok(())
    }

    /// No-op: nothing is ever persisted, so loading always yields an empty
    /// record set.
    async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
        Ok(Vec::new())
    }
}
59 changes: 59 additions & 0 deletions sequencer/src/persistence/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use espresso_types::{
BackoffParams, BlockMerkleTree, FeeMerkleTree, Leaf, Leaf2, NetworkConfig, Payload,
};
use futures::stream::StreamExt;
use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::{
DhtPersistentStorage, SerializableRecord,
};
use hotshot_query_service::{
availability::LeafQueryData,
data_source::{
Expand Down Expand Up @@ -1389,6 +1392,62 @@ impl SequencerPersistence for Persistence {
}
}

#[async_trait]
impl DhtPersistentStorage for Persistence {
    /// Save the DHT to the database
    ///
    /// # Errors
    /// - If we fail to serialize the records
    /// - If we fail to write the serialized records to the DB
    async fn save(&self, records: Vec<SerializableRecord>) -> anyhow::Result<()> {
        // Bincode-serialize the records
        let to_save =
            bincode::serialize(&records).with_context(|| "failed to serialize records")?;

        // Upsert into the single-row table (id = 0 is the only row ever written)
        let stmt = "INSERT INTO libp2p_dht (id, serialized_records) VALUES (0, $1) ON CONFLICT (id) DO UPDATE SET serialized_records = $1";

        // Execute the query inside an atomic transaction
        let mut tx = self
            .db
            .write()
            .await
            .with_context(|| "failed to start an atomic DB transaction")?;
        tx.execute(query(stmt).bind(to_save))
            .await
            .with_context(|| "failed to execute DB query")?;

        // Commit the state
        tx.commit().await.with_context(|| "failed to commit to DB")
    }

    /// Load the DHT from the database
    ///
    /// # Errors
    /// - If we fail to read from the DB
    /// - If we fail to deserialize the records
    ///
    /// NOTE(review): `fetch_one` returns an error when the row does not yet
    /// exist (e.g. first boot, before any `save`). Confirm callers treat a
    /// load error as "no persisted records" rather than as fatal.
    async fn load(&self) -> anyhow::Result<Vec<SerializableRecord>> {
        // Fetch only the column we need (the original used `SELECT *`, which
        // pulls all columns and is brittle against schema changes)
        let result = self
            .db
            .read()
            .await
            .with_context(|| "failed to start a DB read transaction")?
            .fetch_one("SELECT serialized_records FROM libp2p_dht WHERE id = 0")
            .await
            .with_context(|| "failed to fetch from DB")?;

        // Get the `serialized_records` column (fixes the `serialied_records` typo)
        let serialized_records: Vec<u8> = result.get("serialized_records");

        // Deserialize it
        let records: Vec<SerializableRecord> = bincode::deserialize(&serialized_records)
            .with_context(|| "Failed to deserialize records")?;

        Ok(records)
    }
}

#[async_trait]
impl Provider<SeqTypes, VidCommonRequest> for Persistence {
#[tracing::instrument(skip(self))]
Expand Down
1 change: 1 addition & 0 deletions types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ fluent-asserter = "0.1.9"
futures = { workspace = true }
hotshot = { workspace = true }
hotshot-contract-adapter = { workspace = true }
hotshot-libp2p-networking = { workspace = true }
hotshot-query-service = { workspace = true }
hotshot-types = { workspace = true }
itertools = { workspace = true }
Expand Down
5 changes: 4 additions & 1 deletion types/src/v0/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use async_trait::async_trait;
use committable::{Commitment, Committable};
use futures::{FutureExt, TryFutureExt};
use hotshot::{types::EventType, HotShotInitializer};
use hotshot_libp2p_networking::network::behaviours::dht::store::persistent::DhtPersistentStorage;
use hotshot_types::{
consensus::CommitmentMap,
data::{
Expand Down Expand Up @@ -421,7 +422,9 @@ pub trait PersistenceOptions: Clone + Send + Sync + 'static {
}

#[async_trait]
pub trait SequencerPersistence: Sized + Send + Sync + Clone + 'static {
pub trait SequencerPersistence:
Sized + Send + Sync + Clone + 'static + DhtPersistentStorage
{
/// Use this storage as a state catchup backend, if supported.
fn into_catchup_provider(
self,
Expand Down
Loading