Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes Issues w/ Stream Crashing Over Large Replay #996

Open
wants to merge 43 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
2c68dad
fix: stream crashing.
l-monninger Jan 13, 2025
aa6d928
fix: Cargo.lock
l-monninger Jan 13, 2025
be23e69
fix: setting up for mocks.
l-monninger Jan 13, 2025
5f9d045
fix: refactor light node.
l-monninger Jan 13, 2025
4f8f2a3
fix: refactor stream.
l-monninger Jan 13, 2025
ff9136a
fix: reworking stream api.
l-monninger Jan 13, 2025
480bee7
fix: types.
l-monninger Jan 14, 2025
7aa181b
fix: refactor celestia provider.
l-monninger Jan 14, 2025
b4dc706
fix: celestia provider.
l-monninger Jan 14, 2025
40293f9
fix: refactor passthrough.
l-monninger Jan 14, 2025
3d4ea57
feat: merge in digest da.
l-monninger Jan 15, 2025
881b7d4
feat: light node refactor.
l-monninger Jan 15, 2025
d32fdf7
feat: accept both sequenced and passed through blobs.
l-monninger Jan 15, 2025
71c0ecf
fix: runners.
l-monninger Jan 15, 2025
a3d808a
fix: update build scripts.
l-monninger Jan 15, 2025
b708ae8
fix: remove cachix.
l-monninger Jan 15, 2025
2628c73
feat: merge disk-fifo.
l-monninger Jan 15, 2025
c008e29
fix: reintegrate inknown signers verifier.
l-monninger Jan 15, 2025
1e288a3
fix: use http1.
l-monninger Jan 15, 2025
54e27fa
debug: additional certificate logging.
l-monninger Jan 15, 2025
09af380
fix: enhance logging.
l-monninger Jan 15, 2025
6b43c13
fix: enhance logging.
l-monninger Jan 15, 2025
81b761f
fix: containers workflow.
l-monninger Jan 15, 2025
28c91f8
fix: use pr labels contains.
l-monninger Jan 15, 2025
c6a8593
debug: log event.
l-monninger Jan 15, 2025
9ff4337
debug: log containers.
l-monninger Jan 15, 2025
3969f77
fix: label filter.
l-monninger Jan 15, 2025
00adf06
docs: http1 warning.
l-monninger Jan 15, 2025
09f5c5e
fix: update default to http2.
l-monninger Jan 15, 2025
0be8313
fix: http2 support first class; http1 deprecated.
l-monninger Jan 16, 2025
39b8755
feat: switch to webpki.
l-monninger Jan 16, 2025
5ea357b
fix: connection protocol not passed through.
l-monninger Jan 16, 2025
922db07
fix: connection protocol.
l-monninger Jan 16, 2025
1913e58
fix: connection protocol.
l-monninger Jan 16, 2025
69f4d9a
add heartbeat blob on grpc connection
musitdev Jan 17, 2025
acfd786
create v1beta2.proto and set heatbeat with a bool
musitdev Jan 20, 2025
5f6de96
add some logs to see heartbeats
musitdev Jan 21, 2025
94c8e06
use a timer to generate heartbreak
musitdev Jan 22, 2025
f8594da
move heartbeat in the grpc send side
musitdev Jan 22, 2025
9dd4c62
remove all unwrap in the blob grpc stream processing
musitdev Jan 22, 2025
4026816
remove unnecessary comments
musitdev Jan 22, 2025
057e22d
Add sync DB archive file split in sub chunks to allow to upload bigge…
musitdev Jan 28, 2025
6e00b77
set db sync optional to allow genesis boot
musitdev Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add heartbeat blob on grpc connection
musitdev committed Jan 17, 2025
commit 69f4d9a6f3aa148820de6e9d83b921e4da22f57a
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ impl StreamBlocks {
.await
.context("Failed to stream blocks from DA")?;

println!("Streaming blocks from DA");
tracing::info!("Streaming blocks from DA");

while let Some(block_res) = blocks_from_da.next().await {
let response = block_res.context("Failed to get block")?;
@@ -43,13 +43,17 @@ impl StreamBlocks {
blob_response::BlobType::PassedThroughBlob(blob) => {
(blob.data, blob.timestamp, blob.blob_id, blob.height)
}
blob_response::BlobType::HeartbeatBlob(_) => {
tracing::info!("Receive heartbeat blob");
continue;
}
_ => {
return Err(anyhow::anyhow!("Unknown blob type"));
}
};

// pretty print (with labels) the block_id, block_timestamp, and da_height
println!(
tracing::info!(
"Block ID: {}, Block Timestamp: {}, DA Height: {}",
hex::encode(block_id),
// unix date string from the block timestamp which is in microseconds
Original file line number Diff line number Diff line change
@@ -116,6 +116,10 @@ where
blob_response::BlobType::PassedThroughBlob(blob) => {
(blob.data, blob.timestamp, blob.blob_id, blob.height)
}
blob_response::BlobType::HeartbeatBlob(_) => {
// Do nothing.
return Ok(());
}
_ => anyhow::bail!("Invalid blob type"),
};

Original file line number Diff line number Diff line change
@@ -3,6 +3,15 @@ version: "3"
processes:

setup:
environment:
- "ETH_RPC_CONNECTION_PROTOCOL=http"
- "ETH_RPC_CONNECTION_HOSTNAME=0.0.0.0"
- "ETH_RPC_CONNECTION_PORT=8090"
- "ETH_WS_CONNECTION_PROTOCOL=ws"
- "ETH_WS_CONNECTION_HOSTNAME=0.0.0.0"
- "ETH_WS_CONNECTION_PORT=8090"
- "MAYBE_RUN_LOCAL=true"
- "MOVEMENT_DA_LIGHT_NODE_HTTP1=true"
command: |
export AWS_REGION=us-west-2
export MOVEMENT_SYNC="leader::follower-test-$MOVEMENT_SHARED_RANDOM_1<=>{default_signer_address_whitelist,maptos,maptos-storage,movement-da-db}/**"
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ message BlobResponse {
Blob passed_through_blob = 1;
Blob sequenced_blob_intent = 2;
Blob sequenced_blob_block = 3;
Blob heartbeat_blob = 4;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a backwards compatible change, but we are now at the stage where the versioning starts to matter. I think you should break this out into a v1beta2.proto.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also not make this a Blob type.

}
}

22 changes: 19 additions & 3 deletions protocol-units/da/movement/protocol/da/src/lib.rs
Original file line number Diff line number Diff line change
@@ -130,6 +130,22 @@ pub trait DaOperations: Send + Sync {

last_height = height;
}
// Already executed Height are use to send Heartbeat.
Ok(Certificate::Height(height)) => {
//old certificate, use to send Heartbeat block.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need to stream blobs from the DA here. You can just yield something that will be interpreted as a heartbeat. That is, this should basically just be yield DaBlob::heartbeat() or similar.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I thought you want to yield blob to keep the same type of data in BlobType. I'll change to something more simple.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've pushed an update with v1beta2.proto file and heartbeat with a bool.

let blob_stream = self
.stream_da_blobs_between_heights(height, height)
.await?;
tokio::pin!(blob_stream);

while let Some(blob_res) = blob_stream.next().await {
let (_, blob) = blob_res?;
// Ack use heigth zero to identify heart beat block.
// Should be changed to a type.
let heart_blob = (DaHeight(0u64), blob);
yield heart_blob;
}
}
Ok(Certificate::Nolo) => {
// Ignore Nolo
}
@@ -142,9 +158,9 @@ pub trait DaOperations: Send + Sync {
yield Err(e)?;
}
// If height is less than last height, ignore
_ => {
warn!("ignoring certificate");
}
// _ => {
// warn!("ignoring certificate");
// }
}
}
};
1 change: 1 addition & 0 deletions protocol-units/da/movement/protocol/da/src/mock/mod.rs
Original file line number Diff line number Diff line change
@@ -210,6 +210,7 @@ pub mod test {
vec![
Ok(1), // First certificate
Err("internal error: fatal error".to_string()), // Fatal error
Ok(2),
]
);

Original file line number Diff line number Diff line change
@@ -133,8 +133,14 @@ where

while let Some(blob) = blob_stream.next().await {
let (height, da_blob) = blob.map_err(|e| tonic::Status::internal(e.to_string()))?;
let verifed_blob = verifier.verify(da_blob, height.as_u64()).await.map_err(|e| tonic::Status::internal(e.to_string()))?;
let blob = verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))?;
let blob = if height.as_u64() == 0 {
//Heart beat blob
// No need to verify the data are removed.
da_blob.to_blob_heartbeat_response()
} else {
let verifed_blob = verifier.verify(da_blob, height.as_u64()).await.map_err(|e| tonic::Status::internal(e.to_string()))?;
verifed_blob.into_inner().to_blob_passed_through_read_response(height.as_u64()).map_err(|e| tonic::Status::internal(e.to_string()))?
};
let response = StreamReadFromHeightResponse {
blob: Some(blob)
};
Original file line number Diff line number Diff line change
@@ -279,6 +279,7 @@ where
let sequenced_block = match blob_type {
BlobType::PassedThroughBlob(blob) => BlobType::SequencedBlobBlock(blob),
BlobType::SequencedBlobBlock(blob) => BlobType::SequencedBlobBlock(blob),
BlobType::HeartbeatBlob(blob) => BlobType::HeartbeatBlob(blob),
_ => {
anyhow::bail!("Invalid blob type")
}
14 changes: 14 additions & 0 deletions protocol-units/da/movement/protocol/util/src/blob/ir/blob.rs
Original file line number Diff line number Diff line change
@@ -141,6 +141,20 @@ impl DaBlob {
Ok(BlobResponse { blob_type: Some(blob_response::BlobType::SequencedBlobBlock(blob)) })
}

/// Converts a [DaBlob] into a [BlobResponse] with the blob passed through.
pub fn to_blob_heartbeat_response(self) -> BlobResponse {
//for heartbeat blob the data are removed.
let blob = Blob {
data: vec![],
signature: self.signature().to_vec(),
timestamp: self.timestamp(),
signer: self.signer().to_vec(),
blob_id: self.id().to_vec(),
height: 0,
};
BlobResponse { blob_type: Some(blob_response::BlobType::HeartbeatBlob(blob)) }
}

/// Converts a [DaBlob] into a [BlobResponse] with the blob passed through.
pub fn to_blob_passed_through_read_response(
self,