Skip to content

Commit

Permalink
dev(better_theoros):
Browse files Browse the repository at this point in the history
  • Loading branch information
akhercha committed Nov 2, 2024
1 parent 653d3dd commit efbed1e
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 35 deletions.
36 changes: 17 additions & 19 deletions rust/theoros/src/handlers/websocket/subscribe_to_calldata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use futures::{
};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast::Receiver;
use utoipa::ToSchema;

use crate::constants::{MAX_CLIENT_MESSAGE_SIZE, PING_INTERVAL_DURATION};
use crate::types::calldata::AsCalldata;
use crate::types::{hyperlane::NewUpdatesAvailableEvent, rpc::RpcDataFeed};
use crate::types::hyperlane::NewUpdatesAvailableEvent;
use crate::AppState;
use crate::{configs::evm_config::EvmChainName, types::calldata::Calldata};

Expand All @@ -40,6 +41,15 @@ enum ClientMessage {
Unsubscribe { ids: Vec<String> },
}

#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct RpcDataFeed {
pub feed_id: String,
/// The calldata binary represented as a hex string.
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option<String>)]
pub encoded_calldata: Option<String>,
}

#[derive(Serialize, Debug, Clone)]
#[serde(tag = "type")]
enum ServerMessage {
Expand Down Expand Up @@ -70,14 +80,9 @@ pub async fn ws_route_handler(
async fn websocket_handler(stream: WebSocket, state: AppState) {
let ws_state = state.ws.clone();

// TODO: add new connection to metrics

let (sender, receiver) = stream.split();

let feeds_receiver = state.storage.feeds_updated_tx().subscribe();

let id = ws_state.subscriber_counter.fetch_add(1, Ordering::SeqCst);

let mut subscriber = Subscriber::new(id, Arc::new(state), feeds_receiver, receiver, sender);

subscriber.run().await;
Expand Down Expand Up @@ -145,14 +150,6 @@ impl Subscriber {
},
_ = self.ping_interval.tick() => {
if !self.responded_to_ping {
// self.metrics
// .interactions
// .get_or_create(&Labels {
// interaction: Interaction::ClientHeartbeat,
// status: Status::Error,
// })
// .inc();

return Err(anyhow!("Subscriber did not respond to ping. Closing connection."));
}
self.responded_to_ping = false;
Expand All @@ -176,10 +173,12 @@ impl Subscriber {
Calldata::build_from(self.state.as_ref(), self.active_chain.unwrap(), feed_id.to_owned()).await?;

let message = serde_json::to_string(&ServerMessage::DataFeedUpdate {
data_feed: RpcDataFeed { feed_id: feed_id.clone(), calldata: Some(hex::encode(calldata.as_bytes())) },
data_feed: RpcDataFeed {
feed_id: feed_id.clone(),
encoded_calldata: Some(hex::encode(calldata.as_bytes())),
},
})?;
self.sender.send(message.into()).await?;
// TODO: success metric
Ok(())
}

Expand Down Expand Up @@ -229,13 +228,11 @@ impl Subscriber {
Ok(ClientMessage::Subscribe { ids: feed_ids, chain_name }) => {
let stored_feed_ids = self.state.storage.feed_ids();

// TODO: Assert that the chain is supported?

// If there is a single feed id that is not found, we don't subscribe to any of the
// asked feed ids and return an error to be more explicit and clear.
match stored_feed_ids.contains_vec(&feed_ids).await {
// TODO: return multiple missing ids
Some(missing_id) => {
// TODO: return multiple missing ids
self.sender
.send(
serde_json::to_string(&ServerMessage::Response(ServerResponseMessage::Err {
Expand All @@ -249,6 +246,7 @@ impl Subscriber {
None => {
for feed_id in feed_ids {
self.data_feeds_with_config.insert(feed_id, DataFeedClientConfig {});
// TODO: Assert that the chain is supported by theoros
self.active_chain = Some(chain_name);
}
}
Expand Down
1 change: 0 additions & 1 deletion rust/theoros/src/types/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod calldata;
pub mod hyperlane;
pub mod rpc;
pub mod state;
13 changes: 0 additions & 13 deletions rust/theoros/src/types/rpc.rs

This file was deleted.

1 change: 0 additions & 1 deletion rust/theoros/src/types/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{

#[derive(Clone)]
pub struct AppState {
#[allow(unused)]
pub starknet_rpc: Arc<StarknetRpc>,
pub hyperlane_validators_mapping: Arc<HyperlaneValidatorsMapping>,
pub storage: Arc<TheorosStorage>,
Expand Down
2 changes: 1 addition & 1 deletion solidity/test/PragmaDecoder.t.sol
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ contract PragmaHarnessTest is Test {
setupRaw();
// encoded update
bytes memory encodedUpdate =
hex"0100000170030100c1ec5070f1a4868b8e6bfa5bbd31ac77605c5af1a739bc4e7758d4ca1d88fa8835c1460646b647c4c4403b324c2297a04d70b84888dc873021f80d6d70ed015e1c00031b8b0000000067225b1100611a3d0060240f2bccef7e64f920eec05c5bfffbc48c6ceaa4efca8748772b60cbafc30536953cdd0dd5b8e24428e4fb6eab5c143daba15f62b24606e50d822508faefd53032e26a3b1d1510dfe82a2ab8d6c0fc0f010dcdd3c410ba2f9fdad3479b1400031b8b15704e0efd1955cfe1c1182ba083bd5309707bdd795397cbbbb106cfc9b29bb001000100000000000000000000004254432f555344000000000000000000000000000000000000000067225b1100080800000000000000000000068aa5cb9d63000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004254432f5553440000000067225b11";
hex"0100000170030100aeffc47b4d795e4978f92378ba5276697dd58fea699470add9e855cbb782b19b05da6229faf0cd99fef5a908b976102af271f6ec0e8d736b22df6dfd794fc3a81b0003c3de000000006726476300611a3d0060240f2bccef7e64f920eec05c5bfffbc48c6ceaa4efca8748772b60cbafc30536953cdd0dd5b8e24428e4fb6eab5c143daba15f62b24606e50d822508faefba5410a4a5555b80baf66026cec6481aa28a3291a2f25546b1187bf9ac18306d0003c3dec2e4e2ec168d8437f4c052570f20994259cd442d7068ae2ee2f33401e5243bb301006b00000000000000000000004254432f5553440000000000000000000000000000000000000000672647630008080000000000000000000000000000000000000000000000000000064fd736747b0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004254432f5553440000000067264763";
uint8 numUpdates = pragmaHarness.exposed_updateDataInfoFromUpdate(encodedUpdate);
assertEq(numUpdates, 1, "Number of updates should be 1");
}
Expand Down

0 comments on commit efbed1e

Please sign in to comment.