diff --git a/rust/theoros/src/handlers/websocket/subscribe_to_calldata.rs b/rust/theoros/src/handlers/websocket/subscribe_to_calldata.rs index 6a5e6557..138708af 100644 --- a/rust/theoros/src/handlers/websocket/subscribe_to_calldata.rs +++ b/rust/theoros/src/handlers/websocket/subscribe_to_calldata.rs @@ -19,10 +19,11 @@ use futures::{ }; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast::Receiver; +use utoipa::ToSchema; use crate::constants::{MAX_CLIENT_MESSAGE_SIZE, PING_INTERVAL_DURATION}; use crate::types::calldata::AsCalldata; -use crate::types::{hyperlane::NewUpdatesAvailableEvent, rpc::RpcDataFeed}; +use crate::types::hyperlane::NewUpdatesAvailableEvent; use crate::AppState; use crate::{configs::evm_config::EvmChainName, types::calldata::Calldata}; @@ -40,6 +41,15 @@ enum ClientMessage { Unsubscribe { ids: Vec }, } +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct RpcDataFeed { + pub feed_id: String, + /// The calldata binary represented as a hex string. + #[serde(skip_serializing_if = "Option::is_none")] + #[schema(value_type = Option)] + pub encoded_calldata: Option, +} + #[derive(Serialize, Debug, Clone)] #[serde(tag = "type")] enum ServerMessage { @@ -70,14 +80,9 @@ pub async fn ws_route_handler( async fn websocket_handler(stream: WebSocket, state: AppState) { let ws_state = state.ws.clone(); - // TODO: add new connection to metrics - let (sender, receiver) = stream.split(); - let feeds_receiver = state.storage.feeds_updated_tx().subscribe(); - let id = ws_state.subscriber_counter.fetch_add(1, Ordering::SeqCst); - let mut subscriber = Subscriber::new(id, Arc::new(state), feeds_receiver, receiver, sender); subscriber.run().await; @@ -145,14 +150,6 @@ impl Subscriber { }, _ = self.ping_interval.tick() => { if !self.responded_to_ping { - // self.metrics - // .interactions - // .get_or_create(&Labels { - // interaction: Interaction::ClientHeartbeat, - // status: Status::Error, - // }) - // .inc(); - return Err(anyhow!("Subscriber did not respond to ping. Closing connection.")); } self.responded_to_ping = false; @@ -176,10 +173,12 @@ impl Subscriber { Calldata::build_from(self.state.as_ref(), self.active_chain.unwrap(), feed_id.to_owned()).await?; let message = serde_json::to_string(&ServerMessage::DataFeedUpdate { - data_feed: RpcDataFeed { feed_id: feed_id.clone(), calldata: Some(hex::encode(calldata.as_bytes())) }, + data_feed: RpcDataFeed { + feed_id: feed_id.clone(), + encoded_calldata: Some(hex::encode(calldata.as_bytes())), + }, })?; self.sender.send(message.into()).await?; - // TODO: success metric Ok(()) } @@ -229,13 +228,11 @@ impl Subscriber { Ok(ClientMessage::Subscribe { ids: feed_ids, chain_name }) => { let stored_feed_ids = self.state.storage.feed_ids(); - // TODO: Assert that the chain is supported? - // If there is a single feed id that is not found, we don't subscribe to any of the // asked feed ids and return an error to be more explicit and clear. match stored_feed_ids.contains_vec(&feed_ids).await { + // TODO: return multiple missing ids Some(missing_id) => { - // TODO: return multiple missing ids self.sender .send( serde_json::to_string(&ServerMessage::Response(ServerResponseMessage::Err { @@ -249,6 +246,7 @@ impl Subscriber { None => { for feed_id in feed_ids { self.data_feeds_with_config.insert(feed_id, DataFeedClientConfig {}); + // TODO: Assert that the chain is supported by theoros self.active_chain = Some(chain_name); } } diff --git a/rust/theoros/src/types/mod.rs b/rust/theoros/src/types/mod.rs index c1f9c34d..d6f6d340 100644 --- a/rust/theoros/src/types/mod.rs +++ b/rust/theoros/src/types/mod.rs @@ -1,4 +1,3 @@ pub mod calldata; pub mod hyperlane; -pub mod rpc; pub mod state; diff --git a/rust/theoros/src/types/rpc.rs b/rust/theoros/src/types/rpc.rs deleted file mode 100644 index 54feb797..00000000 --- a/rust/theoros/src/types/rpc.rs +++ /dev/null @@ -1,13 +0,0 @@ -use serde::{Deserialize, Serialize}; -use utoipa::ToSchema; - -pub type RpcDataFeedIdentifier = String; - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -pub struct RpcDataFeed { - pub feed_id: RpcDataFeedIdentifier, - /// The calldata binary represented as a hex string. - #[serde(skip_serializing_if = "Option::is_none")] - #[schema(value_type = Option)] - pub calldata: Option, -} diff --git a/rust/theoros/src/types/state.rs b/rust/theoros/src/types/state.rs index 330d7998..279cca37 100644 --- a/rust/theoros/src/types/state.rs +++ b/rust/theoros/src/types/state.rs @@ -9,7 +9,6 @@ use crate::{ #[derive(Clone)] pub struct AppState { - #[allow(unused)] pub starknet_rpc: Arc, pub hyperlane_validators_mapping: Arc, pub storage: Arc, diff --git a/solidity/test/PragmaDecoder.t.sol b/solidity/test/PragmaDecoder.t.sol index 093282af..52779d49 100644 --- a/solidity/test/PragmaDecoder.t.sol +++ b/solidity/test/PragmaDecoder.t.sol @@ -37,7 +37,7 @@ contract PragmaHarnessTest is Test { setupRaw(); // encoded update bytes memory encodedUpdate = - hex"0100000170030100c1ec5070f1a4868b8e6bfa5bbd31ac77605c5af1a739bc4e7758d4ca1d88fa8835c1460646b647c4c4403b324c2297a04d70b84888dc873021f80d6d70ed015e1c00031b8b0000000067225b1100611a3d0060240f2bccef7e64f920eec05c5bfffbc48c6ceaa4efca8748772b60cbafc30536953cdd0dd5b8e24428e4fb6eab5c143daba15f62b24606e50d822508faefd53032e26a3b1d1510dfe82a2ab8d6c0fc0f010dcdd3c410ba2f9fdad3479b1400031b8b15704e0efd1955cfe1c1182ba083bd5309707bdd795397cbbbb106cfc9b29bb001000100000000000000000000004254432f555344000000000000000000000000000000000000000067225b1100080800000000000000000000068aa5cb9d63000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004254432f5553440000000067225b11"; + hex"0100000170030100aeffc47b4d795e4978f92378ba5276697dd58fea699470add9e855cbb782b19b05da6229faf0cd99fef5a908b976102af271f6ec0e8d736b22df6dfd794fc3a81b0003c3de000000006726476300611a3d0060240f2bccef7e64f920eec05c5bfffbc48c6ceaa4efca8748772b60cbafc30536953cdd0dd5b8e24428e4fb6eab5c143daba15f62b24606e50d822508faefba5410a4a5555b80baf66026cec6481aa28a3291a2f25546b1187bf9ac18306d0003c3dec2e4e2ec168d8437f4c052570f20994259cd442d7068ae2ee2f33401e5243bb301006b00000000000000000000004254432f5553440000000000000000000000000000000000000000672647630008080000000000000000000000000000000000000000000000000000064fd736747b0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004254432f5553440000000067264763"; uint8 numUpdates = pragmaHarness.exposed_updateDataInfoFromUpdate(encodedUpdate); assertEq(numUpdates, 1, "Number of updates should be 1"); }