Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into fix/ignore-product-without-price
Browse files Browse the repository at this point in the history
ali-bahjati authored Aug 22, 2024
2 parents e2071c1 + e4683c7 commit 687e130
Showing 11 changed files with 98 additions and 130 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "2.10.2"
version = "2.10.3"
edition = "2021"

[[bin]]
48 changes: 4 additions & 44 deletions config/config.sample.pythnet.toml
Original file line number Diff line number Diff line change
@@ -3,8 +3,8 @@ listen_address = "127.0.0.1:8910"

[primary_network]

# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually rate-limited, so a private
# endpoint should be used in most cases.
# HTTP(S) endpoint of the RPC node. Public Pythnet RPC endpoints are usually
# rate-limited, so a private endpoint should be used in most cases.
rpc_url = "https://api2.pythnet.pyth.network"

# WS(S) endpoint of the RPC node. This is used to subscribe to account changes on the network.
@@ -20,48 +20,8 @@ key_store.program_key = "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH"
# Oracle mapping pubkey
key_store.mapping_key = "AHtgzX45WTKfkPG53L6WYhGEXwQkN1BVknET3sVsLL8J"

# Pythnet accumulator key
# The pythnet accumulator key settings are only valid for pythnet. Do not apply these settings for any other environment i.e. mainnet, pythtest-conformance
key_store.accumulator_key = "7Vbmv1jt4vyuqBZcpYPpnVhrqVe5e6ZPb6JxDcffRHUM"

# IMPORTANT: Exporter batch size must be decreased to 7 to support
# larger accumulator transactions, when accumulator_key is set.
exporter.max_batch_size = 7

# Duration of the interval at which to publish updates
exporter.publish_interval_duration = "1s"

# Configuration for the optional secondary network this agent will publish data to.
# In most cases this should be a Solana endpoint.
[secondary_network]

# Please use other endpoints as these are rate limited
rpc_url = "https://api.mainnet-beta.solana.com"
wss_url = "wss://api.mainnet-beta.solana.com"

# Path to your publishing keypair.
key_store.publish_keypair_path = "/path/to/keypair.json"

# Oracle program pubkey
key_store.program_key = "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH"

# Oracle mapping pubkey
key_store.mapping_key = "AHtgzX45WTKfkPG53L6WYhGEXwQkN1BVknET3sVsLL8J"

# Compute unit limit requested per instruction for transactions that update the price.
# This should be an upper bound of the compute units a single upd_price instruction might consume.
# For solana mainnet, this should be set to 20000 instead of the default 40000 since there is no accumulator.
exporter.compute_unit_limit = 20000

# Whether the dynamic compute unit pricing is enabled.
# This is needed for solana to be able to land transactions on the network
# during periods of high network congestion.
exporter.dynamic_compute_unit_pricing_enabled = true

# Price per compute unit offered for update_price transactions
exporter.compute_unit_price_micro_lamports = 1000

exporter.maximum_compute_unit_price_micro_lamports = 100000
# Compute unit per price update.
exporter.compute_unit_limit = 5000

# Configuration for the JRPC API
[pythd_adapter]
33 changes: 4 additions & 29 deletions config/config.sample.pythtest.toml
Original file line number Diff line number Diff line change
@@ -3,12 +3,12 @@ listen_address = "127.0.0.1:8910"

[primary_network]

# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually rate-limited, so a private
# endpoint should be used in most cases.
# HTTP(S) endpoint of the RPC node.
rpc_url = "https://api.pythtest.pyth.network"

# WS(S) endpoint of the RPC node. This is used to subscribe to account changes on the network.
# This can be omitted when oracle.subscriber_enabled is set to false.
# WS(S) endpoint of the RPC node. This is used to subscribe to account changes
# on the network. This can be omitted when oracle.subscriber_enabled is set to
# false.
wss_url = "wss://api.pythtest.pyth.network"

# Path to your publishing keypair.
@@ -28,31 +28,6 @@ key_store.mapping_key = "AFmdnt9ng1uVxqCmqwQJDAYC5cKTkw8gJKSM5PnzuF6z" # conform
# Duration of the interval at which to publish updates
exporter.publish_interval_duration = "400ms"

# Configuration for the optional secondary network this agent will publish data to.
# In most cases this should be a Solana endpoint.
[secondary_network]

# Please use other endpoints as these are rate limited
rpc_url = "https://api.testnet.solana.com"
wss_url = "wss://api.testnet.solana.com"

# Path to your publishing keypair.
key_store.publish_keypair_path = "/path/to/keypair.json"

# Oracle program pubkey
key_store.program_key = "8tfDNiaEyrV6Q1U4DEXrEigs9DoDtkugzFbybENEbCDz"

# Oracle mapping pubkey
key_store.mapping_key = "AFmdnt9ng1uVxqCmqwQJDAYC5cKTkw8gJKSM5PnzuF6z"

# Duration of the interval at which to publish updates. Default interval is 1 second.
# exporter.publish_interval_duration = "1s"

# Price per compute unit offered for update_price transactions.
# This is needed for solana to be able to land transactions on the network
# during periods of high network congestion.
exporter.compute_unit_price_micro_lamports = 1000

# Configuration for the JRPC API
[pythd_adapter]

22 changes: 17 additions & 5 deletions config/config.toml
Original file line number Diff line number Diff line change
@@ -6,17 +6,29 @@
# connection is not exposed for unauthorized access.
listen_address = "127.0.0.1:8910"

# Size of the buffer of each Server's channel on which `notify_price` events are
# received from the Price state.
# notify_price_tx_buffer = 10000

# Size of the buffer of each Server's channel on which `notify_price_sched` events are
# received from the Price state.
# notify_price_sched_tx_buffer = 10000

# Flush interval for responses and notifications. This is the maximum time the
# server will wait before flushing the messages to the client.
# flush_interval_duration = "50ms"

# Configuration for the primary network this agent will publish data to. In most cases this should be a Pythnet endpoint.
[primary_network]
### Required fields ###

# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually rate-limited, so a private endpoint should be used in most cases.
# Note that api.pythtest.pyth.network is a private endpoint: please contact us for access.
# HTTP(S) endpoint of the RPC node. Public RPC endpoints are usually
# rate-limited for Pythnet, so a private endpoint should be used in most
# cases. For Pythtest, the public endpoint can be used.
rpc_url = "https://api.pythtest.pyth.network"

# WS(S) endpoint of the RPC node. This is used to subscribe to account changes on the network.
# This can be omitted when oracle.subscriber_enabled is set to false.
# Note that api.pythtest.pyth.network is a private endpoint: please contact us for access.
wss_url = "wss://api.pythtest.pyth.network"

# Path to the keypair used to publish price updates. If set to a
@@ -186,8 +198,8 @@ key_store.mapping_key = "RelevantOracleMappingAddress"
## Configuration for OpenTelemetry ##
[opentelemetry]

# Timeout in seconds for the OpenTelemetry exporter
exporter_timeout_secs = 3
# Timeout duration for the OpenTelemetry exporter
exporter_timeout_duration = "3s"

# Endpoint URL for the OpenTelemetry exporter
exporter_endpoint = "http://127.0.0.1:4317"
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[toolchain]
channel = "stable"
channel = "1.79.0"
profile = "minimal"
components = ["rustfmt", "clippy"]
10 changes: 7 additions & 3 deletions src/agent/config.rs
Original file line number Diff line number Diff line change
@@ -13,7 +13,10 @@ use {
File,
},
serde::Deserialize,
std::path::Path,
std::{
path::Path,
time::Duration,
},
};

/// Configuration for all components of the Agent
@@ -88,6 +91,7 @@ impl Default for ChannelCapacities {

#[derive(Deserialize, Debug)]
pub struct OpenTelemetryConfig {
pub exporter_timeout_secs: u64,
pub exporter_endpoint: String,
#[serde(with = "humantime_serde")]
pub exporter_timeout_duration: Duration,
pub exporter_endpoint: String,
}
40 changes: 29 additions & 11 deletions src/agent/pyth/rpc.rs
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@ use {
fmt::Debug,
net::SocketAddr,
sync::Arc,
time::Duration,
},
tokio::sync::mpsc,
tracing::instrument,
@@ -115,6 +116,7 @@ async fn handle_connection<S>(
state: Arc<S>,
notify_price_tx_buffer: usize,
notify_price_sched_tx_buffer: usize,
flush_interval_duration: Duration,
) where
S: state::Prices,
S: Send,
@@ -127,6 +129,8 @@ async fn handle_connection<S>(
let (mut notify_price_sched_tx, mut notify_price_sched_rx) =
mpsc::channel(notify_price_sched_tx_buffer);

let mut flush_interval = tokio::time::interval(flush_interval_duration);

loop {
if let Err(err) = handle_next(
&*state,
@@ -136,6 +140,7 @@ async fn handle_connection<S>(
&mut notify_price_rx,
&mut notify_price_sched_tx,
&mut notify_price_sched_rx,
&mut flush_interval,
)
.await
{
@@ -159,6 +164,7 @@ async fn handle_next<S>(
notify_price_rx: &mut mpsc::Receiver<NotifyPrice>,
notify_price_sched_tx: &mut mpsc::Sender<NotifyPriceSched>,
notify_price_sched_rx: &mut mpsc::Receiver<NotifyPriceSched>,
flush_interval: &mut tokio::time::Interval,
) -> Result<()>
where
S: state::Prices,
@@ -183,13 +189,16 @@ where
}
}
Some(notify_price) = notify_price_rx.recv() => {
send_notification(ws_tx, Method::NotifyPrice, Some(notify_price))
feed_notification(ws_tx, Method::NotifyPrice, Some(notify_price))
.await
}
Some(notify_price_sched) = notify_price_sched_rx.recv() => {
send_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched))
feed_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched))
.await
}
_ = flush_interval.tick() => {
flush(ws_tx).await
}
}
}

@@ -229,9 +238,9 @@ where
// Send an array if we're handling a batch
// request, single response object otherwise
if is_batch {
send_text(ws_tx, &serde_json::to_string(&responses)?).await?;
feed_text(ws_tx, &serde_json::to_string(&responses)?).await?;
} else {
send_text(ws_tx, &serde_json::to_string(&responses[0])?).await?;
feed_text(ws_tx, &serde_json::to_string(&responses[0])?).await?;
}
}
// The top-level parsing errors are fine to share with client
@@ -354,21 +363,21 @@ async fn send_error(
error.to_string(),
None,
);
send_text(ws_tx, &response.to_string()).await
feed_text(ws_tx, &response.to_string()).await
}

async fn send_notification<T>(
async fn feed_notification<T>(
ws_tx: &mut SplitSink<WebSocket, Message>,
method: Method,
params: Option<T>,
) -> Result<()>
where
T: Sized + Serialize + DeserializeOwned,
{
send_request(ws_tx, IdReq::Notification, method, params).await
feed_request(ws_tx, IdReq::Notification, method, params).await
}

async fn send_request<I, T>(
async fn feed_request<I, T>(
ws_tx: &mut SplitSink<WebSocket, Message>,
id: I,
method: Method,
@@ -379,16 +388,20 @@ where
T: Sized + Serialize + DeserializeOwned,
{
let request = Request::with_params(id, method, params);
send_text(ws_tx, &request.to_string()).await
feed_text(ws_tx, &request.to_string()).await
}

async fn send_text(ws_tx: &mut SplitSink<WebSocket, Message>, msg: &str) -> Result<()> {
async fn feed_text(ws_tx: &mut SplitSink<WebSocket, Message>, msg: &str) -> Result<()> {
ws_tx
.send(Message::text(msg.to_string()))
.feed(Message::text(msg.to_string()))
.await
.map_err(|e| e.into())
}

async fn flush(ws_tx: &mut SplitSink<WebSocket, Message>) -> Result<()> {
ws_tx.flush().await.map_err(|e| e.into())
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
@@ -400,6 +413,9 @@ pub struct Config {
/// Size of the buffer of each Server's channel on which `notify_price_sched` events are
/// received from the Price state.
pub notify_price_sched_tx_buffer: usize,
/// Flush interval duration for the notifications.
#[serde(with = "humantime_serde")]
pub flush_interval_duration: Duration,
}

impl Default for Config {
@@ -408,6 +424,7 @@ impl Default for Config {
listen_address: "127.0.0.1:8910".to_string(),
notify_price_tx_buffer: 10000,
notify_price_sched_tx_buffer: 10000,
flush_interval_duration: Duration::from_millis(50),
}
}
}
@@ -448,6 +465,7 @@ where
state,
config.notify_price_tx_buffer,
config.notify_price_sched_tx_buffer,
config.flush_interval_duration,
)
.await
})
2 changes: 1 addition & 1 deletion src/agent/services/exporter.rs
Original file line number Diff line number Diff line change
@@ -262,7 +262,7 @@ mod exporter {
config.exporter.unchanged_publish_threshold,
).await {
if let Err(err) = publish_batches(
&*state,
state.clone(),
client.clone(),
network,
&network_state_rx,
62 changes: 32 additions & 30 deletions src/agent/state/exporter.rs
Original file line number Diff line number Diff line change
@@ -429,7 +429,7 @@ async fn estimate_compute_unit_price_micro_lamports(
)
)]
pub async fn publish_batches<S>(
state: &S,
state: Arc<S>,
client: Arc<RpcClient>,
network: Network,
network_state_rx: &watch::Receiver<NetworkState>,
@@ -466,7 +466,7 @@ where
let network_state = *network_state_rx.borrow();
for batch in batches {
batch_futures.push(publish_batch(
state,
state.clone(),
client.clone(),
network,
network_state,
@@ -494,7 +494,7 @@ where
.into_iter()
.collect::<Result<Vec<_>>>()?;

Exporter::record_publish(state, batch_state).await;
Exporter::record_publish(&*state, batch_state).await;
Ok(())
}

@@ -509,7 +509,7 @@ where
)
)]
async fn publish_batch<S>(
state: &S,
state: Arc<S>,
client: Arc<RpcClient>,
network: Network,
network_state: NetworkState,
@@ -535,7 +535,7 @@ where
let mut instructions = Vec::new();

// Refresh the data in the batch
let local_store_contents = LocalStore::get_all_price_infos(state).await;
let local_store_contents = LocalStore::get_all_price_infos(&*state).await;
let refreshed_batch = batch.iter().map(|(identifier, _)| {
(
identifier,
@@ -615,7 +615,7 @@ where
// Use the estimated previous price if it is higher
// than the current price.
let recent_compute_unit_price_micro_lamports =
Exporter::get_recent_compute_unit_price_micro_lamports(state).await;
Exporter::get_recent_compute_unit_price_micro_lamports(&*state).await;

if let Some(estimated_recent_price) = recent_compute_unit_price_micro_lamports {
// Get the estimated compute unit price and wrap it so it stays below the maximum
@@ -633,7 +633,7 @@ where
// in this batch. This will use the maximum total compute unit fee if the publisher
// hasn't updated for >= MAXIMUM_SLOT_GAP_FOR_DYNAMIC_COMPUTE_UNIT_PRICE slots.
let result = GlobalStore::price_accounts(
state,
&*state,
network,
price_accounts.clone().into_iter().collect(),
)
@@ -697,31 +697,33 @@ where
network_state.blockhash,
);

let signature = match client
.send_transaction_with_config(
&transaction,
RpcSendTransactionConfig {
skip_preflight: true,
..RpcSendTransactionConfig::default()
},
)
.await
{
Ok(signature) => signature,
Err(err) => {
tracing::error!(err = ?err, "Exporter: failed to send transaction.");
return Ok(());
}
};
tokio::spawn(async move {
let signature = match client
.send_transaction_with_config(
&transaction,
RpcSendTransactionConfig {
skip_preflight: true,
..RpcSendTransactionConfig::default()
},
)
.await
{
Ok(signature) => signature,
Err(err) => {
tracing::error!(err = ?err, "Exporter: failed to send transaction.");
return;
}
};

tracing::debug!(
signature = signature.to_string(),
instructions = instructions.len(),
price_accounts = ?price_accounts,
"Sent upd_price transaction.",
);
tracing::debug!(
signature = signature.to_string(),
instructions = instructions.len(),
price_accounts = ?price_accounts,
"Sent upd_price transaction.",
);

Transactions::add_transaction(state, signature).await;
Transactions::add_transaction(&*state, signature).await;
});

Ok(())
}
5 changes: 1 addition & 4 deletions src/bin/agent.rs
Original file line number Diff line number Diff line change
@@ -14,7 +14,6 @@ use {
std::{
io::IsTerminal,
path::PathBuf,
time::Duration,
},
tracing_subscriber::{
prelude::*,
@@ -65,9 +64,7 @@ async fn main() -> Result<()> {
let otlp_exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(&opentelemetry_config.exporter_endpoint)
.with_timeout(Duration::from_secs(
opentelemetry_config.exporter_timeout_secs,
));
.with_timeout(opentelemetry_config.exporter_timeout_duration);

// Set up the OpenTelemetry tracer
let tracer = opentelemetry_otlp::new_pipeline()

0 comments on commit 687e130

Please sign in to comment.