
feat: add publish_interval logic #121

Merged
merged 6 commits into from
May 13, 2024
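In short, this PR threads a per-product `publish_interval` (configured in the product's reference data; see the `PYTHUSD` entry in the integration test below) from the oracle module into the exporter, which then skips price updates that arrive before the interval has elapsed since the last published state. A minimal standalone sketch of that gate, using simplified types and illustrative names rather than the actual exporter API:

```rust
use chrono::{Duration, NaiveDateTime, Utc};

/// Simplified stand-in for the exporter's last-published state for one price.
struct LastPublished {
    timestamp: NaiveDateTime,
}

/// Returns true if an update at `now` may be published, given the last
/// published state and an optional per-product publish interval.
fn may_publish(
    last: Option<&LastPublished>,
    publish_interval: Option<Duration>,
    now: NaiveDateTime,
) -> bool {
    match (last, publish_interval) {
        // No prior publish, or no interval configured: let the update through.
        (None, _) | (_, None) => true,
        // Otherwise require the interval to have elapsed since the last publish.
        (Some(last), Some(interval)) => now >= last.timestamp + interval,
    }
}

fn main() {
    let now = Utc::now().naive_utc();
    let last = LastPublished { timestamp: now - Duration::seconds(1) };
    // With a 2-second interval, 1 second after the last publish is too soon.
    assert!(!may_publish(Some(&last), Some(Duration::seconds(2)), now));
    // With no interval configured, the update is always allowed.
    assert!(may_publish(Some(&last), None, now));
}
```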
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "2.6.2"
version = "2.7.0"
edition = "2021"

[[bin]]
66 changes: 65 additions & 1 deletion integration-tests/tests/test_integration.py
@@ -81,6 +81,19 @@
},
"metadata": {"jump_id": "78876711", "jump_symbol": "SOLUSD", "price_exp": -8, "min_publishers": 1},
}
PYTH_USD = {
"account": "",
"attr_dict": {
"symbol": "Crypto.PYTH/USD",
"asset_type": "Crypto",
"base": "PYTH",
"quote_currency": "USD",
"generic_symbol": "PYTHUSD",
"description": "PYTH/USD",
"publish_interval": "2",
},
"metadata": {"jump_id": "78876712", "jump_symbol": "PYTHUSD", "price_exp": -8, "min_publishers": 1},
}
AAPL_USD = {
"account": "",
"attr_dict": {
@@ -110,7 +123,7 @@
},
"metadata": {"jump_id": "78876710", "jump_symbol": "ETHUSD", "price_exp": -8, "min_publishers": 1},
}
ALL_PRODUCTS=[BTC_USD, AAPL_USD, ETH_USD, SOL_USD]
ALL_PRODUCTS=[BTC_USD, AAPL_USD, ETH_USD, SOL_USD, PYTH_USD]

asyncio.set_event_loop(asyncio.new_event_loop())

@@ -293,6 +306,7 @@ def refdata_permissions(self, refdata_path):
"BTCUSD": {"price": ["some_publisher_b", "some_publisher_a"]}, # Reversed order helps ensure permission discovery works correctly for publisher A
"ETHUSD": {"price": ["some_publisher_b"]},
"SOLUSD": {"price": ["some_publisher_a"]},
"PYTHUSD": {"price": ["some_publisher_a"]},
}))
f.flush()
yield f.name
@@ -820,3 +834,53 @@ async def test_agent_respects_holiday_hours(self, client: PythAgentClient):
assert final_price_account["price"] == 0
assert final_price_account["conf"] == 0
assert final_price_account["status"] == "unknown"

@pytest.mark.asyncio
async def test_agent_respects_publish_interval(self, client: PythAgentClient):
'''
Similar to test_agent_respects_market_hours, but using PYTH_USD.
This test asserts that a consecutive price update is only published
once the specified publish interval has elapsed since the previous update.
'''

# Fetch all products
products = {product["attr_dict"]["symbol"]: product for product in await client.get_all_products()}

# Find the product account ID corresponding to the PYTH/USD symbol
product = products[PYTH_USD["attr_dict"]["symbol"]]
product_account = product["account"]

# Get the price account with which to send updates
price_account = product["price_accounts"][0]["account"]

# Send an "update_price" request
await client.update_price(price_account, 42, 2, "trading")
time.sleep(1)

# Send another update_price request to "trigger" aggregation
# (aggregation would happen here if the publish interval check failed,
# and we want to catch it if that happens)
await client.update_price(price_account, 81, 1, "trading")
time.sleep(2)

# Confirm that the price account has not been updated
final_product_state = await client.get_product(product_account)

final_price_account = final_product_state["price_accounts"][0]
assert final_price_account["price"] == 0
assert final_price_account["conf"] == 0
assert final_price_account["status"] == "unknown"


# Send another update_price request to "trigger" aggregation
# The publish interval has now elapsed, so the price should be updated
await client.update_price(price_account, 81, 1, "trading")
time.sleep(2)

# Confirm that the price account has been updated
final_product_state = await client.get_product(product_account)

final_price_account = final_product_state["price_accounts"][0]
assert final_price_account["price"] == 42
assert final_price_account["conf"] == 2
assert final_price_account["status"] == "trading"
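For context, the `publish_interval` attribute added to the `PYTHUSD` reference data above is the string "2" (seconds). This diff does not show how the oracle module parses that attribute into the metadata the exporter consumes; presumably it is something along these lines (a hedged sketch with an assumed helper name, not the actual oracle.rs code):

```rust
use std::time::Duration;

/// Hypothetical helper: interpret a product's "publish_interval" attribute
/// (a string number of seconds, e.g. "2") as an optional Duration.
fn parse_publish_interval(attr: Option<&str>) -> Option<Duration> {
    attr.and_then(|s| s.parse::<f64>().ok())
        .map(Duration::from_secs_f64)
}

fn main() {
    assert_eq!(parse_publish_interval(Some("2")), Some(Duration::from_secs(2)));
    assert_eq!(parse_publish_interval(None), None);
}
```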
6 changes: 1 addition & 5 deletions src/agent/dashboard.rs
@@ -102,11 +102,7 @@ impl MetricsServer {
};

let last_local_update_string = if let Some(local_data) = price_data.local_data {
if let Some(datetime) = DateTime::from_timestamp(local_data.timestamp, 0) {
datetime.format("%Y-%m-%d %H:%M:%S").to_string()
} else {
format!("Invalid timestamp {}", local_data.timestamp)
}
local_data.timestamp.format("%Y-%m-%d %H:%M:%S").to_string()
} else {
"no data".to_string()
};
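This simplification works because the local store's timestamp is now a `chrono::NaiveDateTime` rather than a raw Unix `i64`, so no fallible conversion is needed before formatting. A small standalone illustration of the two forms (not project code):

```rust
use chrono::{DateTime, NaiveDateTime, Utc};

fn main() {
    // Old representation: a raw Unix timestamp that had to be converted first,
    // with a fallback for invalid values.
    let raw: i64 = 1_715_558_400;
    let formatted_old = DateTime::from_timestamp(raw, 0)
        .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
        .unwrap_or_else(|| format!("Invalid timestamp {}", raw));

    // New representation: a NaiveDateTime that formats directly.
    let naive: NaiveDateTime = Utc::now().naive_utc();
    let formatted_new = naive.format("%Y-%m-%d %H:%M:%S").to_string();

    println!("{formatted_old} / {formatted_new}");
}
```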
6 changes: 2 additions & 4 deletions src/agent/metrics.rs
@@ -40,9 +40,7 @@ use {
},
warp::{
hyper::StatusCode,
reply::{
self,
},
reply,
Filter,
Rejection,
Reply,
@@ -428,7 +426,7 @@ impl PriceLocalMetrics {
.get_or_create(&PriceLocalLabels {
pubkey: price_key.to_string(),
})
.set(price_info.timestamp);
.set(price_info.timestamp.and_utc().timestamp());
update_count
.get_or_create(&PriceLocalLabels {
pubkey: price_key.to_string(),
14 changes: 8 additions & 6 deletions src/agent/pythd/adapter.rs
@@ -462,7 +462,7 @@ mod tests {
)
.unwrap(),
solana::oracle::ProductEntry {
account_data: pyth_sdk_solana::state::ProductAccount {
account_data: pyth_sdk_solana::state::ProductAccount {
magic: 0xa1b2c3d4,
ver: 6,
atype: 4,
@@ -499,8 +499,9 @@
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
],
},
schedule: Default::default(),
price_accounts: vec![
schedule: Default::default(),
publish_interval: None,
price_accounts: vec![
solana_sdk::pubkey::Pubkey::from_str(
"GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU",
)
@@ -522,7 +523,7 @@
)
.unwrap(),
solana::oracle::ProductEntry {
account_data: pyth_sdk_solana::state::ProductAccount {
account_data: pyth_sdk_solana::state::ProductAccount {
magic: 0xa1b2c3d4,
ver: 5,
atype: 3,
@@ -559,8 +560,9 @@
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
],
},
schedule: Default::default(),
price_accounts: vec![
schedule: Default::default(),
publish_interval: None,
price_accounts: vec![
solana_sdk::pubkey::Pubkey::from_str(
"GG3FTE7xhc9Diy7dn9P6BWzoCrAEE4D3p5NBYrDAm5DD",
)
2 changes: 1 addition & 1 deletion src/agent/pythd/adapter/api.rs
@@ -375,7 +375,7 @@ impl AdapterApi for Adapter {
status: Adapter::map_status(&status)?,
price,
conf,
timestamp: Utc::now().timestamp(),
timestamp: Utc::now().naive_utc(),
},
})
.await
97 changes: 58 additions & 39 deletions src/agent/solana/exporter.rs
@@ -8,13 +8,11 @@ use {
},
key_store,
network::Network,
oracle::PricePublishingMetadata,
},
crate::agent::{
market_schedule::MarketSchedule,
remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
},
crate::agent::remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
},
anyhow::{
anyhow,
@@ -68,7 +66,6 @@ use {
sync::{
mpsc::{
self,
error::TryRecvError,
Sender,
},
oneshot,
@@ -174,7 +171,9 @@ pub fn spawn_exporter(
network: Network,
rpc_url: &str,
rpc_timeout: Duration,
publisher_permissions_rx: watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
publisher_permissions_rx: watch::Receiver<
HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>,
>,
key_store: KeyStore,
local_store_tx: Sender<store::local::Message>,
global_store_tx: Sender<store::global::Lookup>,
@@ -262,10 +261,11 @@ pub struct Exporter {
inflight_transactions_tx: Sender<Signature>,

/// publisher => { permissioned_price => market hours } as read by the oracle module
publisher_permissions_rx: watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
publisher_permissions_rx:
watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>>,

/// Currently known permissioned prices of this publisher along with their market hours
our_prices: HashMap<Pubkey, MarketSchedule>,
our_prices: HashMap<Pubkey, PricePublishingMetadata>,

/// Interval to update the dynamic price (if enabled)
dynamic_compute_unit_price_update_interval: Interval,
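The `PricePublishingMetadata` type that replaces the bare `MarketSchedule` in these maps is not shown in this diff. Judging from how the exporter uses it further down (`.schedule.can_publish_at(..)` and `.publish_interval`), it presumably bundles the market schedule with the optional interval, roughly as in this sketch (the real definition lives in the oracle module and may differ in fields and derives):

```rust
use {
    crate::agent::market_schedule::MarketSchedule,
    std::time::Duration,
};

/// Sketch inferred from usage in exporter.rs; not the actual definition.
#[derive(Clone, Debug, Default)]
pub struct PricePublishingMetadata {
    pub schedule:         MarketSchedule,
    pub publish_interval: Option<Duration>,
}
```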
@@ -289,7 +289,9 @@ impl Exporter {
global_store_tx: Sender<store::global::Lookup>,
network_state_rx: watch::Receiver<NetworkState>,
inflight_transactions_tx: Sender<Signature>,
publisher_permissions_rx: watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
publisher_permissions_rx: watch::Receiver<
HashMap<Pubkey, HashMap<Pubkey, PricePublishingMetadata>>,
>,
keypair_request_tx: mpsc::Sender<KeypairRequest>,
logger: Logger,
) -> Self {
@@ -432,22 +434,30 @@ impl Exporter {
async fn get_permissioned_updates(&mut self) -> Result<Vec<(PriceIdentifier, PriceInfo)>> {
let local_store_contents = self.fetch_local_store_contents().await?;

let now = Utc::now().timestamp();
let publish_keypair = self.get_publish_keypair().await?;
self.update_our_prices(&publish_keypair.pubkey());

let now = Utc::now().naive_utc();

debug!(self.logger, "Exporter: filtering prices permissioned to us";
"our_prices" => format!("{:?}", self.our_prices.keys()),
"publish_pubkey" => publish_keypair.pubkey().to_string(),
);

// Filter the contents to only include information we haven't already sent,
// and to ignore stale information.
let fresh_updates = local_store_contents
Ok(local_store_contents
.into_iter()
.filter(|(_identifier, info)| {
// Filter out timestamps that are old
(now - info.timestamp) < self.config.staleness_threshold.as_secs() as i64
now < info.timestamp + self.config.staleness_threshold
})
.filter(|(identifier, info)| {
// Filter out unchanged price data if the max delay wasn't reached

if let Some(last_info) = self.last_published_state.get(identifier) {
if info.timestamp.saturating_sub(last_info.timestamp)
> self.config.unchanged_publish_threshold.as_secs() as i64
if info.timestamp
> last_info.timestamp + self.config.unchanged_publish_threshold
{
true // max delay since last published state reached, we publish anyway
} else {
Expand All @@ -457,33 +467,17 @@ impl Exporter {
true // No prior data found, letting the price through
}
})
.collect::<Vec<_>>();

let publish_keypair = self.get_publish_keypair().await?;

self.update_our_prices(&publish_keypair.pubkey());

debug!(self.logger, "Exporter: filtering prices permissioned to us";
"our_prices" => format!("{:?}", self.our_prices.keys()),
"publish_pubkey" => publish_keypair.pubkey().to_string(),
);

// Get a fresh system time
let now = Utc::now();

// Filter out price accounts we're not permissioned to update
Ok(fresh_updates
.into_iter()
.filter(|(id, _data)| {
let key_from_id = Pubkey::from((*id).clone().to_bytes());
if let Some(schedule) = self.our_prices.get(&key_from_id) {
let ret = schedule.can_publish_at(&now);
if let Some(publisher_permission) = self.our_prices.get(&key_from_id) {
let now_utc = Utc::now();
let ret = publisher_permission.schedule.can_publish_at(&now_utc);

if !ret {
debug!(self.logger, "Exporter: Attempted to publish price outside market hours";
"price_account" => key_from_id.to_string(),
"schedule" => format!("{:?}", schedule),
"utc_time" => now.format("%c").to_string(),
"schedule" => format!("{:?}", publisher_permission.schedule),
"utc_time" => now_utc.format("%c").to_string(),
);
}

@@ -501,6 +495,31 @@
false
}
})
.filter(|(id, info)| {
// Filter out prices that are being updated more frequently than publisher_permission.publish_interval allows
let last_info = self.last_published_state.get(id);
if last_info.is_none() {
// No prior data found, letting the price through
return true;
}
let last_info = last_info.unwrap();

let key_from_id = Pubkey::from((*id).clone().to_bytes());
let publisher_permission = self.our_prices.get(&key_from_id);
if publisher_permission.is_none() {
// Should never happen since we have filtered out the price above
return false;
}
let publisher_permission = publisher_permission.unwrap();

if let Some(publish_interval) = publisher_permission.publish_interval {
if info.timestamp < last_info.timestamp + publish_interval {
// Updating the price too soon after the last update, skipping
return false;
}
}
true
})
.collect::<Vec<_>>())
}

@@ -623,9 +642,9 @@ impl Exporter {
let network_state = *self.network_state_rx.borrow();
for (identifier, price_info_result) in refreshed_batch {
let price_info = price_info_result?;
let now = Utc::now().naive_utc();

let stale_price = (Utc::now().timestamp() - price_info.timestamp)
> self.config.staleness_threshold.as_secs() as i64;
let stale_price = now > price_info.timestamp + self.config.staleness_threshold;
if stale_price {
continue;
}