Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes Issues w/ Stream Crashing Over Large Replay #996

Open
wants to merge 43 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
2c68dad
fix: stream crashing.
l-monninger Jan 13, 2025
aa6d928
fix: Cargo.lock
l-monninger Jan 13, 2025
be23e69
fix: setting up for mocks.
l-monninger Jan 13, 2025
5f9d045
fix: refactor light node.
l-monninger Jan 13, 2025
4f8f2a3
fix: refactor stream.
l-monninger Jan 13, 2025
ff9136a
fix: reworking stream api.
l-monninger Jan 13, 2025
480bee7
fix: types.
l-monninger Jan 14, 2025
7aa181b
fix: refactor celestia provider.
l-monninger Jan 14, 2025
b4dc706
fix: celestia provider.
l-monninger Jan 14, 2025
40293f9
fix: refactor passthrough.
l-monninger Jan 14, 2025
3d4ea57
feat: merge in digest da.
l-monninger Jan 15, 2025
881b7d4
feat: light node refactor.
l-monninger Jan 15, 2025
d32fdf7
feat: accept both sequenced and passed through blobs.
l-monninger Jan 15, 2025
71c0ecf
fix: runners.
l-monninger Jan 15, 2025
a3d808a
fix: update build scripts.
l-monninger Jan 15, 2025
b708ae8
fix: remove cachix.
l-monninger Jan 15, 2025
2628c73
feat: merge disk-fifo.
l-monninger Jan 15, 2025
c008e29
fix: reintegrate inknown signers verifier.
l-monninger Jan 15, 2025
1e288a3
fix: use http1.
l-monninger Jan 15, 2025
54e27fa
debug: additional certificate logging.
l-monninger Jan 15, 2025
09af380
fix: enhance logging.
l-monninger Jan 15, 2025
6b43c13
fix: enhance logging.
l-monninger Jan 15, 2025
81b761f
fix: containers workflow.
l-monninger Jan 15, 2025
28c91f8
fix: use pr labels contains.
l-monninger Jan 15, 2025
c6a8593
debug: log event.
l-monninger Jan 15, 2025
9ff4337
debug: log containers.
l-monninger Jan 15, 2025
3969f77
fix: label filter.
l-monninger Jan 15, 2025
00adf06
docs: http1 warning.
l-monninger Jan 15, 2025
09f5c5e
fix: update default to http2.
l-monninger Jan 15, 2025
0be8313
fix: http2 support first class; http1 deprecated.
l-monninger Jan 16, 2025
39b8755
feat: switch to webpki.
l-monninger Jan 16, 2025
5ea357b
fix: connection protocol not passed through.
l-monninger Jan 16, 2025
922db07
fix: connection protocol.
l-monninger Jan 16, 2025
1913e58
fix: connection protocol.
l-monninger Jan 16, 2025
69f4d9a
add heartbeat blob on grpc connection
musitdev Jan 17, 2025
acfd786
create v1beta2.proto and set heatbeat with a bool
musitdev Jan 20, 2025
5f6de96
add some logs to see heartbeats
musitdev Jan 21, 2025
94c8e06
use a timer to generate heartbreak
musitdev Jan 22, 2025
f8594da
move heartbeat in the grpc send side
musitdev Jan 22, 2025
9dd4c62
remove all unwrap in the blob grpc stream processing
musitdev Jan 22, 2025
4026816
remove unnecessary comments
musitdev Jan 22, 2025
057e22d
Add sync DB archive file split in sub chunks to allow to upload bigge…
musitdev Jan 28, 2025
6e00b77
set db sync optional to allow genesis boot
musitdev Jan 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion protocol-units/da/movement/protocol/da/Cargo.toml
Original file line number Diff line number Diff line change
@@ -16,7 +16,9 @@ integration-tests = []
thiserror = { workspace = true }
movement-da-util = { workspace = true }
movement-da-light-node-proto = { workspace = true }
futures = { workspace = true}
futures = { workspace = true }
tokio-stream = { workspace = true }
async-stream = { workspace = true }

[lints]
workspace = true
2 changes: 2 additions & 0 deletions protocol-units/da/movement/protocol/da/src/fifo/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/// A First-In-First-Out (FIFO) DA implementation.
pub struct Fifo {}
113 changes: 106 additions & 7 deletions protocol-units/da/movement/protocol/da/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
use movement_da_light_node_proto::Blob;
pub mod fifo;

use movement_da_util::blob::ir::blob::IntermediateBlobRepresentation;
use std::error;
use std::future::Future;
use tokio_stream::{Stream, StreamExt};

pub type CertificateStream =
std::pin::Pin<Box<dyn Stream<Item = Result<Certificate, DaError>> + Send>>;
pub type IntermediateBlobRepresentationStream =
std::pin::Pin<Box<dyn Stream<Item = Result<IntermediateBlobRepresentation, DaError>> + Send>>;

/// A blob meant for the DA.
#[derive(Debug, Clone)]
@@ -24,6 +31,22 @@ impl DaBlob {
}
}

/// A height for a blob on the DA.
#[derive(Debug, Clone)]
pub struct DaHeight(u64);

impl DaHeight {
/// Creates a new [DaHeight] from a u64.
pub fn new(height: u64) -> Self {
Self(height)
}

/// Returns the inner u64.
pub fn as_u64(&self) -> u64 {
self.0
}
}

/// A certificate from consensus indicating a height.
#[derive(Debug, Clone)]
pub enum Certificate {
@@ -46,25 +69,32 @@ pub enum DaError {
Internal(String),
}

pub trait DaOperations {
pub trait DaOperations
where
Self: Send + Sync + 'static,
{
/// Submits a blob to the DA.
///
/// A DA must allow for submission of raw [DaBlob]s and return a [Blob].
fn submit_blob(&self, data: DaBlob) -> impl Future<Output = Result<Blob, DaError>>;
/// A DA must allow for submission of raw [DaBlob]s and return a [IntermediateBlobRepresentation].
fn submit_blob(
&self,
data: DaBlob,
) -> impl Future<Output = Result<IntermediateBlobRepresentation, DaError>>;

/// Gets the blobs at a given height.
///
/// A DA must allow for retrieval of [IntermediateBlobRepresentation]s at a given height.
fn get_ir_blobs_at_height(
&self,
height: u64,
) -> impl Future<Output = Result<Vec<IntermediateBlobRepresentation>, DaError>>;
) -> impl Future<Output = Result<Vec<IntermediateBlobRepresentation>, DaError>> + Send + Sync + 'static;

/// Gets the IR blobs at a given height as would be used by the stream.
fn get_ir_blobs_at_height_for_stream(
&self,
height: u64,
) -> impl Future<Output = Result<Vec<IntermediateBlobRepresentation>, DaError>> {
) -> impl Future<Output = Result<Vec<IntermediateBlobRepresentation>, DaError>> + Send + Sync + 'static
{
async move {
// get the blobs at a given height, if the error is NonFatal, return an empty vec
match self.get_ir_blobs_at_height(height).await {
@@ -78,5 +108,74 @@ pub trait DaOperations {
/// Streams certificates from the DA.
///
/// A DA must allow for streaming of [Certificate]s. This is used to inform [Blob] polling.
fn stream_certificates(&self) -> impl futures::Stream<Item = Result<Certificate, DaError>>;
fn stream_certificates(&self) -> impl Future<Output = Result<CertificateStream, DaError>>;

/// Streams [IntermediateBlobRepresentation]s from the between two heights.
///
/// A DA implements a standard API for streaming [IntermediateBlobRepresentation]s.
fn stream_ir_blobs_between_heights(
&self,
start_height: u64,
end_height: u64,
) -> impl Future<Output = Result<IntermediateBlobRepresentationStream, DaError>> {
async move {
let stream = async_stream::try_stream! {

for height in start_height..end_height {
let blobs = self.get_ir_blobs_at_height_for_stream(height).await?;
for blob in blobs {
yield blob;
}
}

};

Ok(Box::pin(stream) as IntermediateBlobRepresentationStream)
}
}

/// Streams ir blobs from a certain height.
///
/// A DA implements a standard API for streaming [IntermediateBlobRepresentation]s.
fn stream_ir_blobs_from_height(
&self,
start_height: u64,
) -> impl Future<Output = Result<IntermediateBlobRepresentationStream, DaError>> {
async move {
let stream = async_stream::try_stream! {

// record the last height
let mut last_height = start_height;

// listen to the certificate stream to find the next height
let mut certificate_stream = self.stream_certificates().await?;

// loop through the certificate stream
while let Some(certificate) = certificate_stream.next().await {
match certificate {
Ok(Certificate::Height(height)) => {
// if the certificate height is greater than the last height, stream the blobs between the last height and the certificate height
if height > last_height {
let blobs = self.stream_ir_blobs_between_heights(last_height, height).await?;
for blob in blobs {
yield Ok(blob);
}
last_height = height;
}

}
Ok(Certificate::Nolo) => {
// do nothing
}
Err(e) => {
yield Err(e);
}
}
}

};

Ok(stream)
}
}
}