From 981de06a83d1272e779f1519f044a2629280c497 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Tue, 15 Aug 2023 15:17:41 +0300 Subject: [PATCH] chainHead: Limit ongoing operations (#14699) * chainHead/api: Make storage/body/call pure RPC methods Signed-off-by: Alexandru Vasile * chainHead: Add mpsc channel between RPC methods Signed-off-by: Alexandru Vasile * chainHead/subscriptions: Extract mpsc::Sender via BlockGuard Signed-off-by: Alexandru Vasile * chainHead/subscriptions: Generate and provide the method operation ID Signed-off-by: Alexandru Vasile * chainHead: Generate `chainHead_body` response Signed-off-by: Alexandru Vasile * chainHead: Generate `chainHead_call` response Signed-off-by: Alexandru Vasile * chainHead: Generate `chainHead_storage` responses Signed-off-by: Alexandru Vasile * chainHead: Propagate responses of methods to chainHead_follow Signed-off-by: Alexandru Vasile * chainHead/tests: Adjust `chainHead_body` responses Signed-off-by: Alexandru Vasile * chainHead/tests: Adjust `chainHead_call` responses Signed-off-by: Alexandru Vasile * chainHead/tests: Adjust `chainHead_call` responses Signed-off-by: Alexandru Vasile * chainHead/tests: Ensure unique operation IDs across methods Signed-off-by: Alexandru Vasile * chainHead/events: Remove old method events Signed-off-by: Alexandru Vasile * chainHead/subscriptions: Add limit helper Signed-off-by: Alexandru Vasile * chainHead/subscription: Expose limits to `BlockGuard` Signed-off-by: Alexandru Vasile * chainHead/tests: Adjust testing to ongoing operations Signed-off-by: Alexandru Vasile * chainHead: Make limits configurable via `ChainHeadConfig` Signed-off-by: Alexandru Vasile * chainHead/tests: Adjust testing to `ChainHeadConfig` Signed-off-by: Alexandru Vasile * chainHead/tests: Ensure operation limits discards items Signed-off-by: Alexandru Vasile * chainHead: Improve documentation Signed-off-by: Alexandru Vasile * chainHead: Rename `OngoingOperations` -> `LimitOperations` Signed-off-by: Alexandru Vasile * chainHead: Rename reserve -> reserve_at_most Signed-off-by: Alexandru Vasile * chainHead: Use duration const instead of u64 Signed-off-by: Alexandru Vasile * chainHead/subscription: Use tokio::sync::Semaphore for limits Signed-off-by: Alexandru Vasile * Update client/rpc-spec-v2/src/chain_head/subscription/inner.rs Co-authored-by: Sebastian Kunert --------- Signed-off-by: Alexandru Vasile Co-authored-by: parity-processbot <> Co-authored-by: Sebastian Kunert --- client/rpc-spec-v2/Cargo.toml | 1 + .../rpc-spec-v2/src/chain_head/chain_head.rs | 96 ++++++--- client/rpc-spec-v2/src/chain_head/mod.rs | 2 +- .../src/chain_head/subscription/error.rs | 8 +- .../src/chain_head/subscription/inner.rs | 157 ++++++++++++-- .../src/chain_head/subscription/mod.rs | 13 +- client/rpc-spec-v2/src/chain_head/tests.rs | 196 +++++++++++++++--- client/service/src/builder.rs | 21 +- 8 files changed, 394 insertions(+), 100 deletions(-) diff --git a/client/rpc-spec-v2/Cargo.toml b/client/rpc-spec-v2/Cargo.toml index b1ab2a8799744..599596777b7b6 100644 --- a/client/rpc-spec-v2/Cargo.toml +++ b/client/rpc-spec-v2/Cargo.toml @@ -32,6 +32,7 @@ hex = "0.4" futures = "0.3.21" parking_lot = "0.12.1" tokio-stream = { version = "0.1", features = ["sync"] } +tokio = { version = "1.22.0", features = ["sync"] } array-bytes = "6.1" log = "0.4.17" futures-util = { version = "0.3.19", default-features = false } diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 16881b05fd7b9..79cf251f18068 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -53,6 +53,41 @@ use std::{marker::PhantomData, sync::Arc, time::Duration}; pub(crate) const LOG_TARGET: &str = "rpc-spec-v2"; +/// The configuration of [`ChainHead`]. +pub struct ChainHeadConfig { + /// The maximum number of pinned blocks across all subscriptions. + pub global_max_pinned_blocks: usize, + /// The maximum duration that a block is allowed to be pinned per subscription. + pub subscription_max_pinned_duration: Duration, + /// The maximum number of ongoing operations per subscription. + pub subscription_max_ongoing_operations: usize, +} + +/// Maximum pinned blocks across all connections. +/// This number is large enough to consider immediate blocks. +/// Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db. +const MAX_PINNED_BLOCKS: usize = 512; + +/// Any block of any subscription should not be pinned more than +/// this constant. When a subscription contains a block older than this, +/// the subscription becomes subject to termination. +/// Note: This should be enough for immediate blocks. +const MAX_PINNED_DURATION: Duration = Duration::from_secs(60); + +/// The maximum number of ongoing operations per subscription. +/// Note: The lower limit imposed by the spec is 16. +const MAX_ONGOING_OPERATIONS: usize = 16; + +impl Default for ChainHeadConfig { + fn default() -> Self { + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: MAX_PINNED_DURATION, + subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS, + } + } +} + /// An API for chain head RPC calls. pub struct ChainHead, Block: BlockT, Client> { /// Substrate client. @@ -76,8 +111,7 @@ impl, Block: BlockT, Client> ChainHead { backend: Arc, executor: SubscriptionTaskExecutor, genesis_hash: GenesisHash, - max_pinned_blocks: usize, - max_pinned_duration: Duration, + config: ChainHeadConfig, ) -> Self { let genesis_hash = hex_string(&genesis_hash.as_ref()); Self { @@ -85,8 +119,9 @@ impl, Block: BlockT, Client> ChainHead { backend: backend.clone(), executor, subscriptions: Arc::new(SubscriptionManagement::new( - max_pinned_blocks, - max_pinned_duration, + config.global_max_pinned_blocks, + config.subscription_max_pinned_duration, + config.subscription_max_ongoing_operations, backend, )), genesis_hash, @@ -197,12 +232,10 @@ where follow_subscription: String, hash: Block::Hash, ) -> RpcResult { - let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { + let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { Ok(block) => block, - Err(SubscriptionManagementError::SubscriptionAbsent) => { - // Invalid invalid subscription ID. - return Ok(MethodResponse::LimitReached) - }, + Err(SubscriptionManagementError::SubscriptionAbsent) | + Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached), Err(SubscriptionManagementError::BlockHashAbsent) => { // Block is not part of the subscription. return Err(ChainHeadRpcError::InvalidBlock.into()) @@ -252,12 +285,10 @@ where follow_subscription: String, hash: Block::Hash, ) -> RpcResult> { - let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { + let _block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { Ok(block) => block, - Err(SubscriptionManagementError::SubscriptionAbsent) => { - // Invalid invalid subscription ID. - return Ok(None) - }, + Err(SubscriptionManagementError::SubscriptionAbsent) | + Err(SubscriptionManagementError::ExceededLimits) => return Ok(None), Err(SubscriptionManagementError::BlockHashAbsent) => { // Block is not part of the subscription. return Err(ChainHeadRpcError::InvalidBlock.into()) @@ -306,21 +337,27 @@ where .transpose()? .map(ChildInfo::new_default_from_vec); - let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { - Ok(block) => block, - Err(SubscriptionManagementError::SubscriptionAbsent) => { - // Invalid invalid subscription ID. - return Ok(MethodResponse::LimitReached) - }, - Err(SubscriptionManagementError::BlockHashAbsent) => { - // Block is not part of the subscription. - return Err(ChainHeadRpcError::InvalidBlock.into()) - }, - Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), - }; + let block_guard = + match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) { + Ok(block) => block, + Err(SubscriptionManagementError::SubscriptionAbsent) | + Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached), + Err(SubscriptionManagementError::BlockHashAbsent) => { + // Block is not part of the subscription. + return Err(ChainHeadRpcError::InvalidBlock.into()) + }, + Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), + }; let storage_client = ChainHeadStorage::::new(self.client.clone()); let operation_id = block_guard.operation_id(); + + // The number of operations we are allowed to execute. + let num_operations = block_guard.num_reserved(); + let discarded = items.len().saturating_sub(num_operations); + let mut items = items; + items.truncate(num_operations); + let fut = async move { storage_client.generate_events(block_guard, hash, items, child_trie); }; @@ -329,7 +366,7 @@ where .spawn_blocking("substrate-rpc-subscription", Some("rpc"), fut.boxed()); Ok(MethodResponse::Started(MethodResponseStarted { operation_id, - discarded_items: Some(0), + discarded_items: Some(discarded), })) } @@ -342,9 +379,10 @@ where ) -> RpcResult { let call_parameters = Bytes::from(parse_hex_param(call_parameters)?); - let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash) { + let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { Ok(block) => block, - Err(SubscriptionManagementError::SubscriptionAbsent) => { + Err(SubscriptionManagementError::SubscriptionAbsent) | + Err(SubscriptionManagementError::ExceededLimits) => { // Invalid invalid subscription ID. return Ok(MethodResponse::LimitReached) }, diff --git a/client/rpc-spec-v2/src/chain_head/mod.rs b/client/rpc-spec-v2/src/chain_head/mod.rs index f0fa898f9f7e1..1bd2288578025 100644 --- a/client/rpc-spec-v2/src/chain_head/mod.rs +++ b/client/rpc-spec-v2/src/chain_head/mod.rs @@ -37,7 +37,7 @@ mod chain_head_storage; mod subscription; pub use api::ChainHeadApiServer; -pub use chain_head::ChainHead; +pub use chain_head::{ChainHead, ChainHeadConfig}; pub use event::{ BestBlockChanged, ErrorEvent, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent, RuntimeVersionEvent, diff --git a/client/rpc-spec-v2/src/chain_head/subscription/error.rs b/client/rpc-spec-v2/src/chain_head/subscription/error.rs index 443ee9fb87a25..38e8fd7384fcb 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/error.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/error.rs @@ -21,10 +21,10 @@ use sp_blockchain::Error; /// Subscription management error. #[derive(Debug, thiserror::Error)] pub enum SubscriptionManagementError { - /// The block cannot be pinned into memory because - /// the subscription has exceeded the maximum number - /// of blocks pinned. - #[error("Exceeded pinning limits")] + /// The subscription has exceeded the internal limits + /// regarding the number of pinned blocks in memory or + /// the number of ongoing operations. + #[error("Exceeded pinning or operation limits")] ExceededLimits, /// Error originated from the blockchain (client or backend). #[error("Blockchain error {0}")] diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index c0c2701c5e145..9f42be4a2f7f6 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -107,6 +107,62 @@ impl BlockStateMachine { } } +/// Limit the number of ongoing operations across methods. +struct LimitOperations { + /// Limit the number of ongoing operations for this subscription. + semaphore: Arc, +} + +impl LimitOperations { + /// Constructs a new [`LimitOperations`]. + fn new(max_operations: usize) -> Self { + LimitOperations { semaphore: Arc::new(tokio::sync::Semaphore::new(max_operations)) } + } + + /// Reserves capacity to execute at least one operation and at most the requested items. + /// + /// Dropping [`PermitOperations`] without executing an operation will release + /// the reserved capacity. + /// + /// Returns nothing if there's no space available, else returns a permit + /// that guarantees that at least one operation can be executed. + fn reserve_at_most(&self, to_reserve: usize) -> Option { + let num_ops = std::cmp::min(self.semaphore.available_permits(), to_reserve); + + if num_ops == 0 { + return None + } + + let permits = Arc::clone(&self.semaphore) + .try_acquire_many_owned(num_ops.try_into().ok()?) + .ok()?; + + Some(PermitOperations { num_ops, _permit: permits }) + } +} + +/// Permits a number of operations to be executed. +/// +/// [`PermitOperations`] are returned by [`LimitOperations::reserve()`] and are used +/// to guarantee the RPC server can execute the number of operations. +/// +/// The number of reserved items are given back to the [`LimitOperations`] on drop. +struct PermitOperations { + /// The number of operations permitted (reserved). + num_ops: usize, + /// The permit for these operations. + _permit: tokio::sync::OwnedSemaphorePermit, +} + +impl PermitOperations { + /// Returns the number of reserved elements for this permit. + /// + /// This can be smaller than the number of items requested via [`LimitOperations::reserve()`]. + fn num_reserved(&self) -> usize { + self.num_ops + } +} + struct BlockState { /// The state machine of this block. state_machine: BlockStateMachine, @@ -124,6 +180,8 @@ struct SubscriptionState { /// /// This object is cloned between methods. response_sender: TracingUnboundedSender>, + /// Limit the number of ongoing operations. + limits: LimitOperations, /// The next operation ID. next_operation_id: usize, /// Track the block hashes available for this subscription. @@ -244,6 +302,13 @@ impl SubscriptionState { self.next_operation_id = self.next_operation_id.wrapping_add(1); op_id } + + /// Reserves capacity to execute at least one operation and at most the requested items. + /// + /// For more details see [`PermitOperations`]. + fn reserve_at_most(&self, to_reserve: usize) -> Option { + self.limits.reserve_at_most(to_reserve) + } } /// Keeps a specific block pinned while the handle is alive. @@ -254,6 +319,7 @@ pub struct BlockGuard> { with_runtime: bool, response_sender: TracingUnboundedSender>, operation_id: String, + permit_operations: PermitOperations, backend: Arc, } @@ -272,6 +338,7 @@ impl> BlockGuard { with_runtime: bool, response_sender: TracingUnboundedSender>, operation_id: usize, + permit_operations: PermitOperations, backend: Arc, ) -> Result { backend @@ -283,6 +350,7 @@ impl> BlockGuard { with_runtime, response_sender, operation_id: operation_id.to_string(), + permit_operations, backend, }) } @@ -301,6 +369,13 @@ impl> BlockGuard { pub fn operation_id(&self) -> String { self.operation_id.clone() } + + /// Returns the number of reserved elements for this permit. + /// + /// This can be smaller than the number of items requested. + pub fn num_reserved(&self) -> usize { + self.permit_operations.num_reserved() + } } impl> Drop for BlockGuard { @@ -328,6 +403,8 @@ pub struct SubscriptionsInner> { global_max_pinned_blocks: usize, /// The maximum duration that a block is allowed to be pinned per subscription. local_max_pin_duration: Duration, + /// The maximum number of ongoing operations per subscription. + max_ongoing_operations: usize, /// Map the subscription ID to internal details of the subscription. subs: HashMap>, /// Backend pinning / unpinning blocks. @@ -341,12 +418,14 @@ impl> SubscriptionsInner { pub fn new( global_max_pinned_blocks: usize, local_max_pin_duration: Duration, + max_ongoing_operations: usize, backend: Arc, ) -> Self { SubscriptionsInner { global_blocks: Default::default(), global_max_pinned_blocks, local_max_pin_duration, + max_ongoing_operations, subs: Default::default(), backend, } @@ -366,6 +445,7 @@ impl> SubscriptionsInner { with_runtime, tx_stop: Some(tx_stop), response_sender, + limits: LimitOperations::new(self.max_ongoing_operations), next_operation_id: 0, blocks: Default::default(), }; @@ -541,6 +621,7 @@ impl> SubscriptionsInner { &mut self, sub_id: &str, hash: Block::Hash, + to_reserve: usize, ) -> Result, SubscriptionManagementError> { let Some(sub) = self.subs.get_mut(sub_id) else { return Err(SubscriptionManagementError::SubscriptionAbsent) @@ -550,12 +631,18 @@ impl> SubscriptionsInner { return Err(SubscriptionManagementError::BlockHashAbsent) } + let Some(permit_operations) = sub.reserve_at_most(to_reserve) else { + // Error when the server cannot execute at least one operation. + return Err(SubscriptionManagementError::ExceededLimits) + }; + let operation_id = sub.next_operation_id(); BlockGuard::new( hash, sub.with_runtime, sub.response_sender.clone(), operation_id, + permit_operations, self.backend.clone(), ) } @@ -574,6 +661,9 @@ mod tests { Client, ClientBlockImportExt, GenesisInit, }; + /// Maximum number of ongoing operations per subscription ID. + const MAX_OPERATIONS_PER_SUB: usize = 16; + fn init_backend() -> ( Arc>, Arc>>, @@ -669,6 +759,7 @@ mod tests { tx_stop: None, response_sender, next_operation_id: 0, + limits: LimitOperations::new(MAX_OPERATIONS_PER_SUB), blocks: Default::default(), }; @@ -698,6 +789,7 @@ mod tests { tx_stop: None, response_sender, next_operation_id: 0, + limits: LimitOperations::new(MAX_OPERATIONS_PER_SUB), blocks: Default::default(), }; @@ -730,13 +822,14 @@ mod tests { fn subscription_lock_block() { let builder = TestClientBuilder::new(); let backend = builder.backend(); - let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); + let mut subs = + SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id = "abc".to_string(); let hash = H256::random(); // Subscription not inserted. - let err = subs.lock_block(&id, hash).unwrap_err(); + let err = subs.lock_block(&id, hash, 1).unwrap_err(); assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); let _stop = subs.insert_subscription(id.clone(), true).unwrap(); @@ -744,13 +837,13 @@ mod tests { assert!(subs.insert_subscription(id.clone(), true).is_none()); // No block hash. - let err = subs.lock_block(&id, hash).unwrap_err(); + let err = subs.lock_block(&id, hash, 1).unwrap_err(); assert_eq!(err, SubscriptionManagementError::BlockHashAbsent); subs.remove_subscription(&id); // No subscription. - let err = subs.lock_block(&id, hash).unwrap_err(); + let err = subs.lock_block(&id, hash, 1).unwrap_err(); assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); } @@ -762,7 +855,8 @@ mod tests { let hash = block.header.hash(); futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); + let mut subs = + SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id = "abc".to_string(); let _stop = subs.insert_subscription(id.clone(), true).unwrap(); @@ -770,7 +864,7 @@ mod tests { // First time we are pinning the block. assert_eq!(subs.pin_block(&id, hash).unwrap(), true); - let block = subs.lock_block(&id, hash).unwrap(); + let block = subs.lock_block(&id, hash, 1).unwrap(); // Subscription started with runtime updates assert_eq!(block.has_runtime(), true); @@ -780,7 +874,7 @@ mod tests { // Unpin the block. subs.unpin_block(&id, hash).unwrap(); - let err = subs.lock_block(&id, hash).unwrap_err(); + let err = subs.lock_block(&id, hash, 1).unwrap_err(); assert_eq!(err, SubscriptionManagementError::BlockHashAbsent); } @@ -791,7 +885,8 @@ mod tests { let hash = block.header.hash(); futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); + let mut subs = + SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id = "abc".to_string(); let _stop = subs.insert_subscription(id.clone(), true).unwrap(); @@ -839,7 +934,8 @@ mod tests { let hash_3 = block.header.hash(); futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); - let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); + let mut subs = + SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id_1 = "abc".to_string(); let id_2 = "abcd".to_string(); @@ -884,7 +980,8 @@ mod tests { futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); // Maximum number of pinned blocks is 2. - let mut subs = SubscriptionsInner::new(2, Duration::from_secs(10), backend); + let mut subs = + SubscriptionsInner::new(2, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id_1 = "abc".to_string(); let id_2 = "abcd".to_string(); @@ -908,10 +1005,10 @@ mod tests { assert_eq!(err, SubscriptionManagementError::ExceededLimits); // Ensure both subscriptions are removed. - let err = subs.lock_block(&id_1, hash_1).unwrap_err(); + let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err(); assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); - let err = subs.lock_block(&id_2, hash_1).unwrap_err(); + let err = subs.lock_block(&id_2, hash_1, 1).unwrap_err(); assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); assert!(subs.global_blocks.get(&hash_1).is_none()); @@ -934,7 +1031,8 @@ mod tests { futures::executor::block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); // Maximum number of pinned blocks is 2 and maximum pin duration is 5 second. - let mut subs = SubscriptionsInner::new(2, Duration::from_secs(5), backend); + let mut subs = + SubscriptionsInner::new(2, Duration::from_secs(5), MAX_OPERATIONS_PER_SUB, backend); let id_1 = "abc".to_string(); let id_2 = "abcd".to_string(); @@ -958,10 +1056,10 @@ mod tests { assert_eq!(err, SubscriptionManagementError::ExceededLimits); // Ensure both subscriptions are removed. - let err = subs.lock_block(&id_1, hash_1).unwrap_err(); + let err = subs.lock_block(&id_1, hash_1, 1).unwrap_err(); assert_eq!(err, SubscriptionManagementError::SubscriptionAbsent); - let _block_guard = subs.lock_block(&id_2, hash_1).unwrap(); + let _block_guard = subs.lock_block(&id_2, hash_1, 1).unwrap(); assert_eq!(*subs.global_blocks.get(&hash_1).unwrap(), 1); assert!(subs.global_blocks.get(&hash_2).is_none()); @@ -983,7 +1081,8 @@ mod tests { fn subscription_check_stop_event() { let builder = TestClientBuilder::new(); let backend = builder.backend(); - let mut subs = SubscriptionsInner::new(10, Duration::from_secs(10), backend); + let mut subs = + SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend); let id = "abc".to_string(); @@ -1000,4 +1099,30 @@ mod tests { let res = sub_data.rx_stop.try_recv().unwrap(); assert!(res.is_some()); } + + #[test] + fn ongoing_operations() { + // The object can hold at most 2 operations. + let ops = LimitOperations::new(2); + + // One operation is reserved. + let permit_one = ops.reserve_at_most(1).unwrap(); + assert_eq!(permit_one.num_reserved(), 1); + + // Request 2 operations, however there is capacity only for one. + let permit_two = ops.reserve_at_most(2).unwrap(); + // Number of reserved permits is smaller than provided. + assert_eq!(permit_two.num_reserved(), 1); + + // Try to reserve operations when there's no space. + let permit = ops.reserve_at_most(1); + assert!(permit.is_none()); + + // Release capacity. + drop(permit_two); + + // Can reserve again + let permit_three = ops.reserve_at_most(1).unwrap(); + assert_eq!(permit_three.num_reserved(), 1); + } } diff --git a/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index 3aece6575ef66..39618ecfc1b3e 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -40,12 +40,14 @@ impl> SubscriptionManagement { pub fn new( global_max_pinned_blocks: usize, local_max_pin_duration: Duration, + max_ongoing_operations: usize, backend: Arc, ) -> Self { SubscriptionManagement { inner: RwLock::new(SubscriptionsInner::new( global_max_pinned_blocks, local_max_pin_duration, + max_ongoing_operations, backend, )), } @@ -110,15 +112,18 @@ impl> SubscriptionManagement { /// Ensure the block remains pinned until the return object is dropped. /// - /// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner. - /// Returns an error if the block hash is not pinned for the subscription or - /// the subscription ID is invalid. + /// Returns a [`BlockGuard`] that pins and unpins the block hash in RAII manner + /// and reserves capacity for ogoing operations. + /// + /// Returns an error if the block hash is not pinned for the subscription, + /// the subscription ID is invalid or the limit of ongoing operations was exceeded. pub fn lock_block( &self, sub_id: &str, hash: Block::Hash, + to_reserve: usize, ) -> Result, SubscriptionManagementError> { let mut inner = self.inner.write(); - inner.lock_block(sub_id, hash) + inner.lock_block(sub_id, hash, to_reserve) } } diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 6c3c343a10b53..4bda06d3cf01c 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -36,6 +36,7 @@ type Header = substrate_test_runtime_client::runtime::Header; type Block = substrate_test_runtime_client::runtime::Block; const MAX_PINNED_BLOCKS: usize = 32; const MAX_PINNED_SECS: u64 = 60; +const MAX_OPERATIONS: usize = 16; const CHAIN_GENESIS: [u8; 32] = [0; 32]; const INVALID_HASH: [u8; 32] = [1; 32]; const KEY: &[u8] = b":mock"; @@ -79,8 +80,11 @@ async fn setup_api() -> ( backend, Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - MAX_PINNED_BLOCKS, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -119,8 +123,11 @@ async fn follow_subscription_produces_blocks() { backend, Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - MAX_PINNED_BLOCKS, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -177,8 +184,11 @@ async fn follow_with_runtime() { backend, Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - MAX_PINNED_BLOCKS, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -285,8 +295,11 @@ async fn get_genesis() { backend, Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - MAX_PINNED_BLOCKS, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -491,8 +504,11 @@ async fn call_runtime_without_flag() { backend, Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - MAX_PINNED_BLOCKS, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -1117,8 +1133,11 @@ async fn separate_operation_ids_for_subscriptions() { backend, Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - MAX_PINNED_BLOCKS, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -1194,8 +1213,11 @@ async fn follow_generates_initial_blocks() { backend, Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - MAX_PINNED_BLOCKS, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -1322,8 +1344,11 @@ async fn follow_exceeding_pinned_blocks() { backend, Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - 2, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: 2, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -1373,8 +1398,11 @@ async fn follow_with_unpin() { backend, Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - 2, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: 2, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -1454,8 +1482,11 @@ async fn follow_prune_best_block() { backend, Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - MAX_PINNED_BLOCKS, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -1611,8 +1642,11 @@ async fn follow_forks_pruned_block() { backend, Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - MAX_PINNED_BLOCKS, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -1725,8 +1759,11 @@ async fn follow_report_multiple_pruned_block() { backend, Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - MAX_PINNED_BLOCKS, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -1930,8 +1967,11 @@ async fn pin_block_references() { backend.clone(), Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - 3, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: 3, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -2040,8 +2080,11 @@ async fn follow_finalized_before_new_block() { backend, Arc::new(TaskExecutor::default()), CHAIN_GENESIS, - MAX_PINNED_BLOCKS, - Duration::from_secs(MAX_PINNED_SECS), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + }, ) .into_rpc(); @@ -2119,3 +2162,100 @@ async fn follow_finalized_before_new_block() { }); assert_eq!(event, expected); } + +#[tokio::test] +async fn ensure_operation_limits_works() { + let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY); + let builder = TestClientBuilder::new().add_extra_child_storage( + &child_info, + KEY.to_vec(), + CHILD_VALUE.to_vec(), + ); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + // Configure the chainHead with maximum 1 ongoing operations. + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: 1, + }, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + let block_hash = format!("{:?}", block.header.hash()); + let key = hex_string(&KEY); + + let items = vec![ + StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsHashes }, + StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsHashes }, + StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsValues }, + StorageQuery { key: key.clone(), query_type: StorageQueryType::DescendantsValues }, + ]; + + let response: MethodResponse = api + .call("chainHead_unstable_storage", rpc_params![&sub_id, &block_hash, items]) + .await + .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => { + // Check discarded items. + assert_eq!(started.discarded_items.unwrap(), 3); + started.operation_id + }, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + // No value associated with the provided key. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); + + // The storage is finished and capactiy must be released. + let alice_id = AccountKeyring::Alice.to_account_id(); + // Hex encoded scale encoded bytes representing the call parameters. + let call_parameters = hex_string(&alice_id.encode()); + let response: MethodResponse = api + .call( + "chainHead_unstable_call", + [&sub_id, &block_hash, "AccountNonceApi_account_nonce", &call_parameters], + ) + .await + .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + // Response propagated to `chainHead_follow`. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000" + ); +} diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index d4cc575afec89..b942ac58aa99b 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -74,11 +74,7 @@ use sp_consensus::block_validation::{ use sp_core::traits::{CodeExecutor, SpawnNamed}; use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero}; -use std::{ - str::FromStr, - sync::Arc, - time::{Duration, SystemTime}, -}; +use std::{str::FromStr, sync::Arc, time::SystemTime}; /// Full client type. pub type TFullClient = @@ -636,24 +632,13 @@ where ) .into_rpc(); - // Maximum pinned blocks across all connections. - // This number is large enough to consider immediate blocks. - // Note: This should never exceed the `PINNING_CACHE_SIZE` from client/db. - const MAX_PINNED_BLOCKS: usize = 512; - - // Any block of any subscription should not be pinned more than - // this constant. When a subscription contains a block older than this, - // the subscription becomes subject to termination. - // Note: This should be enough for immediate blocks. - const MAX_PINNED_SECONDS: u64 = 60; - let chain_head_v2 = sc_rpc_spec_v2::chain_head::ChainHead::new( client.clone(), backend.clone(), task_executor.clone(), client.info().genesis_hash, - MAX_PINNED_BLOCKS, - Duration::from_secs(MAX_PINNED_SECONDS), + // Defaults to sensible limits for the `ChainHead`. + sc_rpc_spec_v2::chain_head::ChainHeadConfig::default(), ) .into_rpc();