Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4339 - Submit abft score on chain #1868

Closed
wants to merge 11 commits into from
8 changes: 6 additions & 2 deletions bin/fake-runtime-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use pallet_transaction_payment::FeeDetails;
use pallet_transaction_payment_rpc_runtime_api::RuntimeDispatchInfo;
use primitives::{
AccountId, ApiError as AlephApiError, AuraId, AuthorityId as AlephId, Balance, Block, Nonce,
Perbill, SessionAuthorityData, SessionCommittee, SessionIndex, SessionValidatorError,
Version as FinalityVersion,
Perbill, Round, Score, ScoreSignature, SessionAuthorityData, SessionCommittee, SessionIndex,
SessionValidatorError, Version as FinalityVersion,
};
use sp_consensus_aura::SlotDuration;
use sp_core::OpaqueMetadata;
Expand Down Expand Up @@ -195,6 +195,10 @@ pub mod fake_runtime {
fn current_era_payout() -> (Balance, Balance) {
unimplemented!()
}

fn submit_abft_score(_round: Round, _score: Score, _signature: ScoreSignature) -> Option<()>{
unimplemented!()
}
}

/// There’s an important remark on how this fake runtime must be implemented - it does not need to
Expand Down
14 changes: 9 additions & 5 deletions bin/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ use parity_scale_codec::{Decode, Encode, MaxEncodedLen};
use primitives::{
staking::MAX_NOMINATORS_REWARDED_PER_VALIDATOR, wrap_methods, Address,
AlephNodeSessionKeys as SessionKeys, ApiError as AlephApiError, AuraId, AuthorityId as AlephId,
BlockNumber as AlephBlockNumber, Header as AlephHeader, SessionAuthorityData, SessionCommittee,
SessionIndex, SessionInfoProvider, SessionValidatorError,
TotalIssuanceProvider as TotalIssuanceProviderT, Version as FinalityVersion,
ADDRESSES_ENCODING, DEFAULT_BAN_REASON_LENGTH, DEFAULT_MAX_WINNERS, DEFAULT_SESSIONS_PER_ERA,
DEFAULT_SESSION_PERIOD, MAX_BLOCK_SIZE, MILLISECS_PER_BLOCK, TOKEN,
BlockNumber as AlephBlockNumber, Header as AlephHeader, Round, Score, ScoreSignature,
SessionAuthorityData, SessionCommittee, SessionIndex, SessionInfoProvider,
SessionValidatorError, TotalIssuanceProvider as TotalIssuanceProviderT,
Version as FinalityVersion, ADDRESSES_ENCODING, DEFAULT_BAN_REASON_LENGTH, DEFAULT_MAX_WINNERS,
DEFAULT_SESSIONS_PER_ERA, DEFAULT_SESSION_PERIOD, MAX_BLOCK_SIZE, MILLISECS_PER_BLOCK, TOKEN,
};
pub use primitives::{AccountId, AccountIndex, Balance, Hash, Nonce, Signature};
use sp_api::impl_runtime_apis;
Expand Down Expand Up @@ -1238,6 +1238,10 @@ impl_runtime_apis! {

ExponentialEraPayout::era_payout(total_issuance, MILLISECONDS_PER_ERA)
}

fn submit_abft_score(round: Round, score: Score, signature: ScoreSignature) -> Option<()> {
Aleph::submit_abft_score(round, score, signature)
}
}

impl pallet_nomination_pools_runtime_api::NominationPoolsApi<Block, AccountId, Balance> for Runtime {
Expand Down
9 changes: 7 additions & 2 deletions finality-aleph/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use primitives::TransactionHash;
use rate_limiter::SharedRateLimiter;
use sc_client_api::Backend;
use sc_keystore::{Keystore, LocalKeystore};
use sc_transaction_pool_api::TransactionPool;
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sc_transaction_pool_api::{LocalTransactionPool, TransactionPool};
use sp_consensus_aura::AuraApi;

use crate::{
Expand Down Expand Up @@ -60,7 +61,9 @@ where
C: crate::ClientForAleph<Block, BE> + Send + Sync + 'static,
C::Api: AlephSessionApi<Block> + AuraApi<Block, AuraId>,
BE: Backend<Block> + 'static,
TP: TransactionPool<Block = Block, Hash = TransactionHash> + 'static,
TP: TransactionPool<Block = Block, Hash = TransactionHash>
+ LocalTransactionPool<Block = Block, Hash = TransactionHash>
+ 'static,
{
let AlephConfig {
authentication_network,
Expand Down Expand Up @@ -149,6 +152,7 @@ where
let slo_metrics = SloMetrics::new(registry.as_ref(), chain_status.clone());
let timing_metrics = slo_metrics.timing_metrics().clone();

let offchain_tx_pool_factory = OffchainTransactionPoolFactory::new(transaction_pool.clone());
spawn_handle.spawn("aleph/slo-metrics", {
let slo_metrics = slo_metrics.clone();
async move {
Expand Down Expand Up @@ -266,6 +270,7 @@ where
spawn_handle,
connection_manager,
keystore,
offchain_tx_pool_factory,
),
session_info,
});
Expand Down
118 changes: 78 additions & 40 deletions finality-aleph/src/party/manager/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Display;
use std::{fmt::Display, sync::Arc};

use futures::{
channel::{mpsc, oneshot},
Expand All @@ -7,6 +7,12 @@ use futures::{
StreamExt,
};
use log::{debug, error, trace};
use pallet_aleph_runtime_api::AlephSessionApi;
use primitives::Point;
use primitives::DEFAULT_SCORE_INTERVAL;
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_api::ProvideRuntimeApi;
use sp_runtime::traits::Block as BlockT;
use tokio::time;

use crate::{
Expand All @@ -29,6 +35,7 @@ use crate::{
BlockId, CurrentRmcNetworkData, Keychain, LegacyRmcNetworkData, SessionBoundaries,
STATUS_REPORT_INTERVAL,
};
use sp_api::ApiExt;

#[derive(Debug)]
pub enum Error {
Expand Down Expand Up @@ -71,19 +78,19 @@ async fn process_new_block_data<CN, LN>(
aggregator.start_aggregation(hash).await;
}

fn process_hash<H, C, JS>(
fn process_hash<H, HB, JS>(
hash: BlockHash,
multisignature: SignatureSet<Signature>,
justifications_for_chain: &mut JS,
justification_translator: &JustificationTranslator,
client: &C,
header_backend: &HB,
) -> Result<(), ()>
where
H: Header,
C: HeaderBackend<H> + 'static,
HB: HeaderBackend<H> + 'static,
JS: JustificationSubmissions<Justification> + Send + Sync + Clone,
{
let number = client.hash_to_id(hash).unwrap().unwrap().number();
let number = header_backend.hash_to_id(hash).unwrap().unwrap().number();
// The unwrap might actually fail if data availability is not implemented correctly.
let justification = match justification_translator.translate(
AlephJustification::CommitteeMultisignature(multisignature),
Expand All @@ -102,19 +109,24 @@ where
Ok(())
}

async fn run_aggregator<H, C, CN, LN, JS>(
async fn run_aggregator<B, C, H, HB, CN, LN, JS>(
mut aggregator: Aggregator<CN, LN>,
io: IO<JS>,
client: C,
header_backend: HB,
session_boundaries: &SessionBoundaries,
mut metrics: TimingBlockMetrics,
mut exit_rx: oneshot::Receiver<()>,
client: Arc<C>,
offchain_tx_pool_factory: OffchainTransactionPoolFactory<B>,
) -> Result<(), Error>
where
B: BlockT<Hash = BlockHash>,
H: Header,
JS: JustificationSubmissions<Justification> + Send + Sync + Clone,
C: HeaderBackend<H> + 'static,
HB: HeaderBackend<H> + 'static,
LN: Network<LegacyRmcNetworkData>,
C: ProvideRuntimeApi<B> + Send + Sync + 'static,
C::Api: AlephSessionApi<B>,
CN: Network<CurrentRmcNetworkData>,
{
let IO {
Expand All @@ -138,38 +150,57 @@ where

let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL);

let interval = DEFAULT_SCORE_INTERVAL;
let signature = 0;
let score: Vec<Point> = (0..14).collect();
loop {
trace!(target: "aleph-party", "Aggregator Loop started a next iteration");
tokio::select! {
maybe_block = blocks_from_interpreter.next(), if !no_more_blocks => match maybe_block {
Some(block) => {
hash_of_last_block = Some(block.hash());
process_new_block_data::<CN, LN>(
&mut aggregator,
block,
&mut metrics
).await;
},
None => {
debug!(target: "aleph-party", "Blocks ended in aggregator.");
no_more_blocks = true;
},
},
multisigned_hash = aggregator.next_multisigned_hash() => {
let (hash, multisignature) = multisigned_hash.ok_or(Error::MultisignaturesStreamTerminated)?;
process_hash(hash, multisignature, &mut justifications_for_chain, &justification_translator, &client).map_err(|_| Error::UnableToProcessHash)?;
if Some(hash) == hash_of_last_block {
hash_of_last_block = None;
maybe_block = blocks_from_interpreter.next(), if !no_more_blocks => match maybe_block {
Some(block) => {
hash_of_last_block = Some(block.hash());

let round = block.number();
if round % interval == 0 {

let mut runtime_api = client.runtime_api();
runtime_api.register_extension(
offchain_tx_pool_factory.offchain_transaction_pool(block.hash()),
);

if let Ok(_) = runtime_api.submit_abft_score(block.hash(), round, score.clone(), signature) {
debug!(target: "aleph-party", "Submited new score for round {}", round);
} else {
debug!(target: "aleph-party", "Error submitting score.");
}
}
},
_ = status_ticker.tick() => {
aggregator.status_report();
},
_ = &mut exit_rx => {
debug!(target: "aleph-party", "Aggregator received exit signal. Terminating.");
break;
}
}

process_new_block_data::<CN, LN>(
&mut aggregator,
block,
&mut metrics
).await;
},
None => {
debug!(target: "aleph-party", "Blocks ended in aggregator.");
no_more_blocks = true;
},
},
multisigned_hash = aggregator.next_multisigned_hash() => {
let (hash, multisignature) = multisigned_hash.ok_or(Error::MultisignaturesStreamTerminated)?;
process_hash(hash, multisignature, &mut justifications_for_chain, &justification_translator, &header_backend).map_err(|_| Error::UnableToProcessHash)?;
if Some(hash) == hash_of_last_block {
hash_of_last_block = None;
}
},
_ = status_ticker.tick() => {
aggregator.status_report();
},
_ = &mut exit_rx => {
debug!(target: "aleph-party", "Aggregator received exit signal. Terminating.");
break;
}
}
if hash_of_last_block.is_none() && no_more_blocks {
debug!(target: "aleph-party", "Aggregator processed all provided blocks. Terminating.");
break;
Expand All @@ -185,21 +216,26 @@ pub enum AggregatorVersion<CN, LN> {
}

/// Runs the justification signature aggregator within a single session.
pub fn task<H, C, CN, LN, JS>(
pub fn task<B, C, H, HB, CN, LN, JS>(
subtask_common: AuthoritySubtaskCommon,
client: C,
header_backend: HB,
io: IO<JS>,
session_boundaries: SessionBoundaries,
metrics: TimingBlockMetrics,
multikeychain: Keychain,
version: AggregatorVersion<CN, LN>,
client: Arc<C>,
offchain_tx_pool_factory: OffchainTransactionPoolFactory<B>,
) -> Task
where
B: BlockT<Hash = BlockHash>,
H: Header,
JS: JustificationSubmissions<Justification> + Send + Sync + Clone + 'static,
C: HeaderBackend<H> + 'static,
HB: HeaderBackend<H> + 'static,
LN: Network<LegacyRmcNetworkData> + 'static,
CN: Network<CurrentRmcNetworkData> + 'static,
C: ProvideRuntimeApi<B> + Send + Sync + 'static,
C::Api: AlephSessionApi<B>,
{
let AuthoritySubtaskCommon {
spawn_handle,
Expand All @@ -216,10 +252,12 @@ where
let result = run_aggregator(
aggregator_io,
io,
client,
header_backend,
&session_boundaries,
metrics,
exit,
client,
offchain_tx_pool_factory,
)
.await;
let result = match result {
Expand Down
8 changes: 8 additions & 0 deletions finality-aleph/src/party/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use log::{debug, info, trace, warn};
use network_clique::SpawnHandleExt;
use pallet_aleph_runtime_api::AlephSessionApi;
use sc_keystore::{Keystore, LocalKeystore};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_application_crypto::RuntimeAppPublic;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};

Expand Down Expand Up @@ -114,6 +115,7 @@ where
spawn_handle: SpawnHandle,
session_manager: SM,
keystore: Arc<LocalKeystore>,
offchain_tx_pool_factory: OffchainTransactionPoolFactory<B>,
_phantom: PhantomData<(B, H)>,
}

Expand Down Expand Up @@ -146,6 +148,7 @@ where
spawn_handle: SpawnHandle,
session_manager: SM,
keystore: Arc<LocalKeystore>,
offchain_tx_pool_factory: OffchainTransactionPoolFactory<B>,
) -> Self {
Self {
client,
Expand All @@ -161,6 +164,7 @@ where
spawn_handle,
session_manager,
keystore,
offchain_tx_pool_factory,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -231,6 +235,8 @@ where
self.metrics.clone(),
multikeychain,
AggregatorVersion::<CurrentNetworkType, _>::Legacy(rmc_network),
self.client.clone(),
self.offchain_tx_pool_factory.clone(),
),
task::task(subtask_common.clone(), chain_tracker, "chain tracker"),
task::task(subtask_common, data_store, "data store"),
Expand Down Expand Up @@ -303,6 +309,8 @@ where
self.metrics.clone(),
multikeychain,
AggregatorVersion::<_, LegacyNetworkType>::Current(rmc_network),
self.client.clone(),
self.offchain_tx_pool_factory.clone(),
),
task::task(subtask_common.clone(), chain_tracker, "chain tracker"),
task::task(subtask_common, data_store, "data store"),
Expand Down
6 changes: 4 additions & 2 deletions pallets/aleph-runtime-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
#![cfg_attr(not(feature = "std"), no_std)]

use primitives::{
AccountId, ApiError, AuthorityId, Balance, Perbill, SessionAuthorityData, SessionCommittee,
SessionIndex, SessionValidatorError, Version,
AccountId, ApiError, AuthorityId, Balance, Perbill, Round, Score, ScoreSignature,
SessionAuthorityData, SessionCommittee, SessionIndex, SessionValidatorError, Version,
};
pub use sp_consensus_aura::sr25519::AuthorityId as AuraId;
use sp_std::vec::Vec;
Expand Down Expand Up @@ -37,5 +37,7 @@ sp_api::decl_runtime_apis! {
fn yearly_inflation() -> Perbill;
/// Returns payout. First tuple item is a validators payout, 2nd is the rest.
fn current_era_payout() -> (Balance, Balance);
/// Submits score for a round in a session of performance of finality committee members.
fn submit_abft_score(round: Round, score: Score, signature: ScoreSignature) -> Option<()>;
}
}
Loading
Loading