Skip to content

Commit

Permalink
feat(code/app/starknet): Adapt Starknet app to latest P2P protos (#819)
Browse files Browse the repository at this point in the history
* Update Starknet protos to their latest version

* Adapt Starknet app to new protos

* Use height and round as stream id in example app

* Use `ConsensusStreamId` proto message for stream id in Starknet app

* Fix formatting of stream ids

* Update protos for interop

* Add `ProposalCommitment` type

* Add `BlockInfo` type

* Update proposal building and assembly from parts to account for BlockInfo and ProposalCommitment

* Update protos

* Adapt to latest proto changes

* Post-merge fixes

* Disable integration tests with proposal mode, keep parts only

* Set parts-only in spwan.bash

* Add detail to panic when explicit proposal

* Fix fmt

---------

Co-authored-by: Anca Zamfir <[email protected]>
  • Loading branch information
romac and ancazamfir authored Feb 10, 2025
1 parent 614a46c commit fec4b6e
Show file tree
Hide file tree
Showing 57 changed files with 1,089 additions and 867 deletions.
33 changes: 29 additions & 4 deletions code/crates/engine/src/util/streaming.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,31 @@
pub type StreamId = u64;
use core::fmt;

use bytes::Bytes;

pub type Sequence = u64;

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct StreamId(Bytes);

impl StreamId {
pub fn new(bytes: Bytes) -> Self {
Self(bytes)
}

pub fn to_bytes(&self) -> Bytes {
self.0.clone()
}
}

impl fmt::Display for StreamId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for byte in &self.0 {
write!(f, "{byte:02x}")?;
}
Ok(())
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StreamMessage<T> {
/// Receivers identify streams by (sender, stream_id).
Expand Down Expand Up @@ -38,8 +63,8 @@ pub enum StreamContent<T> {
/// Serialized content.
Data(T),

/// Fin must be set to true.
Fin(bool),
/// Indicates the end of the stream.
Fin,
}

impl<T> StreamContent<T> {
Expand All @@ -58,6 +83,6 @@ impl<T> StreamContent<T> {
}

pub fn is_fin(&self) -> bool {
matches!(self, Self::Fin(true))
matches!(self, Self::Fin)
}
}
56 changes: 34 additions & 22 deletions code/crates/starknet/host/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use malachitebft_engine::util::streaming::{StreamContent, StreamMessage};
use malachitebft_metrics::Metrics;
use malachitebft_sync::RawDecidedValue;

use crate::host::proposal::compute_proposal_signature;
use crate::host::state::HostState;
use crate::host::{Host as _, StarknetHost};
use crate::mempool::{MempoolMsg, MempoolRef};
Expand Down Expand Up @@ -51,10 +50,12 @@ impl Host {
std::fs::create_dir_all(&db_dir).map_err(|e| SpawnErr::StartupFailed(e.into()))?;
let db_path = db_dir.join("blocks.db");

let ctx = MockContext::new();

let (actor_ref, _) = Actor::spawn(
None,
Self::new(mempool, network, metrics, span),
HostState::new(host, db_path, &mut StdRng::from_entropy()),
HostState::new(ctx, host, db_path, &mut StdRng::from_entropy()),
)
.await?;

Expand Down Expand Up @@ -330,7 +331,7 @@ async fn on_get_value(

let (mut rx_part, rx_hash) = state.host.build_new_proposal(height, round, deadline).await;

let stream_id = state.next_stream_id();
let stream_id = state.stream_id();

let mut sequence = 0;

Expand All @@ -340,15 +341,19 @@ async fn on_get_value(
if state.host.params.value_payload.include_parts() {
debug!(%stream_id, %sequence, "Broadcasting proposal part");

let msg = StreamMessage::new(stream_id, sequence, StreamContent::Data(part.clone()));
let msg = StreamMessage::new(
stream_id.clone(),
sequence,
StreamContent::Data(part.clone()),
);
network.cast(NetworkMsg::PublishProposalPart(msg))?;
}

sequence += 1;
}

if state.host.params.value_payload.include_parts() {
let msg = StreamMessage::new(stream_id, sequence, StreamContent::Fin(true));
let msg = StreamMessage::new(stream_id, sequence, StreamContent::Fin);
network.cast(NetworkMsg::PublishProposalPart(msg))?;
}

Expand Down Expand Up @@ -382,14 +387,14 @@ async fn on_get_value(
}

async fn on_extend_vote(
state: &mut HostState,
height: Height,
round: Round,
_state: &mut HostState,
_height: Height,
_round: Round,
_value_id: ValueId<MockContext>,
reply_to: RpcReplyPort<Option<Bytes>>,
) -> Result<(), ActorProcessingErr> {
let extension = state.host.generate_vote_extension(height, round);
reply_to.send(extension)?;
// let extension = state.host.generate_vote_extension(height, round);
reply_to.send(None)?;
Ok(())
}

Expand Down Expand Up @@ -430,27 +435,30 @@ async fn on_restream_value(
network: &NetworkRef<MockContext>,
height: Height,
round: Round,
value_id: Hash,
proposal_commitment_hash: Hash,
valid_round: Round,
address: Address,
proposer: Address,
) -> Result<(), ActorProcessingErr> {
debug!(%height, %round, "Restreaming existing proposal...");

let mut rx_part = state.host.send_known_proposal(value_id).await;
let mut rx_part = state
.host
.send_known_proposal(proposal_commitment_hash)
.await;

let stream_id = state.next_stream_id();
let stream_id = state.stream_id();

let init = ProposalInit {
height,
proposal_round: round,
round,
valid_round,
proposer: address,
proposer,
};

let signature = compute_proposal_signature(&init, &value_id, &state.host.private_key);

let init_part = ProposalPart::Init(init);
let fin_part = ProposalPart::Fin(ProposalFin { signature });
let fin_part = ProposalPart::Fin(ProposalFin {
proposal_commitment_hash,
});

debug!(%height, %round, "Created new Init part: {init_part:?}");

Expand All @@ -459,16 +467,19 @@ async fn on_restream_value(
while let Some(part) = rx_part.recv().await {
let new_part = match part.part_type() {
PartType::Init => init_part.clone(),
PartType::BlockInfo => part,
PartType::Transactions => part,
PartType::ProposalCommitment => part,
PartType::Fin => fin_part.clone(),
PartType::Transactions | PartType::BlockProof => part,
};

state.host.part_store.store(height, round, new_part.clone());

if state.host.params.value_payload.include_parts() {
debug!(%stream_id, %sequence, "Broadcasting proposal part");

let msg = StreamMessage::new(stream_id, sequence, StreamContent::Data(new_part));
let msg =
StreamMessage::new(stream_id.clone(), sequence, StreamContent::Data(new_part));

network.cast(NetworkMsg::PublishProposalPart(msg))?;

Expand Down Expand Up @@ -544,7 +555,8 @@ async fn on_received_proposal_part(
from: PeerId,
reply_to: RpcReplyPort<ProposedValue<MockContext>>,
) -> Result<(), ActorProcessingErr> {
// TODO - use state.host.receive_proposal() and move some of the logic below there
// TODO: Use state.host.receive_proposal() and move some of the logic below there

let sequence = part.sequence;

let Some(parts) = state.part_streams_map.insert(from, part) else {
Expand Down
4 changes: 2 additions & 2 deletions code/crates/starknet/host/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use malachitebft_proto::Protobuf;
use crate::codec::{self, ProtobufCodec};
use crate::proto::{self as proto, Error as ProtoError};
use crate::types::MockContext;
use crate::types::{Block, BlockHash, Height, Transaction, Transactions};
use crate::types::{Block, BlockHash, Height, Transaction, TransactionBatch};

mod keys;
use keys::{HeightKey, UndecidedValueKey};
Expand Down Expand Up @@ -276,7 +276,7 @@ impl BlockStore {
block: Block {
height: certificate.height,
block_hash: certificate.value_id,
transactions: Transactions::new(txes.to_vec()),
transactions: TransactionBatch::new(txes.to_vec()),
},
certificate: certificate.clone(),
};
Expand Down
Loading

0 comments on commit fec4b6e

Please sign in to comment.