Skip to content

Commit

Permalink
fix: feed rpc responses and notifications instead of send and flush (#…
Browse files Browse the repository at this point in the history
…135)

* fix: feed rpc respones and notifications instead of send and flush

* refactor: use duration instead of _secs in config

* chore: bump version
  • Loading branch information
ali-bahjati authored Aug 1, 2024
1 parent efc17f5 commit 3852195
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "2.10.2"
version = "2.10.3"
edition = "2021"

[[bin]]
Expand Down
16 changes: 14 additions & 2 deletions config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@
# connection is not exposed for unauthorized access.
listen_address = "127.0.0.1:8910"

# Size of the buffer of each Server's channel on which `notify_price` events are
# received from the Price state.
# notify_price_tx_buffer = 10000

# Size of the buffer of each Server's channel on which `notify_price_sched` events are
# received from the Price state.
# notify_price_sched_tx_buffer = 10000

# Flush interval for responses and notifications. This is the maximum time the
# server will wait before flushing the messages to the client.
# flush_interval_duration = "50ms"

# Configuration for the primary network this agent will publish data to. In most cases this should be a Pythnet endpoint.
[primary_network]
### Required fields ###
Expand Down Expand Up @@ -186,8 +198,8 @@ key_store.mapping_key = "RelevantOracleMappingAddress"
## Configuration for OpenTelemetry ##
[opentelemetry]

# Timeout in seconds for the OpenTelemetry exporter
exporter_timeout_secs = 3
# Timeout duration for the OpenTelemetry exporter
exporter_timeout_duration = "3s"

# Endpoint URL for the OpenTelemetry exporter
exporter_endpoint = "http://127.0.0.1:4317"
10 changes: 7 additions & 3 deletions src/agent/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use {
File,
},
serde::Deserialize,
std::path::Path,
std::{
path::Path,
time::Duration,
},
};

/// Configuration for all components of the Agent
Expand Down Expand Up @@ -88,6 +91,7 @@ impl Default for ChannelCapacities {

#[derive(Deserialize, Debug)]
pub struct OpenTelemetryConfig {
pub exporter_timeout_secs: u64,
pub exporter_endpoint: String,
#[serde(with = "humantime_serde")]
pub exporter_timeout_duration: Duration,
pub exporter_endpoint: String,
}
40 changes: 29 additions & 11 deletions src/agent/pyth/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use {
fmt::Debug,
net::SocketAddr,
sync::Arc,
time::Duration,
},
tokio::sync::mpsc,
tracing::instrument,
Expand Down Expand Up @@ -115,6 +116,7 @@ async fn handle_connection<S>(
state: Arc<S>,
notify_price_tx_buffer: usize,
notify_price_sched_tx_buffer: usize,
flush_interval_duration: Duration,
) where
S: state::Prices,
S: Send,
Expand All @@ -127,6 +129,8 @@ async fn handle_connection<S>(
let (mut notify_price_sched_tx, mut notify_price_sched_rx) =
mpsc::channel(notify_price_sched_tx_buffer);

let mut flush_interval = tokio::time::interval(flush_interval_duration);

loop {
if let Err(err) = handle_next(
&*state,
Expand All @@ -136,6 +140,7 @@ async fn handle_connection<S>(
&mut notify_price_rx,
&mut notify_price_sched_tx,
&mut notify_price_sched_rx,
&mut flush_interval,
)
.await
{
Expand All @@ -159,6 +164,7 @@ async fn handle_next<S>(
notify_price_rx: &mut mpsc::Receiver<NotifyPrice>,
notify_price_sched_tx: &mut mpsc::Sender<NotifyPriceSched>,
notify_price_sched_rx: &mut mpsc::Receiver<NotifyPriceSched>,
flush_interval: &mut tokio::time::Interval,
) -> Result<()>
where
S: state::Prices,
Expand All @@ -183,13 +189,16 @@ where
}
}
Some(notify_price) = notify_price_rx.recv() => {
send_notification(ws_tx, Method::NotifyPrice, Some(notify_price))
feed_notification(ws_tx, Method::NotifyPrice, Some(notify_price))
.await
}
Some(notify_price_sched) = notify_price_sched_rx.recv() => {
send_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched))
feed_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched))
.await
}
_ = flush_interval.tick() => {
flush(ws_tx).await
}
}
}

Expand Down Expand Up @@ -229,9 +238,9 @@ where
// Send an array if we're handling a batch
// request, single response object otherwise
if is_batch {
send_text(ws_tx, &serde_json::to_string(&responses)?).await?;
feed_text(ws_tx, &serde_json::to_string(&responses)?).await?;
} else {
send_text(ws_tx, &serde_json::to_string(&responses[0])?).await?;
feed_text(ws_tx, &serde_json::to_string(&responses[0])?).await?;
}
}
// The top-level parsing errors are fine to share with client
Expand Down Expand Up @@ -354,21 +363,21 @@ async fn send_error(
error.to_string(),
None,
);
send_text(ws_tx, &response.to_string()).await
feed_text(ws_tx, &response.to_string()).await
}

async fn send_notification<T>(
async fn feed_notification<T>(
ws_tx: &mut SplitSink<WebSocket, Message>,
method: Method,
params: Option<T>,
) -> Result<()>
where
T: Sized + Serialize + DeserializeOwned,
{
send_request(ws_tx, IdReq::Notification, method, params).await
feed_request(ws_tx, IdReq::Notification, method, params).await
}

async fn send_request<I, T>(
async fn feed_request<I, T>(
ws_tx: &mut SplitSink<WebSocket, Message>,
id: I,
method: Method,
Expand All @@ -379,16 +388,20 @@ where
T: Sized + Serialize + DeserializeOwned,
{
let request = Request::with_params(id, method, params);
send_text(ws_tx, &request.to_string()).await
feed_text(ws_tx, &request.to_string()).await
}

async fn send_text(ws_tx: &mut SplitSink<WebSocket, Message>, msg: &str) -> Result<()> {
async fn feed_text(ws_tx: &mut SplitSink<WebSocket, Message>, msg: &str) -> Result<()> {
ws_tx
.send(Message::text(msg.to_string()))
.feed(Message::text(msg.to_string()))
.await
.map_err(|e| e.into())
}

async fn flush(ws_tx: &mut SplitSink<WebSocket, Message>) -> Result<()> {
ws_tx.flush().await.map_err(|e| e.into())
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
Expand All @@ -400,6 +413,9 @@ pub struct Config {
/// Size of the buffer of each Server's channel on which `notify_price_sched` events are
/// received from the Price state.
pub notify_price_sched_tx_buffer: usize,
/// Flush interval duration for the notifications.
#[serde(with = "humantime_serde")]
pub flush_interval_duration: Duration,
}

impl Default for Config {
Expand All @@ -408,6 +424,7 @@ impl Default for Config {
listen_address: "127.0.0.1:8910".to_string(),
notify_price_tx_buffer: 10000,
notify_price_sched_tx_buffer: 10000,
flush_interval_duration: Duration::from_millis(50),
}
}
}
Expand Down Expand Up @@ -448,6 +465,7 @@ where
state,
config.notify_price_tx_buffer,
config.notify_price_sched_tx_buffer,
config.flush_interval_duration,
)
.await
})
Expand Down
5 changes: 1 addition & 4 deletions src/bin/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use {
std::{
io::IsTerminal,
path::PathBuf,
time::Duration,
},
tracing_subscriber::{
prelude::*,
Expand Down Expand Up @@ -65,9 +64,7 @@ async fn main() -> Result<()> {
let otlp_exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(&opentelemetry_config.exporter_endpoint)
.with_timeout(Duration::from_secs(
opentelemetry_config.exporter_timeout_secs,
));
.with_timeout(opentelemetry_config.exporter_timeout_duration);

// Set up the OpenTelemetry tracer
let tracer = opentelemetry_otlp::new_pipeline()
Expand Down

0 comments on commit 3852195

Please sign in to comment.