Skip to content

Commit

Permalink
chainHead: Propagate results on the chainHead_follow (#1116)
Browse files Browse the repository at this point in the history
* rpc/types: Update chainHead events

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc: Sync chainHead methods with spec

Signed-off-by: Alexandru Vasile <[email protected]>

* testing: Adjust chainHead tests

Signed-off-by: Alexandru Vasile <[email protected]>

* tests: Ignore block related events to avoid flaky tests

Signed-off-by: Alexandru Vasile <[email protected]>

* tests: Adjust clippy

Signed-off-by: Alexandru Vasile <[email protected]>

* test: Remove unused OfflineClientT

Signed-off-by: Alexandru Vasile <[email protected]>

* Update subxt/src/rpc/types.rs

Co-authored-by: Niklas Adolfsson <[email protected]>

* Update subxt/src/rpc/types.rs

Co-authored-by: Niklas Adolfsson <[email protected]>

* Update subxt/src/rpc/types.rs

Co-authored-by: Niklas Adolfsson <[email protected]>

* Update subxt/src/rpc/types.rs

Co-authored-by: Niklas Adolfsson <[email protected]>

* types: Remove serde flags for serialization

Signed-off-by: Alexandru Vasile <[email protected]>

---------

Signed-off-by: Alexandru Vasile <[email protected]>
Co-authored-by: Niklas Adolfsson <[email protected]>
  • Loading branch information
lexnv and niklasad1 authored Aug 9, 2023
1 parent d02afcd commit b97acc5
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 160 deletions.
46 changes: 25 additions & 21 deletions subxt/src/rpc/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{error::Error, utils::PhantomDataSendSync, Config, Metadata};

use super::{
rpc_params,
types::{self, ChainHeadEvent, ChainHeadStorageEvent, FollowEvent, StorageQuery},
types::{self, FollowEvent, StorageQuery},
RpcClient, RpcClientT, Subscription,
};

Expand Down Expand Up @@ -499,7 +499,10 @@ impl<T: Config> Rpc<T> {
Ok(subscription)
}

/// Subscribe to `chainHead_unstable_body` to obtain events regarding the block's body.
/// Call the `chainHead_unstable_body` method and return an operation ID to obtain the block's body.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
///
/// # Note
///
Expand All @@ -509,17 +512,16 @@ impl<T: Config> Rpc<T> {
&self,
subscription_id: String,
hash: T::Hash,
) -> Result<Subscription<ChainHeadEvent<String>>, Error> {
let subscription = self
) -> Result<types::MethodResponse, Error> {
let response = self
.client
.subscribe(
.request(
"chainHead_unstable_body",
rpc_params![subscription_id, hash],
"chainHead_unstable_stopBody",
)
.await?;

Ok(subscription)
Ok(response)
}

/// Get the block's body using the `chainHead_unstable_header` method.
Expand All @@ -544,8 +546,10 @@ impl<T: Config> Rpc<T> {
Ok(header)
}

/// Subscribe to `chainHead_storage` to obtain events regarding the
/// block's storage.
/// Call the `chainhead_unstable_storage` method and return an operation ID to obtain the block's storage.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
///
/// # Note
///
Expand All @@ -557,7 +561,7 @@ impl<T: Config> Rpc<T> {
hash: T::Hash,
items: Vec<StorageQuery<&[u8]>>,
child_key: Option<&[u8]>,
) -> Result<Subscription<ChainHeadStorageEvent<Option<String>>>, Error> {
) -> Result<types::MethodResponse, Error> {
let items: Vec<StorageQuery<String>> = items
.into_iter()
.map(|item| StorageQuery {
Expand All @@ -566,20 +570,21 @@ impl<T: Config> Rpc<T> {
})
.collect();

let subscription = self
let response = self
.client
.subscribe(
.request(
"chainHead_unstable_storage",
rpc_params![subscription_id, hash, items, child_key.map(to_hex)],
"chainHead_unstable_stopStorage",
)
.await?;

Ok(subscription)
Ok(response)
}

/// Subscribe to `chainHead_call` to obtain events regarding the
/// runtime API call.
/// Call the `chainhead_unstable_storage` method and return an operation ID to obtain the runtime API result.
///
/// The response events are provided on the `chainHead_follow` subscription and identified by
/// the returned operation ID.
///
/// # Note
///
Expand All @@ -591,17 +596,16 @@ impl<T: Config> Rpc<T> {
hash: T::Hash,
function: String,
call_parameters: &[u8],
) -> Result<Subscription<ChainHeadEvent<String>>, Error> {
let subscription = self
) -> Result<types::MethodResponse, Error> {
let response = self
.client
.subscribe(
.request(
"chainHead_unstable_call",
rpc_params![subscription_id, hash, function, to_hex(call_parameters)],
"chainHead_unstable_stopCall",
)
.await?;

Ok(subscription)
Ok(response)
}

/// Unpin a block reported by the `chainHead_follow` subscription.
Expand Down
Loading

0 comments on commit b97acc5

Please sign in to comment.