Skip to content

Commit

Permalink
rpc v2: submitAndWatch replace old messages if it's lagging (#4901)
Browse files Browse the repository at this point in the history
Close #3076

The fix is really just that older messages are replaced if the client
can't keep up with the server instead.
Because I wanted the same functionality as `pipe_from_stream` for both
pending/subscription I added two wrapper types on-top of the types from
jsonrpsee to make it nicer.

I added a trait `Buffer` so I could still use pipe_from_stream but that
abstraction is a little leaky but only to avoid adding an identical
method/function with another strategy...
  • Loading branch information
niklasad1 authored Jul 29, 2024
1 parent 0636ffd commit fc10887
Show file tree
Hide file tree
Showing 10 changed files with 387 additions and 156 deletions.
10 changes: 8 additions & 2 deletions substrate/client/consensus/beefy/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use parking_lot::RwLock;
use sp_consensus_beefy::AuthorityIdBound;
use std::sync::Arc;

use sc_rpc::{utils::pipe_from_stream, SubscriptionTaskExecutor};
use sc_rpc::{
utils::{BoundedVecDeque, PendingSubscription},
SubscriptionTaskExecutor,
};
use sp_application_crypto::RuntimeAppPublic;
use sp_runtime::traits::Block as BlockT;

Expand Down Expand Up @@ -145,7 +148,10 @@ where
.subscribe(100_000)
.map(|vfp| notification::EncodedVersionedFinalityProof::new::<Block, AuthorityId>(vfp));

sc_rpc::utils::spawn_subscription_task(&self.executor, pipe_from_stream(pending, stream));
sc_rpc::utils::spawn_subscription_task(
&self.executor,
PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
);
}

async fn latest_finalized(&self) -> Result<Block::Hash, Error> {
Expand Down
10 changes: 8 additions & 2 deletions substrate/client/consensus/grandpa/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ use finality::{EncodedFinalityProof, RpcFinalityProofProvider};
use notification::JustificationNotification;
use report::{ReportAuthoritySet, ReportVoterState, ReportedRoundStates};
use sc_consensus_grandpa::GrandpaJustificationStream;
use sc_rpc::{utils::pipe_from_stream, SubscriptionTaskExecutor};
use sc_rpc::{
utils::{BoundedVecDeque, PendingSubscription},
SubscriptionTaskExecutor,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};

/// Provides RPC methods for interacting with GRANDPA.
Expand Down Expand Up @@ -108,7 +111,10 @@ where
},
);

sc_rpc::utils::spawn_subscription_task(&self.executor, pipe_from_stream(pending, stream));
sc_rpc::utils::spawn_subscription_task(
&self.executor,
PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
);
}

async fn prove_finality(
Expand Down
11 changes: 5 additions & 6 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ use codec::Encode;
use futures::{channel::oneshot, future::FutureExt};
use jsonrpsee::{
core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionId, Extensions,
MethodResponseFuture, PendingSubscriptionSink, SubscriptionSink,
MethodResponseFuture, PendingSubscriptionSink,
};
use log::debug;
use sc_client_api::{
Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
StorageProvider,
};
use sc_rpc::utils::to_sub_message;
use sc_rpc::utils::Subscription;
use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{traits::CallContext, Bytes};
Expand Down Expand Up @@ -158,7 +158,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
}

/// Helper to convert the `subscription ID` to a string.
pub fn read_subscription_id_as_string(sink: &SubscriptionSink) -> String {
pub fn read_subscription_id_as_string(sink: &Subscription) -> String {
match sink.subscription_id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.into_owned().into(),
Expand Down Expand Up @@ -213,7 +213,7 @@ where
return
};

let Ok(sink) = pending.accept().await else { return };
let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };

let sub_id = read_subscription_id_as_string(&sink);
// Keep track of the subscription.
Expand All @@ -223,8 +223,7 @@ where
// Inserting the subscription can only fail if the JsonRPSee generated a duplicate
// subscription ID.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
let _ = sink.send(&FollowEvent::<String>::Stop).await;
return
};
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id);
Expand Down
22 changes: 8 additions & 14 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ use futures::{
stream::{self, Stream, StreamExt},
};
use futures_util::future::Either;
use jsonrpsee::SubscriptionSink;
use log::debug;
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
};
use sc_rpc::utils::to_sub_message;
use sc_rpc::utils::Subscription;
use schnellru::{ByLength, LruMap};
use sp_api::CallApiAt;
use sp_blockchain::{
Expand Down Expand Up @@ -597,7 +596,7 @@ where
&mut self,
startup_point: &StartupPoint<Block>,
mut stream: EventStream,
sink: SubscriptionSink,
sink: Subscription,
rx_stop: oneshot::Receiver<()>,
) -> Result<(), SubscriptionManagementError>
where
Expand Down Expand Up @@ -632,23 +631,20 @@ where
self.sub_id,
err
);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
_ = sink.send(&FollowEvent::<String>::Stop).await;
return Err(err)
},
};

for event in events {
let msg = to_sub_message(&sink, &event);
if let Err(err) = sink.send(msg).await {
if let Err(err) = sink.send(&event).await {
// Failed to submit event.
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to send event {:?}", self.sub_id, err
);

let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
let _ = sink.send(&FollowEvent::<String>::Stop).await;
// No need to propagate this error further, the client disconnected.
return Ok(())
}
Expand All @@ -662,15 +658,14 @@ where
// - the substrate streams have closed
// - the `Stop` receiver was triggered internally (cannot hold the pinned block guarantee)
// - the client disconnected.
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
let _ = sink.send(&FollowEvent::<String>::Stop).await;
Ok(())
}

/// Generate the block events for the `chainHead_follow` method.
pub async fn generate_events(
&mut self,
sink: SubscriptionSink,
sink: Subscription,
sub_data: InsertedSubscriptionData<Block>,
) -> Result<(), SubscriptionManagementError> {
// Register for the new block and finalized notifications.
Expand Down Expand Up @@ -698,8 +693,7 @@ where
self.sub_id,
err
);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
let _ = sink.send(&FollowEvent::<String>::Stop).await;
return Err(err)
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use codec::Encode;
use jsonrpsee::rpc_params;
use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool};
use sp_core::H256;
use std::sync::Arc;
use std::{sync::Arc, vec};
use substrate_test_runtime_client::AccountKeyring::*;
use substrate_test_runtime_transaction_pool::uxt;

Expand Down Expand Up @@ -149,3 +149,89 @@ async fn tx_with_pruned_best_block() {
let event: TransactionEvent<H256> = get_next_event_sub!(&mut sub);
assert_eq!(event, TransactionEvent::Finalized(TransactionBlock { hash: block_2, index: 0 }));
}

#[tokio::test]
async fn tx_slow_client_replace_old_messages() {
let (api, pool, client, tx_api, _exec_middleware, _pool_middleware) = setup_api_tx();
let block_1_header = api.push_block(1, vec![], true);
client.set_best_block(block_1_header.hash(), 1);

let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());

// The subscription itself has a buffer of length 1 and no way to create
// it without a buffer.
//
// Then `transactionWatch` has its own buffer of length 3 which leads to
// that it's limited to 5 items in the tests.
//
// 1. Send will complete immediately
// 2. Send will be pending in the subscription sink (not possible to cancel)
// 3. The rest of messages will be kept in a RingBuffer and older messages are replaced by newer
// items.
let mut sub = tx_api
.subscribe("transactionWatch_v1_submitAndWatch", rpc_params![&xt], 1)
.await
.unwrap();

// Import block 2 with the transaction included.
let block = api.push_block(2, vec![uxt.clone()], true);
let block_hash = block.hash();
let event = ChainEvent::NewBestBlock { hash: block_hash, tree_route: None };
pool.inner_pool.maintain(event).await;

let mut block2_hash = None;

// Import block 2 again without the transaction included.
for _ in 0..10 {
let block_not_imported = api.push_block(2, vec![], true);
let event = ChainEvent::NewBestBlock { hash: block_not_imported.hash(), tree_route: None };
pool.inner_pool.maintain(event).await;

let block2 = api.push_block(2, vec![uxt.clone()], true);
block2_hash = Some(block2.hash());
let event = ChainEvent::NewBestBlock { hash: block2.hash(), tree_route: None };

pool.inner_pool.maintain(event).await;
}

let block2_hash = block2_hash.unwrap();

// Finalize the transaction
let event = ChainEvent::Finalized { hash: block2_hash, tree_route: Arc::from(vec![]) };
pool.inner_pool.maintain(event).await;

// Hack to mimic a slow client.
tokio::time::sleep(std::time::Duration::from_secs(10)).await;

// Read the events.
let mut res: Vec<TransactionEvent<_>> = Vec::new();

while let Some(item) = tokio::time::timeout(std::time::Duration::from_secs(5), sub.next())
.await
.unwrap()
{
let (ev, _) = item.unwrap();
res.push(ev);
}

// BestBlockIncluded(None) is dropped and not seen.
let exp = vec![
// First message
TransactionEvent::Validated,
// Second message
TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock {
hash: block_hash,
index: 0,
})),
// Most recent 3 messages.
TransactionEvent::Validated,
TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock {
hash: block2_hash,
index: 0,
})),
TransactionEvent::Finalized(TransactionBlock { hash: block2_hash, index: 0 }),
];

assert_eq!(res, exp);
}
27 changes: 16 additions & 11 deletions substrate/client/rpc-spec-v2/src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
use codec::Decode;
use futures::{StreamExt, TryFutureExt};
use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
use sc_rpc::utils::{pipe_from_stream, to_sub_message};
use sc_rpc::utils::{RingBuffer, Subscription};
use sc_transaction_pool_api::{
error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
TransactionStatus,
Expand Down Expand Up @@ -84,16 +84,14 @@ where
Err(e) => {
log::debug!(target: LOG_TARGET, "Extrinsic bytes cannot be decoded: {:?}", e);

let Ok(sink) = pending.accept().await else { return };
let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };

// The transaction is invalid.
let msg = to_sub_message(
&sink,
&TransactionEvent::Invalid::<BlockHash<Pool>>(TransactionError {
let _ = sink
.send(&TransactionEvent::Invalid::<BlockHash<Pool>>(TransactionError {
error: "Extrinsic bytes cannot be decoded".into(),
}),
);
let _ = sink.send(msg).await;
}))
.await;
return
},
};
Expand All @@ -108,16 +106,23 @@ where
.unwrap_or_else(|e| Error::Verification(Box::new(e)))
});

let Ok(sink) = pending.accept().await.map(Subscription::from) else {
return;
};

match submit.await {
Ok(stream) => {
let stream = stream.filter_map(move |event| async move { handle_event(event) });
pipe_from_stream(pending, stream.boxed()).await;
let stream =
stream.filter_map(move |event| async move { handle_event(event) }).boxed();

// If the subscription is too slow older events will be overwritten.
sink.pipe_from_stream(stream, RingBuffer::new(3)).await;
},
Err(err) => {
// We have not created an `Watcher` for the tx. Make sure the
// error is still propagated as an event.
let event: TransactionEvent<<Pool::Block as BlockT>::Hash> = err.into();
pipe_from_stream(pending, futures::stream::once(async { event }).boxed()).await;
_ = sink.send(&event).await;
},
};
};
Expand Down
6 changes: 4 additions & 2 deletions substrate/client/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod tests;
use std::sync::Arc;

use crate::{
utils::{pipe_from_stream, spawn_subscription_task},
utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
SubscriptionTaskExecutor,
};

Expand Down Expand Up @@ -202,7 +202,9 @@ where
},
};

pipe_from_stream(pending, stream).await;
PendingSubscription::from(pending)
.pipe_from_stream(stream, BoundedVecDeque::default())
.await;
};

spawn_subscription_task(&self.executor, fut);
Expand Down
8 changes: 5 additions & 3 deletions substrate/client/rpc/src/chain/chain_full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use super::{client_err, ChainBackend, Error};
use crate::{
utils::{pipe_from_stream, spawn_subscription_task},
utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
SubscriptionTaskExecutor,
};
use std::{marker::PhantomData, sync::Arc};
Expand Down Expand Up @@ -142,6 +142,8 @@ fn subscribe_headers<Block, Client, F, G, S>(
// we set up the stream and chain it to the stream. Consuming code would need to handle
// duplicates at the beginning of the stream though.
let stream = stream::iter(maybe_header).chain(stream());

spawn_subscription_task(executor, pipe_from_stream(pending, stream));
spawn_subscription_task(
executor,
PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
);
}
Loading

0 comments on commit fc10887

Please sign in to comment.