Skip to content

Commit

Permalink
feat: add publish_interval logic
Browse files Browse the repository at this point in the history
  • Loading branch information
keyvankhademi committed May 10, 2024
1 parent 3d35c1b commit b69d11d
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 47 deletions.
66 changes: 65 additions & 1 deletion integration-tests/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@
},
"metadata": {"jump_id": "78876711", "jump_symbol": "SOLUSD", "price_exp": -8, "min_publishers": 1},
}
PYTH_USD = {
"account": "",
"attr_dict": {
"symbol": "Crypto.PYTH/USD",
"asset_type": "Crypto",
"base": "PYTH",
"quote_currency": "USD",
"generic_symbol": "PYTHUSD",
"description": "PYTH/USD",
"publish_interval": "2",
},
"metadata": {"jump_id": "78876712", "jump_symbol": "PYTHUSD", "price_exp": -8, "min_publishers": 1},
}
AAPL_USD = {
"account": "",
"attr_dict": {
Expand Down Expand Up @@ -110,7 +123,7 @@
},
"metadata": {"jump_id": "78876710", "jump_symbol": "ETHUSD", "price_exp": -8, "min_publishers": 1},
}
ALL_PRODUCTS=[BTC_USD, AAPL_USD, ETH_USD, SOL_USD]
ALL_PRODUCTS=[BTC_USD, AAPL_USD, ETH_USD, SOL_USD, PYTH_USD]

asyncio.set_event_loop(asyncio.new_event_loop())

Expand Down Expand Up @@ -293,6 +306,7 @@ def refdata_permissions(self, refdata_path):
"BTCUSD": {"price": ["some_publisher_b", "some_publisher_a"]}, # Reversed order helps ensure permission discovery works correctly for publisher A
"ETHUSD": {"price": ["some_publisher_b"]},
"SOLUSD": {"price": ["some_publisher_a"]},
"PYTHUSD": {"price": ["some_publisher_a"]},
}))
f.flush()
yield f.name
Expand Down Expand Up @@ -820,3 +834,53 @@ async def test_agent_respects_holiday_hours(self, client: PythAgentClient):
assert final_price_account["price"] == 0
assert final_price_account["conf"] == 0
assert final_price_account["status"] == "unknown"

@pytest.mark.asyncio
async def test_agent_respects_publish_interval(self, client: PythAgentClient):
'''
Similar to test_agent_respects_market_hours, but using PYTH_USD.
This test asserts that consecutive price updates will only get published
if it's after the specified publish interval.
'''

# Fetch all products
products = {product["attr_dict"]["symbol"]: product for product in await client.get_all_products()}

# Find the product account ID corresponding to the AAPL/USD symbol
product = products[PYTH_USD["attr_dict"]["symbol"]]
product_account = product["account"]

# Get the price account with which to send updates
price_account = product["price_accounts"][0]["account"]

# Send an "update_price" request
await client.update_price(price_account, 42, 2, "trading")
time.sleep(1)

# Send another update_price request to "trigger" aggregation
# (aggregation would happen if publish interval were to fail, but
# we want to catch that happening if there's a problem)
await client.update_price(price_account, 81, 1, "trading")
time.sleep(2)

# Confirm that the price account has not been updated
final_product_state = await client.get_product(product_account)

final_price_account = final_product_state["price_accounts"][0]
assert final_price_account["price"] == 0
assert final_price_account["conf"] == 0
assert final_price_account["status"] == "unknown"


# Send another update_price request to "trigger" aggregation
# Now it is after the publish interval, so the price should be updated
await client.update_price(price_account, 81, 1, "trading")
time.sleep(2)

# Confirm that the price account has been updated
final_product_state = await client.get_product(product_account)

final_price_account = final_product_state["price_accounts"][0]
assert final_price_account["price"] == 42
assert final_price_account["conf"] == 2
assert final_price_account["status"] == "trading"
14 changes: 8 additions & 6 deletions src/agent/pythd/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ mod tests {
)
.unwrap(),
solana::oracle::ProductEntry {
account_data: pyth_sdk_solana::state::ProductAccount {
account_data: pyth_sdk_solana::state::ProductAccount {
magic: 0xa1b2c3d4,
ver: 6,
atype: 4,
Expand Down Expand Up @@ -499,8 +499,9 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
],
},
schedule: Default::default(),
price_accounts: vec![
schedule: Default::default(),
publish_interval: None,
price_accounts: vec![
solana_sdk::pubkey::Pubkey::from_str(
"GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU",
)
Expand All @@ -522,7 +523,7 @@ mod tests {
)
.unwrap(),
solana::oracle::ProductEntry {
account_data: pyth_sdk_solana::state::ProductAccount {
account_data: pyth_sdk_solana::state::ProductAccount {
magic: 0xa1b2c3d4,
ver: 5,
atype: 3,
Expand Down Expand Up @@ -559,8 +560,9 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
],
},
schedule: Default::default(),
price_accounts: vec![
schedule: Default::default(),
publish_interval: None,
price_accounts: vec![
solana_sdk::pubkey::Pubkey::from_str(
"GG3FTE7xhc9Diy7dn9P6BWzoCrAEE4D3p5NBYrDAm5DD",
)
Expand Down
76 changes: 49 additions & 27 deletions src/agent/solana/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use {
},
key_store,
network::Network,
oracle::PublisherPermission,
},
crate::agent::{
market_schedule::MarketSchedule,
Expand Down Expand Up @@ -174,7 +175,9 @@ pub fn spawn_exporter(
network: Network,
rpc_url: &str,
rpc_timeout: Duration,
publisher_permissions_rx: watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
publisher_permissions_rx: watch::Receiver<
HashMap<Pubkey, HashMap<Pubkey, PublisherPermission>>,
>,
key_store: KeyStore,
local_store_tx: Sender<store::local::Message>,
global_store_tx: Sender<store::global::Lookup>,
Expand Down Expand Up @@ -262,10 +265,11 @@ pub struct Exporter {
inflight_transactions_tx: Sender<Signature>,

/// publisher => { permissioned_price => market hours } as read by the oracle module
publisher_permissions_rx: watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
publisher_permissions_rx:
watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, PublisherPermission>>>,

/// Currently known permissioned prices of this publisher along with their market hours
our_prices: HashMap<Pubkey, MarketSchedule>,
our_prices: HashMap<Pubkey, PublisherPermission>,

/// Interval to update the dynamic price (if enabled)
dynamic_compute_unit_price_update_interval: Interval,
Expand All @@ -289,7 +293,9 @@ impl Exporter {
global_store_tx: Sender<store::global::Lookup>,
network_state_rx: watch::Receiver<NetworkState>,
inflight_transactions_tx: Sender<Signature>,
publisher_permissions_rx: watch::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
publisher_permissions_rx: watch::Receiver<
HashMap<Pubkey, HashMap<Pubkey, PublisherPermission>>,
>,
keypair_request_tx: mpsc::Sender<KeypairRequest>,
logger: Logger,
) -> Self {
Expand Down Expand Up @@ -432,15 +438,23 @@ impl Exporter {
async fn get_permissioned_updates(&mut self) -> Result<Vec<(PriceIdentifier, PriceInfo)>> {
let local_store_contents = self.fetch_local_store_contents().await?;

let now = Utc::now().timestamp();
let publish_keypair = self.get_publish_keypair().await?;
self.update_our_prices(&publish_keypair.pubkey());

let now = Utc::now();

debug!(self.logger, "Exporter: filtering prices permissioned to us";
"our_prices" => format!("{:?}", self.our_prices.keys()),
"publish_pubkey" => publish_keypair.pubkey().to_string(),
);

// Filter the contents to only include information we haven't already sent,
// and to ignore stale information.
let fresh_updates = local_store_contents
Ok(local_store_contents
.into_iter()
.filter(|(_identifier, info)| {
// Filter out timestamps that are old
(now - info.timestamp) < self.config.staleness_threshold.as_secs() as i64
(now.timestamp() - info.timestamp) < self.config.staleness_threshold.as_secs() as i64
})
.filter(|(identifier, info)| {
// Filter out unchanged price data if the max delay wasn't reached
Expand All @@ -457,32 +471,15 @@ impl Exporter {
true // No prior data found, letting the price through
}
})
.collect::<Vec<_>>();

let publish_keypair = self.get_publish_keypair().await?;

self.update_our_prices(&publish_keypair.pubkey());

debug!(self.logger, "Exporter: filtering prices permissioned to us";
"our_prices" => format!("{:?}", self.our_prices.keys()),
"publish_pubkey" => publish_keypair.pubkey().to_string(),
);

// Get a fresh system time
let now = Utc::now();

// Filter out price accounts we're not permissioned to update
Ok(fresh_updates
.into_iter()
.filter(|(id, _data)| {
let key_from_id = Pubkey::from((*id).clone().to_bytes());
if let Some(schedule) = self.our_prices.get(&key_from_id) {
let ret = schedule.can_publish_at(&now);
if let Some(publisher_permission) = self.our_prices.get(&key_from_id) {
let ret = publisher_permission.schedule.can_publish_at(&now);

if !ret {
debug!(self.logger, "Exporter: Attempted to publish price outside market hours";
"price_account" => key_from_id.to_string(),
"schedule" => format!("{:?}", schedule),
"schedule" => format!("{:?}", publisher_permission.schedule),
"utc_time" => now.format("%c").to_string(),
);
}
Expand All @@ -501,6 +498,31 @@ impl Exporter {
false
}
})
.filter(|(id, info)| {
// Filtering out prices that are being updated too frequently according to publisher_permission.publish_interval
let last_info = self.last_published_state.get(id);
if last_info.is_none() {
// No prior data found, letting the price through
return true;
}
let last_info = last_info.unwrap();

let key_from_id = Pubkey::from((*id).clone().to_bytes());
let publisher_permisssion = self.our_prices.get(&key_from_id);
if publisher_permisssion.is_none() {
// Should never happen since we have filtered out the price above
return false;
}
let publisher_permission = publisher_permisssion.unwrap();

if let Some(publish_interval) = publisher_permission.publish_interval {
if info.timestamp.saturating_sub(last_info.timestamp) < publish_interval.as_secs() as i64 {
// Updating the price too soon after the last update, skipping
return false;
}
}
true
})
.collect::<Vec<_>>())
}

Expand Down
Loading

0 comments on commit b69d11d

Please sign in to comment.