From b3a725e4c37e8ff09c96f30d823c9861af104d26 Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Thu, 19 Oct 2023 17:05:07 -0700 Subject: [PATCH 1/2] Hide PartyIdx from the code running a protocol session --- synedrion/src/cggmp21/protocols/threshold.rs | 2 +- synedrion/src/lib.rs | 2 +- synedrion/src/sessions/broadcast.rs | 2 +- synedrion/src/sessions/constructors.rs | 44 ++-- synedrion/src/sessions/error.rs | 9 +- synedrion/src/sessions/states.rs | 225 ++++++++++++------- synedrion/src/sessions/type_erased.rs | 5 +- synedrion/tests/sessions.rs | 81 ++++--- 8 files changed, 212 insertions(+), 158 deletions(-) diff --git a/synedrion/src/cggmp21/protocols/threshold.rs b/synedrion/src/cggmp21/protocols/threshold.rs index 0b7dafa2..09fb3cfa 100644 --- a/synedrion/src/cggmp21/protocols/threshold.rs +++ b/synedrion/src/cggmp21/protocols/threshold.rs @@ -165,8 +165,8 @@ mod tests { use rand_core::OsRng; use super::ThresholdKeyShare; + use crate::cggmp21::{PartyIdx, TestParams}; use crate::curve::Scalar; - use crate::{PartyIdx, TestParams}; #[test] fn threshold_key_share_centralized() { diff --git a/synedrion/src/lib.rs b/synedrion/src/lib.rs index 4da75236..3829dce5 100644 --- a/synedrion/src/lib.rs +++ b/synedrion/src/lib.rs @@ -37,7 +37,7 @@ pub use k256::ecdsa; pub use signature; pub use cggmp21::{ - KeyShare, KeyShareChange, PartyIdx, ProductionParams, ProtocolResult, SchemeParams, TestParams, + KeyShare, KeyShareChange, ProductionParams, ProtocolResult, SchemeParams, TestParams, ThresholdKeyShare, }; pub use curve::RecoverableSignature; diff --git a/synedrion/src/sessions/broadcast.rs b/synedrion/src/sessions/broadcast.rs index 080e847e..b1bbae52 100644 --- a/synedrion/src/sessions/broadcast.rs +++ b/synedrion/src/sessions/broadcast.rs @@ -7,8 +7,8 @@ use serde::{Deserialize, Serialize}; use super::signed_message::{SignedMessage, VerifiedMessage}; use super::type_erased::{deserialize_message, serialize_message}; +use crate::cggmp21::PartyIdx; use crate::tools::collections::HoleVecAccum; -use crate::PartyIdx; #[derive(Clone)] pub(crate) struct BroadcastConsensus { diff --git a/synedrion/src/sessions/constructors.rs b/synedrion/src/sessions/constructors.rs index 564cfe20..dcff6e55 100644 --- a/synedrion/src/sessions/constructors.rs +++ b/synedrion/src/sessions/constructors.rs @@ -1,14 +1,18 @@ use alloc::format; +use core::fmt::Debug; use rand_core::CryptoRngCore; use serde::{Deserialize, Serialize}; -use signature::hazmat::{PrehashVerifier, RandomizedPrehashSigner}; +use signature::{ + hazmat::{PrehashVerifier, RandomizedPrehashSigner}, + Keypair, +}; use super::error::{Error, LocalError}; use super::states::Session; use crate::cggmp21::{ auxiliary, interactive_signing, keygen_and_aux, InteractiveSigningResult, KeyRefreshResult, - KeyShare, KeygenAndAuxResult, PartyIdx, SchemeParams, + KeyShare, KeygenAndAuxResult, SchemeParams, }; use crate::curve::Scalar; @@ -22,22 +26,17 @@ pub fn make_keygen_and_aux_session( shared_randomness: &[u8], signer: Signer, verifiers: &[Verifier], - party_idx: PartyIdx, -) -> Result, Sig, Signer, Verifier>, Error>> +) -> Result< + Session, Sig, Signer, Verifier>, + Error, Verifier>, +> where Sig: Clone + Serialize + for<'de> Deserialize<'de> + PartialEq + Eq, P: SchemeParams + 'static, - Signer: RandomizedPrehashSigner, - Verifier: PrehashVerifier + Clone, + Signer: RandomizedPrehashSigner + Keypair, + Verifier: PrehashVerifier + Debug + Clone + Ord, { - Session::new::>( - rng, - shared_randomness, - signer, - party_idx, - verifiers, - (), - ) + Session::new::>(rng, shared_randomness, signer, verifiers, ()) } /// Creates the initial state for the KeyRefresh+Auxiliary protocol. @@ -47,15 +46,14 @@ pub fn make_key_refresh_session( shared_randomness: &[u8], signer: Signer, verifiers: &[Verifier], - party_idx: PartyIdx, -) -> Result, Sig, Signer, Verifier>, Error>> +) -> Result, Sig, Signer, Verifier>, Error, Verifier>> where Sig: Clone + Serialize + for<'de> Deserialize<'de> + PartialEq + Eq, P: SchemeParams + 'static, - Signer: RandomizedPrehashSigner, - Verifier: PrehashVerifier + Clone, + Signer: RandomizedPrehashSigner + Keypair, + Verifier: PrehashVerifier + Debug + Clone + Ord, { - Session::new::>(rng, shared_randomness, signer, party_idx, verifiers, ()) + Session::new::>(rng, shared_randomness, signer, verifiers, ()) } /// Creates the initial state for the joined Presigning and Signing protocols. @@ -69,14 +67,15 @@ pub fn make_interactive_signing_session( prehashed_message: &PrehashedMessage, ) -> Result< Session, Sig, Signer, Verifier>, - Error>, + Error, Verifier>, > where Sig: Clone + Serialize + for<'de> Deserialize<'de> + PartialEq + Eq, P: SchemeParams + 'static, - Signer: RandomizedPrehashSigner, - Verifier: PrehashVerifier + Clone, + Signer: RandomizedPrehashSigner + Keypair, + Verifier: PrehashVerifier + Debug + Clone + Ord, { + // TODO: check that key share party index corresponds to the signer's position among the verifiers if verifiers.len() != key_share.num_parties() { return Err(Error::Local(LocalError::Init(format!( concat![ @@ -99,7 +98,6 @@ where rng, shared_randomness, signer, - key_share.party_index(), verifiers, context, ) diff --git a/synedrion/src/sessions/error.rs b/synedrion/src/sessions/error.rs index d797bd5c..02d813a5 100644 --- a/synedrion/src/sessions/error.rs +++ b/synedrion/src/sessions/error.rs @@ -2,18 +2,18 @@ use alloc::string::String; use super::broadcast::ConsensusError; use super::type_erased::{AccumAddError, AccumFinalizeError}; -use crate::cggmp21::{PartyIdx, ProtocolResult}; +use crate::cggmp21::ProtocolResult; /// Possible errors returned by session methods. #[derive(Clone, Debug)] -pub enum Error { +pub enum Error { /// Indicates an error on this party's side. /// Can be caused by an incorrect usage, a bug in the implementation, or some environment error. Local(LocalError), /// An unprovable fault of another party. Remote { /// The index of the failed party. - party: PartyIdx, + party: Verifier, /// The error that occurred. error: RemoteError, }, @@ -21,7 +21,7 @@ pub enum Error { // TODO: attach the party's messages up to this round for this to be verifiable by a third party Provable { /// The index of the failed party. - party: PartyIdx, + party: Verifier, /// The error that occurred. error: ProvableError, }, @@ -56,6 +56,7 @@ pub enum LocalError { CannotSign(String), AccumFinalize(AccumFinalizeError), AccumAdd(AccumAddError), + VerifierNotFound(String), } #[derive(Clone, Debug)] diff --git a/synedrion/src/sessions/states.rs b/synedrion/src/sessions/states.rs index c75ce1e9..8a989e66 100644 --- a/synedrion/src/sessions/states.rs +++ b/synedrion/src/sessions/states.rs @@ -1,11 +1,16 @@ use alloc::boxed::Box; +use alloc::collections::BTreeMap; use alloc::format; use alloc::vec::Vec; +use core::fmt::Debug; use core::marker::PhantomData; use rand_core::CryptoRngCore; use serde::{Deserialize, Serialize}; -use signature::hazmat::{PrehashVerifier, RandomizedPrehashSigner}; +use signature::{ + hazmat::{PrehashVerifier, RandomizedPrehashSigner}, + Keypair, +}; use super::broadcast::{BcConsensusAccum, BroadcastConsensus}; use super::error::{Error, LocalError, ProvableError, RemoteError}; @@ -13,15 +18,15 @@ use super::signed_message::{MessageType, SessionId, SignedMessage, VerifiedMessa use super::type_erased::{ self, DynBcPayload, DynDmArtefact, DynDmPayload, DynFinalizable, DynRoundAccum, ReceiveError, }; -use crate::cggmp21::{self, FirstRound, ProtocolResult, Round}; +use crate::cggmp21::{self, FirstRound, PartyIdx, ProtocolResult, Round}; use crate::tools::collections::HoleRange; -use crate::PartyIdx; struct Context { signer: Signer, verifiers: Vec, session_id: SessionId, party_idx: PartyIdx, + verifier_to_idx: BTreeMap, } enum SessionType { @@ -91,21 +96,21 @@ fn route_message_bc( MessageFor::OutOfOrder } -fn wrap_receive_result( - from: PartyIdx, +fn wrap_receive_result( + from: &Verifier, result: Result>, -) -> Result> { +) -> Result> { // TODO: we need to attach all the necessary messages here, // to make sure that every provable error can be independently verified // given the party's verifying key. result.map_err(|err| match err { ReceiveError::CannotDeserialize(msg) => Error::Provable { - party: from, + party: from.clone(), error: ProvableError::CannotDeserialize(msg), }, ReceiveError::Protocol(err) => match err { crate::cggmp21::ReceiveError::Provable(err) => Error::Provable { - party: from, + party: from.clone(), error: ProvableError::Protocol(err), }, crate::cggmp21::ReceiveError::InvalidType => { @@ -120,28 +125,39 @@ pub enum FinalizeOutcome { /// The protocol result is available. Success(Res::Success), /// Starting the next round. - AnotherRound( - Session, - Vec<(PartyIdx, SignedMessage)>, - ), + AnotherRound { + /// The new session object. + session: Session, + /// The messages for the new round received during the previous round. + cached_messages: Vec<(Verifier, SignedMessage)>, + }, } impl Session where Res: ProtocolResult, - Signer: RandomizedPrehashSigner, - Verifier: Clone + PrehashVerifier, + Signer: RandomizedPrehashSigner + Keypair, + Verifier: Debug + Clone + PrehashVerifier + Ord, Sig: Clone + Serialize + for<'de> Deserialize<'de> + PartialEq + Eq, { pub(crate) fn new + Round + 'static>( rng: &mut impl CryptoRngCore, shared_randomness: &[u8], - // TODO: merge signers and verifiers into one struct to make getting party_idx more natural? signer: Signer, - party_idx: PartyIdx, verifiers: &[Verifier], context: R::Context, - ) -> Result> { + ) -> Result> { + let verifier_to_idx = verifiers + .iter() + .enumerate() + .map(|(idx, verifier)| (verifier.clone(), PartyIdx::from_usize(idx))) + .collect::>(); + let party_idx = *verifier_to_idx + .get(&signer.verifying_key()) + .ok_or(Error::Local(LocalError::Init( + "The given signer's verifying key is not among the verifiers".into(), + )))?; + // CHECK: is this enough? Do we need to hash in e.g. the verifier public keys? // TODO: Need to specify the requirements for the shared randomness in the docstring. let session_id = SessionId::from_seed(shared_randomness); @@ -153,6 +169,7 @@ where verifiers: verifiers.into(), session_id, party_idx, + verifier_to_idx, }; Ok(Self { tp: SessionType::Normal(round), @@ -160,6 +177,11 @@ where }) } + /// This session's verifier object. + pub fn verifier(&self) -> Verifier { + self.context.signer.verifying_key() + } + /// Returns a pair of the current round index and whether it is a broadcast consensus stage. pub fn current_round(&self) -> (u8, bool) { match &self.tp { @@ -188,16 +210,25 @@ where /// Returns the party indices to which the broadcast of this round should be sent; /// if `None`, there is no broadcast in this round. - pub fn broadcast_destinations(&self) -> Option> { - let range = HoleRange::new( - self.context.verifiers.len(), - self.context.party_idx.as_usize(), - ); + pub fn broadcast_destinations(&self) -> Option> { match &self.tp { - SessionType::Normal(round) => round - .broadcast_destinations() - .map(|range| range.map(PartyIdx::from_usize).collect()), - SessionType::Bc { .. } => Some(range.map(PartyIdx::from_usize).collect()), + SessionType::Normal(round) => round.broadcast_destinations().map(|range| { + range + .map(|idx| self.context.verifiers[idx].clone()) + .collect() + }), + SessionType::Bc { .. } => { + // TODO: technically we should remember the range to which the initial broadcasts + // were sent to and use that. + let range = HoleRange::new( + self.context.verifiers.len(), + self.context.party_idx.as_usize(), + ); + let verifiers = range + .map(|idx| self.context.verifiers[idx].clone()) + .collect(); + Some(verifiers) + } } } @@ -236,11 +267,13 @@ where /// Returns the party indices to which the direct messages of this round should be sent; /// if `None`, there are no direct messages in this round. - pub fn direct_message_destinations(&self) -> Option> { + pub fn direct_message_destinations(&self) -> Option> { match &self.tp { - SessionType::Normal(round) => round - .direct_message_destinations() - .map(|range| range.map(PartyIdx::from_usize).collect()), + SessionType::Normal(round) => round.direct_message_destinations().map(|range| { + range + .map(|idx| self.context.verifiers[idx].clone()) + .collect() + }), _ => None, } } @@ -250,12 +283,17 @@ where pub fn make_direct_message( &self, rng: &mut impl CryptoRngCore, - destination: &PartyIdx, + destination: &Verifier, ) -> Result<(SignedMessage, Artefact), LocalError> { match &self.tp { SessionType::Normal(round) => { + let destination_idx = *self + .context + .verifier_to_idx + .get(destination) + .ok_or(LocalError::VerifierNotFound(format!("{:?}", destination)))?; let round_num = round.round_num(); - let (payload, artefact) = round.make_direct_message(rng, *destination)?; + let (payload, artefact) = round.make_direct_message(rng, destination_idx)?; let message = VerifiedMessage::new( rng, &self.context.signer, @@ -268,7 +306,7 @@ where Ok(( message, Artefact { - destination: *destination, + destination: destination_idx, artefact, }, )) @@ -282,13 +320,13 @@ where /// Process a received message from another party. pub fn verify_message( &self, - from: PartyIdx, + from: &Verifier, message: SignedMessage, - ) -> Result, Error> { + ) -> Result, Error> { // This is an unprovable fault (may be a replay attack) if message.session_id() != &self.context.session_id { return Err(Error::Remote { - party: from, + party: from.clone(), error: RemoteError::UnexpectedSessionId, }); } @@ -302,12 +340,12 @@ where MessageFor::ThisRound => self.verify_message_inner(from, message), // TODO: should we cache the verified or the unverified message? MessageFor::NextRound => Ok(ProcessedMessage(ProcessedMessageEnum::Cache { - from, + from: self.context.verifier_to_idx[from], message, })), // This is an unprovable fault (may be a replay attack) MessageFor::OutOfOrder => Err(Error::Remote { - party: from, + party: from.clone(), error: RemoteError::OutOfOrderMessage, }), } @@ -315,33 +353,36 @@ where fn verify_message_inner( &self, - from: PartyIdx, + from: &Verifier, message: SignedMessage, - ) -> Result, Error> { - let verified_message = message - .verify(&self.context.verifiers[from.as_usize()]) - .map_err(|err| Error::Remote { - party: from, - error: RemoteError::InvalidSignature(err), - })?; + ) -> Result, Error> { + let verified_message = message.verify(from).map_err(|err| Error::Remote { + party: from.clone(), + error: RemoteError::InvalidSignature(err), + })?; + + let from_idx = *self.context.verifier_to_idx.get(from).ok_or(Error::Local( + LocalError::VerifierNotFound(format!("{:?}", from)), + ))?; match &self.tp { SessionType::Normal(round) => { match verified_message.message_type() { MessageType::Direct => { - let result = round.verify_direct_message(from, verified_message.payload()); + let result = + round.verify_direct_message(from_idx, verified_message.payload()); let payload = wrap_receive_result(from, result)?; Ok(ProcessedMessage(ProcessedMessageEnum::DmPayload { - from, + from: from_idx, payload, message: verified_message, })) } MessageType::Broadcast => { - let result = round.verify_broadcast(from, verified_message.payload()); + let result = round.verify_broadcast(from_idx, verified_message.payload()); let payload = wrap_receive_result(from, result)?; Ok(ProcessedMessage(ProcessedMessageEnum::BcPayload { - from, + from: from_idx, payload, message: verified_message, })) @@ -357,12 +398,14 @@ where } } SessionType::Bc { bc, .. } => { - bc.verify_broadcast(from, verified_message) + bc.verify_broadcast(from_idx, verified_message) .map_err(|err| Error::Provable { - party: from, + party: from.clone(), error: ProvableError::Consensus(err), })?; - Ok(ProcessedMessage(ProcessedMessageEnum::Bc { from })) + Ok(ProcessedMessage(ProcessedMessageEnum::Bc { + from: from_idx, + })) } } } @@ -372,7 +415,7 @@ where self, rng: &mut impl CryptoRngCore, accum: RoundAccumulator, - ) -> Result, Error> { + ) -> Result, Error> { match self.tp { SessionType::Normal(round) => { Self::finalize_regular_round(self.context, round, rng, accum) @@ -388,17 +431,20 @@ where round: Box>, rng: &mut impl CryptoRngCore, accum: RoundAccumulator, - ) -> Result, Error> { + ) -> Result, Error> { let requires_bc = round.requires_broadcast_consensus(); let outcome = round .finalize(rng, accum.processed) .map_err(|err| match err { type_erased::FinalizeError::Protocol(err) => match err { - cggmp21::FinalizeError::Provable { party, error } => Error::Provable { - party, - error: ProvableError::Protocol(error), - }, + cggmp21::FinalizeError::Provable { party, error } => { + let party = context.verifiers[party.as_usize()].clone(); + Error::Provable { + party, + error: ProvableError::Protocol(error), + } + } cggmp21::FinalizeError::Init(err) => { Error::Local(LocalError::Init(format!("{:?}", err))) } @@ -409,29 +455,35 @@ where } })?; + let cached_messages = accum + .cached_messages + .into_iter() + .map(|(idx, message)| (context.verifiers[idx.as_usize()].clone(), message)) + .collect(); + match outcome { type_erased::FinalizeOutcome::Success(res) => Ok(FinalizeOutcome::Success(res)), type_erased::FinalizeOutcome::AnotherRound(next_round) => { if requires_bc { let broadcasts = accum.received_broadcasts; let bc = BroadcastConsensus::new(broadcasts); - let new_session = Session { + let session = Session { tp: SessionType::Bc { next_round, bc }, context, }; - Ok(FinalizeOutcome::AnotherRound( - new_session, - accum.cached_messages, - )) + Ok(FinalizeOutcome::AnotherRound { + session, + cached_messages, + }) } else { - let new_session = Session { + let session = Session { tp: SessionType::Normal(next_round), context, }; - Ok(FinalizeOutcome::AnotherRound( - new_session, - accum.cached_messages, - )) + Ok(FinalizeOutcome::AnotherRound { + session, + cached_messages, + }) } } } @@ -441,21 +493,29 @@ where context: Context, round: Box>, accum: RoundAccumulator, - ) -> Result, Error> { + ) -> Result, Error> { accum .bc_accum .finalize() .ok_or(Error::Local(LocalError::InvalidState( "Cannot finalize".into(), )))?; - let new_session = Session { + + let cached_messages = accum + .cached_messages + .into_iter() + .map(|(idx, message)| (context.verifiers[idx.as_usize()].clone(), message)) + .collect(); + + let session = Session { tp: SessionType::Normal(round), context, }; - Ok(FinalizeOutcome::AnotherRound( - new_session, - accum.cached_messages, - )) + + Ok(FinalizeOutcome::AnotherRound { + session, + cached_messages, + }) } } @@ -482,14 +542,14 @@ impl RoundAccumulator { } /// Save an artefact produced by [`Session::make_direct_message`]. - pub fn add_artefact(&mut self, artefact: Artefact) -> Result<(), Error> { + pub fn add_artefact(&mut self, artefact: Artefact) -> Result<(), LocalError> { self.processed .add_dm_artefact(artefact.destination, artefact.artefact) - .map_err(|err| Error::Local(LocalError::AccumAdd(err))) + .map_err(LocalError::AccumAdd) } /// Save a processed message produced by [`Session::verify_message`]. - pub fn add_processed_message(&mut self, pm: ProcessedMessage) -> Result<(), Error> { + pub fn add_processed_message(&mut self, pm: ProcessedMessage) -> Result<(), LocalError> { // TODO: add a check that the index is in range, and wasn't filled yet match pm.0 { ProcessedMessageEnum::BcPayload { @@ -499,7 +559,7 @@ impl RoundAccumulator { } => { self.processed .add_bc_payload(from, payload) - .map_err(|err| Error::Local(LocalError::AccumAdd(err)))?; + .map_err(LocalError::AccumAdd)?; self.received_broadcasts.push((from, message)); } ProcessedMessageEnum::DmPayload { @@ -509,7 +569,7 @@ impl RoundAccumulator { } => { self.processed .add_dm_payload(from, payload) - .map_err(|err| Error::Local(LocalError::AccumAdd(err)))?; + .map_err(LocalError::AccumAdd)?; self.received_direct_messages.push((from, message)); } ProcessedMessageEnum::Cache { from, message } => { @@ -518,10 +578,7 @@ impl RoundAccumulator { self.cached_messages.push((from, message)); } ProcessedMessageEnum::Bc { from } => { - self.bc_accum.add_echo_received(from).ok_or(Error::Remote { - party: from, - error: RemoteError::DuplicateMessage, - })? + self.bc_accum.add_echo_received(from).unwrap(); } } Ok(()) diff --git a/synedrion/src/sessions/type_erased.rs b/synedrion/src/sessions/type_erased.rs index 10f889b1..a78fa1fd 100644 --- a/synedrion/src/sessions/type_erased.rs +++ b/synedrion/src/sessions/type_erased.rs @@ -13,11 +13,10 @@ use serde::{Deserialize, Serialize}; use super::error::LocalError; use crate::cggmp21::{ - self, BroadcastRound, DirectRound, FinalizableToNextRound, FinalizableToResult, ProtocolResult, - Round, ToNextRound, ToResult, + self, BroadcastRound, DirectRound, FinalizableToNextRound, FinalizableToResult, PartyIdx, + ProtocolResult, Round, ToNextRound, ToResult, }; use crate::tools::collections::{HoleRange, HoleVec, HoleVecAccum}; -use crate::PartyIdx; pub(crate) fn serialize_message( message: &impl Serialize, diff --git a/synedrion/tests/sessions.rs b/synedrion/tests/sessions.rs index 3bbe6eb4..3237327f 100644 --- a/synedrion/tests/sessions.rs +++ b/synedrion/tests/sessions.rs @@ -11,29 +11,26 @@ use synedrion::{ make_interactive_signing_session, make_keygen_and_aux_session, FinalizeOutcome, Session, SignedMessage, }, - KeyShare, PartyIdx, ProtocolResult, TestParams, + KeyShare, ProtocolResult, TestParams, }; -type MessageOut = (PartyIdx, PartyIdx, SignedMessage); -type MessageIn = (PartyIdx, SignedMessage); +type MessageOut = (VerifyingKey, VerifyingKey, SignedMessage); +type MessageIn = (VerifyingKey, SignedMessage); async fn run_session( tx: mpsc::Sender, rx: mpsc::Receiver, session: Session, - party_idx: PartyIdx, ) -> ::Success { let mut rx = rx; let mut session = session; - let mut cached_messages = Vec::new(); + let mut cached_messages = Vec::<(VerifyingKey, SignedMessage)>::new(); + + let key = session.verifier(); loop { - println!( - "*** {:?}: starting round {:?}", - party_idx, - session.current_round() - ); + println!("*** {key:?}: starting round {:?}", session.current_round()); // This is kept in the main task since it's mutable, // and we don't want to bother with synchronization. @@ -47,59 +44,62 @@ async fn run_session( if let Some(destinations) = destinations { // In production usage, this will happen in a spawned task let message = session.make_broadcast(&mut OsRng).unwrap(); - for idx_to in destinations.iter() { - println!("{party_idx:?}: sending a broadcast to {idx_to:?}"); - tx.send((party_idx, *idx_to, message.clone())) - .await - .unwrap(); + for destination in destinations.iter() { + println!("{key:?}: sending a broadcast to {destination:?}"); + tx.send((key, *destination, message.clone())).await.unwrap(); } } let destinations = session.direct_message_destinations(); if let Some(destinations) = destinations { - for idx_to in destinations.iter() { + for destination in destinations.iter() { // In production usage, this will happen in a spawned task // (since it can take some time to create a message), // and the artefact will be sent back to the host task // to be added to the accumulator. - let (message, artefact) = session.make_direct_message(&mut OsRng, idx_to).unwrap(); - println!("{party_idx:?}: sending a direct message to {idx_to:?}"); - tx.send((party_idx, *idx_to, message)).await.unwrap(); + let (message, artefact) = session + .make_direct_message(&mut OsRng, destination) + .unwrap(); + println!("{key:?}: sending a direct message to {destination:?}"); + tx.send((key, *destination, message)).await.unwrap(); // This will happen in a host task accum.add_artefact(artefact).unwrap(); } } - for (idx_from, message) in cached_messages { + for (from, message) in cached_messages { // In production usage, this will happen in a spawned task. - println!("{party_idx:?}: applying a cached message from {idx_from:?}"); - let result = session.verify_message(idx_from, message).unwrap(); + println!("{key:?}: applying a cached message from {from:?}"); + let result = session.verify_message(&from, message).unwrap(); // This will happen in a host task. accum.add_processed_message(result).unwrap(); } while !session.can_finalize(&accum) { - println!("{party_idx:?}: waiting for a message"); - let (idx_from, message) = rx.recv().await.unwrap(); + println!("{key:?}: waiting for a message"); + let (from, message) = rx.recv().await.unwrap(); // TODO: check here that the message from this origin hasn't been already processed // if accum.already_processed(message) { ... } // In production usage, this will happen in a spawned task. - println!("{party_idx:?}: applying a message from {idx_from:?}"); - let result = session.verify_message(idx_from, message).unwrap(); + println!("{key:?}: applying a message from {from:?}"); + let result = session.verify_message(&from, message).unwrap(); // This will happen in a host task. accum.add_processed_message(result).unwrap(); } - println!("{party_idx:?}: finalizing the round"); + println!("{key:?}: finalizing the round"); match session.finalize_round(&mut OsRng, accum).unwrap() { FinalizeOutcome::Success(res) => break res, - FinalizeOutcome::AnotherRound(new_session, new_cached_messages) => { + FinalizeOutcome::AnotherRound { + session: new_session, + cached_messages: new_cached_messages, + } => { session = new_session; cached_messages = new_cached_messages; } @@ -108,7 +108,7 @@ async fn run_session( } async fn message_dispatcher( - txs: BTreeMap>, + txs: BTreeMap>, rx: mpsc::Receiver, ) { let mut rx = rx; @@ -159,25 +159,26 @@ where ::Success: Send + 'static, { let num_parties = sessions.len(); - let parties = (0..num_parties) - .map(PartyIdx::from_usize) - .collect::>(); let (dispatcher_tx, dispatcher_rx) = mpsc::channel::(100); let channels = (0..num_parties).map(|_| mpsc::channel::(100)); let (txs, rxs): (Vec>, Vec>) = channels.unzip(); - let tx_map = parties.iter().cloned().zip(txs.into_iter()).collect(); - let rx_map = parties.iter().cloned().zip(rxs.into_iter()); + let tx_map = sessions + .iter() + .map(|session| session.verifier()) + .zip(txs.into_iter()) + .collect(); let dispatcher_task = message_dispatcher(tx_map, dispatcher_rx); let dispatcher = tokio::spawn(dispatcher_task); - let handles: Vec::Success>> = rx_map + let handles: Vec::Success>> = rxs + .into_iter() .zip(sessions.into_iter()) - .map(|((party_idx, rx), session)| { - let node_task = run_session(dispatcher_tx.clone(), rx, session, party_idx); + .map(|(rx, session)| { + let node_task = run_session(dispatcher_tx.clone(), rx, session); tokio::spawn(node_task) }) .collect(); @@ -204,14 +205,12 @@ async fn keygen_and_aux() { let sessions = signers .into_iter() - .enumerate() - .map(|(idx, signer)| { + .map(|signer| { make_keygen_and_aux_session::( &mut OsRng, shared_randomness, signer, &verifiers, - PartyIdx::from_usize(idx), ) .unwrap() }) @@ -220,7 +219,7 @@ async fn keygen_and_aux() { let key_shares = run_nodes(sessions).await; for (idx, key_share) in key_shares.iter().enumerate() { - assert_eq!(key_share.party_index(), PartyIdx::from_usize(idx)); + assert_eq!(key_share.party_index().as_usize(), idx); assert_eq!(key_share.num_parties(), num_parties); assert_eq!(key_share.verifying_key(), key_shares[0].verifying_key()); } From fb875008b920c4459374deca3f585dd976daba9f Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Sun, 22 Oct 2023 15:03:59 -0700 Subject: [PATCH 2/2] Restrict high-level errors --- synedrion/src/sessions/constructors.rs | 21 +- synedrion/src/sessions/error.rs | 43 +--- synedrion/src/sessions/signed_message.rs | 2 +- synedrion/src/sessions/states.rs | 289 ++++++++++++++--------- synedrion/src/sessions/type_erased.rs | 33 ++- synedrion/tests/sessions.rs | 6 +- 6 files changed, 222 insertions(+), 172 deletions(-) diff --git a/synedrion/src/sessions/constructors.rs b/synedrion/src/sessions/constructors.rs index dcff6e55..1ef7df15 100644 --- a/synedrion/src/sessions/constructors.rs +++ b/synedrion/src/sessions/constructors.rs @@ -8,7 +8,7 @@ use signature::{ Keypair, }; -use super::error::{Error, LocalError}; +use super::error::LocalError; use super::states::Session; use crate::cggmp21::{ auxiliary, interactive_signing, keygen_and_aux, InteractiveSigningResult, KeyRefreshResult, @@ -20,16 +20,12 @@ use crate::curve::Scalar; pub type PrehashedMessage = [u8; 32]; /// Creates the initial state for the joined KeyGen and KeyRefresh+Auxiliary protocols. -#[allow(clippy::type_complexity)] pub fn make_keygen_and_aux_session( rng: &mut impl CryptoRngCore, shared_randomness: &[u8], signer: Signer, verifiers: &[Verifier], -) -> Result< - Session, Sig, Signer, Verifier>, - Error, Verifier>, -> +) -> Result, Sig, Signer, Verifier>, LocalError> where Sig: Clone + Serialize + for<'de> Deserialize<'de> + PartialEq + Eq, P: SchemeParams + 'static, @@ -40,13 +36,12 @@ where } /// Creates the initial state for the KeyRefresh+Auxiliary protocol. -#[allow(clippy::type_complexity)] pub fn make_key_refresh_session( rng: &mut impl CryptoRngCore, shared_randomness: &[u8], signer: Signer, verifiers: &[Verifier], -) -> Result, Sig, Signer, Verifier>, Error, Verifier>> +) -> Result, Sig, Signer, Verifier>, LocalError> where Sig: Clone + Serialize + for<'de> Deserialize<'de> + PartialEq + Eq, P: SchemeParams + 'static, @@ -57,7 +52,6 @@ where } /// Creates the initial state for the joined Presigning and Signing protocols. -#[allow(clippy::type_complexity)] pub fn make_interactive_signing_session( rng: &mut impl CryptoRngCore, shared_randomness: &[u8], @@ -65,10 +59,7 @@ pub fn make_interactive_signing_session( verifiers: &[Verifier], key_share: &KeyShare

, prehashed_message: &PrehashedMessage, -) -> Result< - Session, Sig, Signer, Verifier>, - Error, Verifier>, -> +) -> Result, Sig, Signer, Verifier>, LocalError> where Sig: Clone + Serialize + for<'de> Deserialize<'de> + PartialEq + Eq, P: SchemeParams + 'static, @@ -77,14 +68,14 @@ where { // TODO: check that key share party index corresponds to the signer's position among the verifiers if verifiers.len() != key_share.num_parties() { - return Err(Error::Local(LocalError::Init(format!( + return Err(LocalError(format!( concat![ "Number of verifiers (got: {}) must be equal ", "to the number of parties in the key share (got: {})" ], verifiers.len(), key_share.num_parties() - )))); + ))); } let scalar_message = Scalar::from_reduced_bytes(prehashed_message); diff --git a/synedrion/src/sessions/error.rs b/synedrion/src/sessions/error.rs index 02d813a5..c97e09ee 100644 --- a/synedrion/src/sessions/error.rs +++ b/synedrion/src/sessions/error.rs @@ -1,7 +1,6 @@ use alloc::string::String; use super::broadcast::ConsensusError; -use super::type_erased::{AccumAddError, AccumFinalizeError}; use crate::cggmp21::ProtocolResult; /// Possible errors returned by session methods. @@ -10,13 +9,6 @@ pub enum Error { /// Indicates an error on this party's side. /// Can be caused by an incorrect usage, a bug in the implementation, or some environment error. Local(LocalError), - /// An unprovable fault of another party. - Remote { - /// The index of the failed party. - party: Verifier, - /// The error that occurred. - error: RemoteError, - }, /// A provable fault of another party. // TODO: attach the party's messages up to this round for this to be verifiable by a third party Provable { @@ -33,34 +25,25 @@ pub enum Error { /// The proof of correctness. proof: Res::CorrectnessProof, }, + /// An error caused by remote party, unprovable at this level. + /// + /// This error may be eventually provable if there are some external guarantees + /// provided by the communication channel. + Remote(RemoteError), } #[derive(Clone, Debug)] -pub enum LocalError { - /// An error while initializing the first round of a protocol. - /// - /// Note that it can be returned in the middle of the session in case of - /// sequentially merged protocols (e.g. Presigning and Signing). - Init(String), - /// A mutable object was in an invalid state for calling a method. - /// - /// This indicates a logic error either in the calling code or in the method code. - InvalidState(String), - /// A message could not be serialized. - /// - /// Refer to the documentation of the chosen serialization library for more info. - CannotSerialize(String), - /// A message could not be signed. - /// - /// Refer to the documentation of the chosen ECDSA library for more info. - CannotSign(String), - AccumFinalize(AccumFinalizeError), - AccumAdd(AccumAddError), - VerifierNotFound(String), +pub struct LocalError(pub(crate) String); + +/// An unprovable fault of another party. +#[derive(Clone, Debug)] +pub struct RemoteError { + pub party: Verifier, + pub error: RemoteErrorEnum, } #[derive(Clone, Debug)] -pub enum RemoteError { +pub enum RemoteErrorEnum { UnexpectedSessionId, OutOfOrderMessage, DuplicateMessage, diff --git a/synedrion/src/sessions/signed_message.rs b/synedrion/src/sessions/signed_message.rs index 2b115783..7cb33a97 100644 --- a/synedrion/src/sessions/signed_message.rs +++ b/synedrion/src/sessions/signed_message.rs @@ -131,7 +131,7 @@ impl VerifiedMessage { rng, message_hash(session_id, round, message_type, message_bytes).as_ref(), ) - .map_err(|err| LocalError::CannotSign(err.to_string()))?; + .map_err(|err| LocalError(err.to_string()))?; Ok(Self(SignedMessage { session_id: *session_id, round, diff --git a/synedrion/src/sessions/states.rs b/synedrion/src/sessions/states.rs index 8a989e66..8b9b7ca0 100644 --- a/synedrion/src/sessions/states.rs +++ b/synedrion/src/sessions/states.rs @@ -3,7 +3,6 @@ use alloc::collections::BTreeMap; use alloc::format; use alloc::vec::Vec; use core::fmt::Debug; -use core::marker::PhantomData; use rand_core::CryptoRngCore; use serde::{Deserialize, Serialize}; @@ -13,10 +12,11 @@ use signature::{ }; use super::broadcast::{BcConsensusAccum, BroadcastConsensus}; -use super::error::{Error, LocalError, ProvableError, RemoteError}; +use super::error::{Error, LocalError, ProvableError, RemoteError, RemoteErrorEnum}; use super::signed_message::{MessageType, SessionId, SignedMessage, VerifiedMessage}; use super::type_erased::{ - self, DynBcPayload, DynDmArtefact, DynDmPayload, DynFinalizable, DynRoundAccum, ReceiveError, + self, AccumAddError, DynBcPayload, DynDmArtefact, DynDmPayload, DynFinalizable, DynRoundAccum, + ReceiveError, }; use crate::cggmp21::{self, FirstRound, PartyIdx, ProtocolResult, Round}; use crate::tools::collections::HoleRange; @@ -114,7 +114,7 @@ fn wrap_receive_result( error: ProvableError::Protocol(err), }, crate::cggmp21::ReceiveError::InvalidType => { - Error::Local(LocalError::InvalidState("Invalid state".into())) + Error::Local(LocalError("Invalid state".into())) } }, }) @@ -146,7 +146,7 @@ where signer: Signer, verifiers: &[Verifier], context: R::Context, - ) -> Result> { + ) -> Result { let verifier_to_idx = verifiers .iter() .enumerate() @@ -154,15 +154,15 @@ where .collect::>(); let party_idx = *verifier_to_idx .get(&signer.verifying_key()) - .ok_or(Error::Local(LocalError::Init( + .ok_or(LocalError( "The given signer's verifying key is not among the verifiers".into(), - )))?; + ))?; // CHECK: is this enough? Do we need to hash in e.g. the verifier public keys? // TODO: Need to specify the requirements for the shared randomness in the docstring. let session_id = SessionId::from_seed(shared_randomness); let typed_round = R::new(rng, shared_randomness, verifiers.len(), party_idx, context) - .map_err(|err| Error::Local(LocalError::Init(format!("{:?}", err))))?; + .map_err(|err| LocalError(format!("Failed to initialize the protocol: {err:?}")))?; let round: Box> = Box::new(typed_round); let context = Context { signer, @@ -191,20 +191,35 @@ where } /// Create an accumulator to store message creation and processing results of this round. - pub fn make_accumulator(&self) -> RoundAccumulator { + pub fn make_accumulator(&self) -> RoundAccumulator { RoundAccumulator::new( self.context.verifiers.len(), self.context.party_idx, self.broadcast_destinations().is_some(), self.direct_message_destinations().is_some(), + self.is_broadcast_consensus_round(), ) } /// Returns `true` if the round can be finalized. - pub fn can_finalize(&self, accum: &RoundAccumulator) -> bool { + pub fn can_finalize(&self, accum: &RoundAccumulator) -> Result { match &self.tp { - SessionType::Normal(_) => accum.processed.can_finalize(), - SessionType::Bc { .. } => accum.bc_accum.can_finalize(), + SessionType::Normal(_) => Ok(accum.processed.can_finalize()), + SessionType::Bc { .. } => Ok(accum + .bc_accum + .as_ref() + .ok_or(LocalError( + "This is a BC consensus round, but the accumulator is in an invalid state" + .into(), + ))? + .can_finalize()), + } + } + + fn is_broadcast_consensus_round(&self) -> bool { + match &self.tp { + SessionType::Normal(_) => false, + SessionType::Bc { .. } => true, } } @@ -284,14 +299,14 @@ where &self, rng: &mut impl CryptoRngCore, destination: &Verifier, - ) -> Result<(SignedMessage, Artefact), LocalError> { + ) -> Result<(SignedMessage, Artefact), LocalError> { match &self.tp { SessionType::Normal(round) => { let destination_idx = *self .context .verifier_to_idx .get(destination) - .ok_or(LocalError::VerifierNotFound(format!("{:?}", destination)))?; + .ok_or(LocalError(format!("Verifier not found: {destination:?}")))?; let round_num = round.round_num(); let (payload, artefact) = round.make_direct_message(rng, destination_idx)?; let message = VerifiedMessage::new( @@ -306,12 +321,13 @@ where Ok(( message, Artefact { - destination: destination_idx, + destination: destination.clone(), + destination_idx, artefact, }, )) } - _ => Err(LocalError::InvalidState( + _ => Err(LocalError( "This is a consensus broadcast round which does not send direct messages".into(), )), } @@ -322,13 +338,13 @@ where &self, from: &Verifier, message: SignedMessage, - ) -> Result, Error> { + ) -> Result, Error> { // This is an unprovable fault (may be a replay attack) if message.session_id() != &self.context.session_id { - return Err(Error::Remote { + return Err(Error::Remote(RemoteError { party: from.clone(), - error: RemoteError::UnexpectedSessionId, - }); + error: RemoteErrorEnum::UnexpectedSessionId, + })); } let message_for = match &self.tp { @@ -336,18 +352,21 @@ where SessionType::Bc { next_round, .. } => route_message_bc(next_round.as_ref(), &message), }; + let from_idx = self.context.verifier_to_idx[from]; + match message_for { MessageFor::ThisRound => self.verify_message_inner(from, message), // TODO: should we cache the verified or the unverified message? - MessageFor::NextRound => Ok(ProcessedMessage(ProcessedMessageEnum::Cache { - from: self.context.verifier_to_idx[from], - message, - })), + MessageFor::NextRound => Ok(ProcessedMessage { + from: from.clone(), + from_idx, + message: ProcessedMessageEnum::Cache { message }, + }), // This is an unprovable fault (may be a replay attack) - MessageFor::OutOfOrder => Err(Error::Remote { + MessageFor::OutOfOrder => Err(Error::Remote(RemoteError { party: from.clone(), - error: RemoteError::OutOfOrderMessage, - }), + error: RemoteErrorEnum::OutOfOrderMessage, + })), } } @@ -355,15 +374,21 @@ where &self, from: &Verifier, message: SignedMessage, - ) -> Result, Error> { - let verified_message = message.verify(from).map_err(|err| Error::Remote { - party: from.clone(), - error: RemoteError::InvalidSignature(err), + ) -> Result, Error> { + let verified_message = message.verify(from).map_err(|err| { + Error::Remote(RemoteError { + party: from.clone(), + error: RemoteErrorEnum::InvalidSignature(err), + }) })?; - let from_idx = *self.context.verifier_to_idx.get(from).ok_or(Error::Local( - LocalError::VerifierNotFound(format!("{:?}", from)), - ))?; + let from_idx = *self + .context + .verifier_to_idx + .get(from) + .ok_or(Error::Local(LocalError(format!( + "Verifier not found: {from:?}" + ))))?; match &self.tp { SessionType::Normal(round) => { @@ -372,26 +397,32 @@ where let result = round.verify_direct_message(from_idx, verified_message.payload()); let payload = wrap_receive_result(from, result)?; - Ok(ProcessedMessage(ProcessedMessageEnum::DmPayload { - from: from_idx, - payload, - message: verified_message, - })) + Ok(ProcessedMessage { + from: from.clone(), + from_idx, + message: ProcessedMessageEnum::DmPayload { + payload, + message: verified_message, + }, + }) } MessageType::Broadcast => { let result = round.verify_broadcast(from_idx, verified_message.payload()); let payload = wrap_receive_result(from, result)?; - Ok(ProcessedMessage(ProcessedMessageEnum::BcPayload { - from: from_idx, - payload, - message: verified_message, - })) + Ok(ProcessedMessage { + from: from.clone(), + from_idx, + message: ProcessedMessageEnum::BcPayload { + payload, + message: verified_message, + }, + }) } _ => { // TODO: this branch will never really be reached because we already routed // the message in the calling method. // Can we modify the code so that this branch is eliminated? - Err(Error::Local(LocalError::InvalidState( + Err(Error::Local(LocalError( "Unexpected broadcast consensus message".into(), ))) } @@ -403,9 +434,11 @@ where party: from.clone(), error: ProvableError::Consensus(err), })?; - Ok(ProcessedMessage(ProcessedMessageEnum::Bc { - from: from_idx, - })) + Ok(ProcessedMessage { + from: from.clone(), + from_idx, + message: ProcessedMessageEnum::Bc, + }) } } } @@ -414,7 +447,7 @@ where pub fn finalize_round( self, rng: &mut impl CryptoRngCore, - accum: RoundAccumulator, + accum: RoundAccumulator, ) -> Result, Error> { match self.tp { SessionType::Normal(round) => { @@ -430,7 +463,7 @@ where context: Context, round: Box>, rng: &mut impl CryptoRngCore, - accum: RoundAccumulator, + accum: RoundAccumulator, ) -> Result, Error> { let requires_bc = round.requires_broadcast_consensus(); @@ -445,13 +478,13 @@ where error: ProvableError::Protocol(error), } } - cggmp21::FinalizeError::Init(err) => { - Error::Local(LocalError::Init(format!("{:?}", err))) - } + cggmp21::FinalizeError::Init(err) => Error::Local(LocalError(format!( + "Failed to initialize the protocol: {err:?}" + ))), cggmp21::FinalizeError::Proof(proof) => Error::Proof { proof }, }, type_erased::FinalizeError::Accumulator(err) => { - Error::Local(LocalError::AccumFinalize(err)) + Error::Local(LocalError(format!("Failed to finalize: {err:?}"))) } })?; @@ -492,14 +525,15 @@ where fn finalize_bc_round( context: Context, round: Box>, - accum: RoundAccumulator, + accum: RoundAccumulator, ) -> Result, Error> { - accum - .bc_accum + let bc_accum = accum.bc_accum.ok_or(Error::Local(LocalError( + "The accumulator is in the invalid state for the broadcast consensus round".into(), + )))?; + + bc_accum .finalize() - .ok_or(Error::Local(LocalError::InvalidState( - "Cannot finalize".into(), - )))?; + .ok_or(Error::Local(LocalError("Cannot finalize".into())))?; let cached_messages = accum .cached_messages @@ -519,95 +553,138 @@ where } } -pub struct RoundAccumulator { +pub struct RoundAccumulator { received_direct_messages: Vec<(PartyIdx, VerifiedMessage)>, received_broadcasts: Vec<(PartyIdx, VerifiedMessage)>, processed: DynRoundAccum, cached_messages: Vec<(PartyIdx, SignedMessage)>, - bc_accum: BcConsensusAccum, - phantom_res: PhantomData, + bc_accum: Option, } -impl RoundAccumulator { - fn new(num_parties: usize, party_idx: PartyIdx, is_bc_round: bool, is_dm_round: bool) -> Self { +impl RoundAccumulator { + fn new( + num_parties: usize, + party_idx: PartyIdx, + is_bc_round: bool, + is_dm_round: bool, + is_bc_consensus_round: bool, + ) -> Self { // TODO: can return an error if party_idx is out of bounds Self { received_direct_messages: Vec::new(), received_broadcasts: Vec::new(), processed: DynRoundAccum::new(num_parties, party_idx, is_bc_round, is_dm_round), cached_messages: Vec::new(), - bc_accum: BcConsensusAccum::new(num_parties, party_idx), - phantom_res: PhantomData, + bc_accum: if is_bc_consensus_round { + Some(BcConsensusAccum::new(num_parties, party_idx)) + } else { + None + }, } } /// Save an artefact produced by [`Session::make_direct_message`]. - pub fn add_artefact(&mut self, artefact: Artefact) -> Result<(), LocalError> { + pub fn add_artefact( + &mut self, + artefact: Artefact, + ) -> Result<(), LocalError> { self.processed - .add_dm_artefact(artefact.destination, artefact.artefact) - .map_err(LocalError::AccumAdd) + .add_dm_artefact(artefact.destination_idx, artefact.artefact) + .map_err(|err| match err { + AccumAddError::SlotTaken => LocalError(format!( + "Artefact for the destination {:?} was already added", + artefact.destination + )), + AccumAddError::NoAccumulator => { + LocalError("This round does not send out direct messages".into()) + } + }) } /// Save a processed message produced by [`Session::verify_message`]. - pub fn add_processed_message(&mut self, pm: ProcessedMessage) -> Result<(), LocalError> { - // TODO: add a check that the index is in range, and wasn't filled yet - match pm.0 { - ProcessedMessageEnum::BcPayload { - from, - payload, - message, - } => { - self.processed - .add_bc_payload(from, payload) - .map_err(LocalError::AccumAdd)?; - self.received_broadcasts.push((from, message)); + pub fn add_processed_message( + &mut self, + pm: ProcessedMessage, + ) -> Result>, LocalError> { + match pm.message { + ProcessedMessageEnum::BcPayload { payload, message } => { + match self.processed.add_bc_payload(pm.from_idx, payload) { + Err(AccumAddError::SlotTaken) => { + return Ok(Err(RemoteError { + party: pm.from, + error: RemoteErrorEnum::DuplicateMessage, + })) + } + Err(AccumAddError::NoAccumulator) => { + return Err(LocalError( + "This round does not send out broadcast messages".into(), + )) + } + Ok(()) => {} + }; + self.received_broadcasts.push((pm.from_idx, message)); } - ProcessedMessageEnum::DmPayload { - from, - payload, - message, - } => { - self.processed - .add_dm_payload(from, payload) - .map_err(LocalError::AccumAdd)?; - self.received_direct_messages.push((from, message)); + ProcessedMessageEnum::DmPayload { payload, message } => { + match self.processed.add_dm_payload(pm.from_idx, payload) { + Err(AccumAddError::SlotTaken) => { + return Ok(Err(RemoteError { + party: pm.from, + error: RemoteErrorEnum::DuplicateMessage, + })) + } + Err(AccumAddError::NoAccumulator) => { + return Err(LocalError( + "This round does not send out direct messages".into(), + )) + } + Ok(()) => {} + }; + self.received_direct_messages.push((pm.from_idx, message)); } - ProcessedMessageEnum::Cache { from, message } => { + ProcessedMessageEnum::Cache { message } => { // TODO: check at this stage that there are no duplicate messages, // without waiting for the next round - self.cached_messages.push((from, message)); - } - ProcessedMessageEnum::Bc { from } => { - self.bc_accum.add_echo_received(from).unwrap(); + self.cached_messages.push((pm.from_idx, message)); } + ProcessedMessageEnum::Bc => match &mut self.bc_accum { + Some(accum) => { + if accum.add_echo_received(pm.from_idx).is_none() { + return Ok(Err(RemoteError { + party: pm.from, + error: RemoteErrorEnum::DuplicateMessage, + })); + } + } + None => return Err(LocalError("This is not a broadcast consensus round".into())), + }, } - Ok(()) + Ok(Ok(())) } } -pub struct Artefact { - destination: PartyIdx, +pub struct Artefact { + destination: Verifier, + destination_idx: PartyIdx, artefact: DynDmArtefact, } -pub struct ProcessedMessage(ProcessedMessageEnum); +pub struct ProcessedMessage { + from: Verifier, + from_idx: PartyIdx, + message: ProcessedMessageEnum, +} enum ProcessedMessageEnum { BcPayload { - from: PartyIdx, payload: DynBcPayload, message: VerifiedMessage, }, DmPayload { - from: PartyIdx, payload: DynDmPayload, message: VerifiedMessage, }, Cache { - from: PartyIdx, message: SignedMessage, }, - Bc { - from: PartyIdx, - }, + Bc, } diff --git a/synedrion/src/sessions/type_erased.rs b/synedrion/src/sessions/type_erased.rs index a78fa1fd..2bb37f4a 100644 --- a/synedrion/src/sessions/type_erased.rs +++ b/synedrion/src/sessions/type_erased.rs @@ -18,16 +18,16 @@ use crate::cggmp21::{ }; use crate::tools::collections::{HoleRange, HoleVec, HoleVecAccum}; -pub(crate) fn serialize_message( - message: &impl Serialize, -) -> Result, rmp_serde::encode::Error> { - rmp_serde::encode::to_vec(message).map(|serialized| serialized.into_boxed_slice()) +pub(crate) fn serialize_message(message: &impl Serialize) -> Result, LocalError> { + rmp_serde::encode::to_vec(message) + .map(|serialized| serialized.into_boxed_slice()) + .map_err(|err| LocalError(format!("Failed to serialize: {err:?}"))) } pub(crate) fn deserialize_message Deserialize<'de>>( message_bytes: &[u8], -) -> Result { - rmp_serde::decode::from_slice(message_bytes) +) -> Result { + rmp_serde::decode::from_slice(message_bytes).map_err(|err| err.to_string()) } pub(crate) enum FinalizeOutcome { @@ -38,7 +38,7 @@ pub(crate) enum FinalizeOutcome { #[derive(Debug, Clone, Copy)] pub enum AccumAddError { /// An item with the given origin has already been added to the accumulator. - SlotTaken(PartyIdx), + SlotTaken, /// Trying to add an item to an accumulator that was not initialized on construction. NoAccumulator, } @@ -142,7 +142,7 @@ where ) -> Result> { let typed_message: ::Message = match deserialize_message(message) { Ok(message) => message, - Err(err) => return Err(ReceiveError::CannotDeserialize(err.to_string())), + Err(err) => return Err(ReceiveError::CannotDeserialize(err)), }; let payload = self @@ -159,7 +159,7 @@ where ) -> Result> { let typed_message: ::Message = match deserialize_message(message) { Ok(message) => message, - Err(err) => return Err(ReceiveError::CannotDeserialize(err.to_string())), + Err(err) => return Err(ReceiveError::CannotDeserialize(err)), }; let payload = self @@ -177,8 +177,8 @@ where let mut boxed_rng = BoxedRng(rng); let serialized = self .make_broadcast(&mut boxed_rng) - .map_err(LocalError::InvalidState)?; - serialize_message(&serialized).map_err(|err| LocalError::CannotSerialize(err.to_string())) + .map_err(|err| LocalError(format!("Failed to make a broadcast message: {err:?}")))?; + serialize_message(&serialized) } fn requires_broadcast_consensus(&self) -> bool { @@ -197,9 +197,8 @@ where let mut boxed_rng = BoxedRng(rng); let (typed_message, typed_artefact) = self .make_direct_message(&mut boxed_rng, destination) - .map_err(LocalError::InvalidState)?; - let message = serialize_message(&typed_message) - .map_err(|err| LocalError::CannotSerialize(err.to_string()))?; + .map_err(|err| LocalError(format!("Failed to make a direct message: {err:?}")))?; + let message = serialize_message(&typed_message)?; Ok((message, DynDmArtefact(Box::new(typed_artefact)))) } } @@ -245,7 +244,7 @@ impl DynRoundAccum { match &mut self.bc_payloads { Some(payloads) => payloads .insert(from.as_usize(), payload) - .ok_or(AccumAddError::SlotTaken(from)), + .ok_or(AccumAddError::SlotTaken), None => Err(AccumAddError::NoAccumulator), } } @@ -258,7 +257,7 @@ impl DynRoundAccum { match &mut self.dm_payloads { Some(payloads) => payloads .insert(from.as_usize(), payload) - .ok_or(AccumAddError::SlotTaken(from)), + .ok_or(AccumAddError::SlotTaken), None => Err(AccumAddError::NoAccumulator), } } @@ -271,7 +270,7 @@ impl DynRoundAccum { match &mut self.dm_artefacts { Some(artefacts) => artefacts .insert(destination.as_usize(), artefact) - .ok_or(AccumAddError::SlotTaken(destination)), + .ok_or(AccumAddError::SlotTaken), None => Err(AccumAddError::NoAccumulator), } } diff --git a/synedrion/tests/sessions.rs b/synedrion/tests/sessions.rs index 3237327f..5332f2dd 100644 --- a/synedrion/tests/sessions.rs +++ b/synedrion/tests/sessions.rs @@ -74,10 +74,10 @@ async fn run_session( let result = session.verify_message(&from, message).unwrap(); // This will happen in a host task. - accum.add_processed_message(result).unwrap(); + accum.add_processed_message(result).unwrap().unwrap(); } - while !session.can_finalize(&accum) { + while !session.can_finalize(&accum).unwrap() { println!("{key:?}: waiting for a message"); let (from, message) = rx.recv().await.unwrap(); @@ -89,7 +89,7 @@ async fn run_session( let result = session.verify_message(&from, message).unwrap(); // This will happen in a host task. - accum.add_processed_message(result).unwrap(); + accum.add_processed_message(result).unwrap().unwrap(); } println!("{key:?}: finalizing the round");