Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
chainHead: Limit ongoing operations (#14699)
Browse files Browse the repository at this point in the history
* chainHead/api: Make storage/body/call pure RPC methods

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead: Add mpsc channel between RPC methods

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/subscriptions: Extract mpsc::Sender via BlockGuard

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/subscriptions: Generate and provide the method operation ID

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead: Generate `chainHead_body` response

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead: Generate `chainHead_call` response

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead: Generate `chainHead_storage` responses

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead: Propagate responses of methods to chainHead_follow

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/tests: Adjust `chainHead_body` responses

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/tests: Adjust `chainHead_call` responses

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/tests: Adjust `chainHead_call` responses

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/tests: Ensure unique operation IDs across methods

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/events: Remove old method events

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/subscriptions: Add limit helper

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/subscription: Expose limits to `BlockGuard`

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/tests: Adjust testing to ongoing operations

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead: Make limits configurable via `ChainHeadConfig`

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/tests: Adjust testing to `ChainHeadConfig`

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/tests: Ensure operation limits discards items

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead: Improve documentation

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead: Rename `OngoingOperations` -> `LimitOperations`

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead: Rename reserve -> reserve_at_most

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead: Use duration const instead of u64

Signed-off-by: Alexandru Vasile <[email protected]>

* chainHead/subscription: Use tokio::sync::Semaphore for limits

Signed-off-by: Alexandru Vasile <[email protected]>

* Update client/rpc-spec-v2/src/chain_head/subscription/inner.rs

Co-authored-by: Sebastian Kunert <[email protected]>

---------

Signed-off-by: Alexandru Vasile <[email protected]>
Co-authored-by: parity-processbot <>
Co-authored-by: Sebastian Kunert <[email protected]>
  • Loading branch information
2 people authored and Ank4n committed Aug 20, 2023
1 parent bd55f14 commit 981de06
Show file tree
Hide file tree
Showing 8 changed files with 394 additions and 100 deletions.
1 change: 1 addition & 0 deletions client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ hex = "0.4"
futures = "0.3.21"
parking_lot = "0.12.1"
tokio-stream = { version = "0.1", features = ["sync"] }
tokio = { version = "1.22.0", features = ["sync"] }
array-bytes = "6.1"
log = "0.4.17"
futures-util = { version = "0.3.19", default-features = false }
Expand Down
96 changes: 67 additions & 29 deletions client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,41 @@ use std::{marker::PhantomData, sync::Arc, time::Duration};

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";

/// The configuration of [`ChainHead`].
pub struct ChainHeadConfig {
/// The maximum number of pinned blocks across all subscriptions.
pub global_max_pinned_blocks: usize,
/// The maximum duration that a block is allowed to be pinned per subscription.
pub subscription_max_pinned_duration: Duration,
/// The maximum number of ongoing operations per subscription.
pub subscription_max_ongoing_operations: usize,
}

/// Maximum pinned blocks across all connections.
/// This number is large enough to consider immediate blocks.
/// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db.
const MAX_PINNED_BLOCKS: usize = 512;

/// Any block of any subscription should not be pinned more than
/// this constant. When a subscription contains a block older than this,
/// the subscription becomes subject to termination.
/// Note: This should be enough for immediate blocks.
const MAX_PINNED_DURATION: Duration = Duration::from_secs(60);

/// The maximum number of ongoing operations per subscription.
/// Note: The lower limit imposed by the spec is 16.
const MAX_ONGOING_OPERATIONS: usize = 16;

impl Default for ChainHeadConfig {
fn default() -> Self {
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
}
}
}

/// An API for chain head RPC calls.
pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
Expand All @@ -76,17 +111,17 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
backend: Arc<BE>,
executor: SubscriptionTaskExecutor,
genesis_hash: GenesisHash,
max_pinned_blocks: usize,
max_pinned_duration: Duration,
config: ChainHeadConfig,
) -> Self {
let genesis_hash = hex_string(&genesis_hash.as_ref());
Self {
client,
backend: backend.clone(),
executor,
subscriptions: Arc::new(SubscriptionManagement::new(
max_pinned_blocks,
max_pinned_duration,
config.global_max_pinned_blocks,
config.subscription_max_pinned_duration,
config.subscription_max_ongoing_operations,
backend,
)),
genesis_hash,
Expand Down Expand Up @@ -197,12 +232,10 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<MethodResponse> {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
Expand Down Expand Up @@ -252,12 +285,10 @@ where
follow_subscription: String,
hash: Block::Hash,
) -> RpcResult<Option<String>> {
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(None)
},
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(None),
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
Expand Down Expand Up @@ -306,21 +337,27 @@ where
.transpose()?
.map(ChildInfo::new_default_from_vec);

let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
// Invalid invalid subscription ID.
return Ok(MethodResponse::LimitReached)
},
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};
let block_guard =
match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached),
Err(SubscriptionManagementError::BlockHashAbsent) => {
// Block is not part of the subscription.
return Err(ChainHeadRpcError::InvalidBlock.into())
},
Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()),
};

let storage_client = ChainHeadStorage::<Client, Block, BE>::new(self.client.clone());
let operation_id = block_guard.operation_id();

// The number of operations we are allowed to execute.
let num_operations = block_guard.num_reserved();
let discarded = items.len().saturating_sub(num_operations);
let mut items = items;
items.truncate(num_operations);

let fut = async move {
storage_client.generate_events(block_guard, hash, items, child_trie);
};
Expand All @@ -329,7 +366,7 @@ where
.spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed());
Ok(MethodResponse::Started(MethodResponseStarted {
operation_id,
discarded_items: Some(0),
discarded_items: Some(discarded),
}))
}

Expand All @@ -342,9 +379,10 @@ where
) -> RpcResult<MethodResponse> {
let call_parameters = Bytes::from(parse_hex_param(call_parameters)?);

let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) {
let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) {
Ok(block) => block,
Err(SubscriptionManagementError::SubscriptionAbsent) => {
Err(SubscriptionManagementError::SubscriptionAbsent) |
Err(SubscriptionManagementError::ExceededLimits) => {
// Invalid invalid subscription ID.
return Ok(MethodResponse::LimitReached)
},
Expand Down
2 changes: 1 addition & 1 deletion client/rpc-spec-v2/src/chain_head/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod chain_head_storage;
mod subscription;

pub use api::ChainHeadApiServer;
pub use chain_head::ChainHead;
pub use chain_head::{ChainHead, ChainHeadConfig};
pub use event::{
BestBlockChanged, ErrorEvent, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent,
RuntimeVersionEvent,
Expand Down
8 changes: 4 additions & 4 deletions client/rpc-spec-v2/src/chain_head/subscription/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use sp_blockchain::Error;
/// Subscription management error.
#[derive(Debug, thiserror::Error)]
pub enum SubscriptionManagementError {
/// The block cannot be pinned into memory because
/// the subscription has exceeded the maximum number
/// of blocks pinned.
#[error("Exceeded pinning limits")]
/// The subscription has exceeded the internal limits
/// regarding the number of pinned blocks in memory or
/// the number of ongoing operations.
#[error("Exceeded pinning or operation limits")]
ExceededLimits,
/// Error originated from the blockchain (client or backend).
#[error("Blockchain error {0}")]
Expand Down
Loading

0 comments on commit 981de06

Please sign in to comment.