From c83c1ae4e7e057d4791a1178ed0dcd224ee37f4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Lachowski?= Date: Mon, 22 Apr 2024 14:26:29 +0200 Subject: [PATCH] review changes for the Unit-Batch Api --- consensus/src/extension/mod.rs | 6 +-- consensus/src/member.rs | 74 +++++++++++----------------------- consensus/src/runway/mod.rs | 14 +++---- types/src/dataio.rs | 2 +- 4 files changed, 34 insertions(+), 62 deletions(-) diff --git a/consensus/src/extension/mod.rs b/consensus/src/extension/mod.rs index e6f8453d..8676f8c5 100644 --- a/consensus/src/extension/mod.rs +++ b/consensus/src/extension/mod.rs @@ -4,7 +4,7 @@ mod election; mod extender; mod units; -use aleph_bft_types::{FinalizationHandler, OrderedUnit}; +use aleph_bft_types::{BatchOfUnits, FinalizationHandler}; use extender::Extender; /// A struct responsible for executing the Consensus protocol on a local copy of the Dag. @@ -20,13 +20,13 @@ pub struct Ordering< H: Hasher, D: Data, MK: MultiKeychain, - FH: FinalizationHandler>>, + FH: FinalizationHandler>, > { extender: Extender>, finalization_handler: FH, } -impl>>> +impl>> Ordering { pub fn new(finalization_handler: FH) -> Self { diff --git a/consensus/src/member.rs b/consensus/src/member.rs index 1f615b37..49f3120f 100644 --- a/consensus/src/member.rs +++ b/consensus/src/member.rs @@ -11,7 +11,7 @@ use crate::{ Config, Data, DataProvider, Hasher, MultiKeychain, Network, NodeIndex, Receiver, Recipient, Round, Sender, Signature, SpawnHandle, Terminator, UncheckedSigned, }; -use aleph_bft_types::{FinalizationHandler, NodeMap, OrderedUnit}; +use aleph_bft_types::{BatchOfUnits, FinalizationHandler, NodeMap}; use codec::{Decode, Encode}; use futures::{channel::mpsc, pin_mut, AsyncRead, AsyncWrite, FutureExt, StreamExt}; use futures_timer::Delay; @@ -23,7 +23,6 @@ use std::{ collections::HashSet, convert::TryInto, fmt::{self, Debug}, - marker::PhantomData, time::Duration, }; @@ -107,29 +106,44 @@ enum TaskDetails { } #[derive(Clone)] -pub struct LocalIO, US: AsyncWrite, UL: AsyncRead> { +pub struct LocalIO { data_provider: DP, finalization_handler: FH, unit_saver: US, unit_loader: UL, - _phantom: PhantomData, } -impl, US: AsyncWrite, UL: AsyncRead> - LocalIO +impl, US: AsyncWrite, UL: AsyncRead> + LocalIO { pub fn new( data_provider: DP, finalization_handler: FH, unit_saver: US, unit_loader: UL, - ) -> LocalIO { + ) -> Self { + LocalIO { + data_provider, + finalization_handler, + unit_saver, + unit_loader, + } + } + + pub fn new_with_unit_finalization_handler< + H: Hasher, + UFH: FinalizationHandler>, + >( + data_provider: DP, + finalization_handler: UFH, + unit_saver: US, + unit_loader: UL, + ) -> LocalIO { LocalIO { data_provider, finalization_handler, unit_saver, unit_loader, - _phantom: PhantomData, } } } @@ -568,48 +582,6 @@ where /// [docs for devs](https://cardinal-cryptography.github.io/AlephBFT/index.html) /// or the [original paper](https://arxiv.org/abs/1908.05156). pub async fn run_session< - H: Hasher, - DP: DataProvider, - FH: FinalizationHandler, - US: AsyncWrite + Send + Sync + 'static, - UL: AsyncRead + Send + Sync + 'static, - N: Network> + 'static, - SH: SpawnHandle, - MK: MultiKeychain, ->( - config: Config, - local_io: LocalIO, - network: N, - keychain: MK, - spawn_handle: SH, - terminator: Terminator, -) { - let local_io: LocalIO, DP, FH, US, UL> = LocalIO { - data_provider: local_io.data_provider, - finalization_handler: local_io.finalization_handler, - unit_saver: local_io.unit_saver, - unit_loader: local_io.unit_loader, - _phantom: PhantomData, - }; - run_session_for_units( - config, - local_io, - network, - keychain, - spawn_handle, - terminator, - ) - .await -} - -pub type BatchOfUnits = Vec>; - -/// Starts the consensus algorithm as an async task. It stops establishing consensus for new data items after -/// reaching the threshold specified in [`Config::max_round`] or upon receiving a stop signal from `exit`. -/// Please note that this interface is less stable than [`run_session`] as it exposes intrinsics (i.e. units) -/// which migh be subject to change. -#[doc(hidden)] -pub async fn run_session_for_units< H: Hasher, DP: DataProvider, FH: FinalizationHandler>, @@ -620,7 +592,7 @@ pub async fn run_session_for_units< MK: MultiKeychain, >( config: Config, - local_io: LocalIO, DP, FH, US, UL>, + local_io: LocalIO, network: N, keychain: MK, spawn_handle: SH, diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index 66b96dc3..a27581e2 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -12,7 +12,7 @@ use crate::{ Config, Data, DataProvider, Hasher, Index, Keychain, MultiKeychain, NodeIndex, Receiver, Round, Sender, Signature, Signed, SpawnHandle, Terminator, UncheckedSigned, }; -use aleph_bft_types::{FinalizationHandler, OrderedUnit, Recipient}; +use aleph_bft_types::{BatchOfUnits, FinalizationHandler, Recipient}; use futures::{ channel::{mpsc, oneshot}, future::pending, @@ -103,7 +103,7 @@ struct Runway where H: Hasher, D: Data, - FH: FinalizationHandler>>, + FH: FinalizationHandler>, MK: MultiKeychain, { missing_coords: HashSet, @@ -208,7 +208,7 @@ impl<'a, H: Hasher> Display for RunwayStatus<'a, H> { struct RunwayConfig< H: Hasher, D: Data, - FH: FinalizationHandler>>, + FH: FinalizationHandler>, MK: MultiKeychain, > { finalization_handler: FH, @@ -228,7 +228,7 @@ impl Runway where H: Hasher, D: Data, - FH: FinalizationHandler>>, + FH: FinalizationHandler>, MK: MultiKeychain, { fn new(config: RunwayConfig, keychain: MK, validator: Validator) -> Self { @@ -673,7 +673,7 @@ pub struct RunwayIO< W: AsyncWrite + Send + Sync + 'static, R: AsyncRead + Send + Sync + 'static, DP: DataProvider, - FH: FinalizationHandler>>, + FH: FinalizationHandler>, > { pub data_provider: DP, pub finalization_handler: FH, @@ -688,7 +688,7 @@ impl< W: AsyncWrite + Send + Sync + 'static, R: AsyncRead + Send + Sync + 'static, DP: DataProvider, - FH: FinalizationHandler>>, + FH: FinalizationHandler>, > RunwayIO { pub fn new( @@ -719,7 +719,7 @@ pub(crate) async fn run( US: AsyncWrite + Send + Sync + 'static, UL: AsyncRead + Send + Sync + 'static, DP: DataProvider, - FH: FinalizationHandler>>, + FH: FinalizationHandler>, MK: MultiKeychain, SH: SpawnHandle, { diff --git a/types/src/dataio.rs b/types/src/dataio.rs index a5f60dd1..d4e116af 100644 --- a/types/src/dataio.rs +++ b/types/src/dataio.rs @@ -48,7 +48,7 @@ pub type BatchOfUnits = Vec>; impl> FinalizationHandler> for FH { - fn data_finalized(&mut self, batch: Vec>) { + fn data_finalized(&mut self, batch: BatchOfUnits) { for unit in batch { if let Some(data) = unit.data { self.data_finalized(data)