Skip to content

Commit

Permalink
feat: integrate holiday schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
keyvankhademi committed Apr 9, 2024
1 parent dbea071 commit 574a444
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 34 deletions.
53 changes: 52 additions & 1 deletion integration-tests/tests/test_integration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from datetime import datetime
import json
import os
import requests
Expand Down Expand Up @@ -63,9 +64,23 @@
"quote_currency": "USD",
"generic_symbol": "BTCUSD",
"description": "BTC/USD",
"holidays": f"{datetime.now().strftime('%m%d')}/O"
},
"metadata": {"jump_id": "78876709", "jump_symbol": "BTCUSD", "price_exp": -8, "min_publishers": 1},
}
SOL_USD = {
"account": "",
"attr_dict": {
"symbol": "Crypto.SOL/USD",
"asset_type": "Crypto",
"base": "SOL",
"quote_currency": "USD",
"generic_symbol": "SOLUSD",
"description": "SOL/USD",
"holidays": f"{datetime.now().strftime('%m%d')}/C"
},
"metadata": {"jump_id": "78876711", "jump_symbol": "SOLUSD", "price_exp": -8, "min_publishers": 1},
}
AAPL_USD = {
"account": "",
"attr_dict": {
Expand Down Expand Up @@ -95,7 +110,7 @@
},
"metadata": {"jump_id": "78876710", "jump_symbol": "ETHUSD", "price_exp": -8, "min_publishers": 1},
}
ALL_PRODUCTS=[BTC_USD, AAPL_USD, ETH_USD]
ALL_PRODUCTS=[BTC_USD, AAPL_USD, ETH_USD, SOL_USD]

asyncio.set_event_loop(asyncio.new_event_loop())

Expand Down Expand Up @@ -277,6 +292,7 @@ def refdata_permissions(self, refdata_path):
"AAPL": {"price": ["some_publisher_a"]},
"BTCUSD": {"price": ["some_publisher_b", "some_publisher_a"]}, # Reversed order helps ensure permission discovery works correctly for publisher A
"ETHUSD": {"price": ["some_publisher_b"]},
"SOLUSD": {"price": ["some_publisher_a"]},
}))
f.flush()
yield f.name
Expand Down Expand Up @@ -769,3 +785,38 @@ async def test_agent_respects_market_hours(self, client: PythAgentClient):
assert final_price_account["price"] == 0
assert final_price_account["conf"] == 0
assert final_price_account["status"] == "unknown"

@pytest.mark.asyncio
async def test_agent_respects_holiday_hours(self, client: PythAgentClient):
'''
Similar to test_agent_respects_market_hours, but using SOL_USD and
asserting that nothing is published due to the symbol's all-closed holiday.
'''

# Fetch all products
products = {product["attr_dict"]["symbol"]: product for product in await client.get_all_products()}

# Find the product account ID corresponding to the AAPL/USD symbol
product = products[SOL_USD["attr_dict"]["symbol"]]
product_account = product["account"]

# Get the price account with which to send updates
price_account = product["price_accounts"][0]["account"]

# Send an "update_price" request
await client.update_price(price_account, 42, 2, "trading")
time.sleep(2)

# Send another update_price request to "trigger" aggregation
# (aggregation would happen if market hours were to fail, but
# we want to catch that happening if there's a problem)
await client.update_price(price_account, 81, 1, "trading")
time.sleep(2)

# Confirm that the price account has not been updated
final_product_state = await client.get_product(product_account)

final_price_account = final_product_state["price_accounts"][0]
assert final_price_account["price"] == 0
assert final_price_account["conf"] == 0
assert final_price_account["status"] == "unknown"
3 changes: 1 addition & 2 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,10 @@ Note that there is an Oracle and Exporter for each network, but only one Local S
################################################################################################################################## */

pub mod dashboard;
pub mod holiday_hours;
pub mod market_hours;
pub mod metrics;
pub mod pythd;
pub mod remote_keypair_loader;
pub mod schedule;
pub mod solana;
pub mod store;
use {
Expand Down
12 changes: 6 additions & 6 deletions src/agent/pythd/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ mod tests {
)
.unwrap(),
solana::oracle::ProductEntry {
account_data: pyth_sdk_solana::state::ProductAccount {
account_data: pyth_sdk_solana::state::ProductAccount {
magic: 0xa1b2c3d4,
ver: 6,
atype: 4,
Expand Down Expand Up @@ -997,8 +997,8 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
],
},
weekly_schedule: Default::default(),
price_accounts: vec![
schedule: Default::default(),
price_accounts: vec![
solana_sdk::pubkey::Pubkey::from_str(
"GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU",
)
Expand All @@ -1020,7 +1020,7 @@ mod tests {
)
.unwrap(),
solana::oracle::ProductEntry {
account_data: pyth_sdk_solana::state::ProductAccount {
account_data: pyth_sdk_solana::state::ProductAccount {
magic: 0xa1b2c3d4,
ver: 5,
atype: 3,
Expand Down Expand Up @@ -1057,8 +1057,8 @@ mod tests {
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
],
},
weekly_schedule: Default::default(),
price_accounts: vec![
schedule: Default::default(),
price_accounts: vec![
solana_sdk::pubkey::Pubkey::from_str(
"GG3FTE7xhc9Diy7dn9P6BWzoCrAEE4D3p5NBYrDAm5DD",
)
Expand Down
14 changes: 14 additions & 0 deletions src/agent/schedule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
pub mod holiday_hours;
pub mod market_hours;


use crate::agent::schedule::{
holiday_hours::HolidaySchedule,
market_hours::WeeklySchedule,
};

#[derive(Debug, Clone, Default)]
pub struct Schedule {
pub market_hours: WeeklySchedule,
pub holiday_hours: HolidaySchedule,
}
File renamed without changes.
File renamed without changes.
20 changes: 11 additions & 9 deletions src/agent/solana/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use {
network::Network,
},
crate::agent::{
market_hours::WeeklySchedule,
remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
},
schedule::Schedule,
},
anyhow::{
anyhow,
Expand Down Expand Up @@ -66,8 +66,8 @@ use {
},
tokio::{
sync::{
mpsc,
mpsc::{
self,
error::TryRecvError,
Sender,
},
Expand Down Expand Up @@ -172,7 +172,7 @@ pub fn spawn_exporter(
network: Network,
rpc_url: &str,
rpc_timeout: Duration,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, WeeklySchedule>>>,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, Schedule>>>,
key_store: KeyStore,
local_store_tx: Sender<store::local::Message>,
global_store_tx: Sender<store::global::Lookup>,
Expand Down Expand Up @@ -260,10 +260,10 @@ pub struct Exporter {
inflight_transactions_tx: Sender<Signature>,

/// publisher => { permissioned_price => market hours } as read by the oracle module
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, WeeklySchedule>>>,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, Schedule>>>,

/// Currently known permissioned prices of this publisher along with their market hours
our_prices: HashMap<Pubkey, WeeklySchedule>,
our_prices: HashMap<Pubkey, Schedule>,

/// Interval to update the dynamic price (if enabled)
dynamic_compute_unit_price_update_interval: Interval,
Expand All @@ -287,7 +287,7 @@ impl Exporter {
global_store_tx: Sender<store::global::Lookup>,
network_state_rx: watch::Receiver<NetworkState>,
inflight_transactions_tx: Sender<Signature>,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, WeeklySchedule>>>,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, Schedule>>>,
keypair_request_tx: mpsc::Sender<KeypairRequest>,
logger: Logger,
) -> Self {
Expand Down Expand Up @@ -474,13 +474,15 @@ impl Exporter {
.into_iter()
.filter(|(id, _data)| {
let key_from_id = Pubkey::from((*id).clone().to_bytes());
if let Some(weekly_schedule) = self.our_prices.get(&key_from_id) {
let ret = weekly_schedule.can_publish_at(&now);
if let Some(schedule) = self.our_prices.get_mut(&key_from_id) {
// let ret = schedule.market_hours.can_publish_at(&now);
schedule.holiday_hours.timezone = Some(schedule.market_hours.timezone);
let ret = schedule.market_hours.can_publish_at(&now) && schedule.holiday_hours.can_publish_at(now);

if !ret {
debug!(self.logger, "Exporter: Attempted to publish price outside market hours";
"price_account" => key_from_id.to_string(),
"weekly_schedule" => format!("{:?}", weekly_schedule),
"schedule" => format!("{:?}", schedule),
"utc_time" => now.format("%c").to_string(),
);
}
Expand Down
56 changes: 40 additions & 16 deletions src/agent/solana/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use {
self::subscriber::Subscriber,
super::key_store::KeyStore,
crate::agent::{
market_hours::WeeklySchedule,
schedule::{
holiday_hours::HolidaySchedule,
market_hours::WeeklySchedule,
Schedule,
},
store::global,
},
anyhow::{
Expand Down Expand Up @@ -117,15 +121,15 @@ pub struct Data {
pub product_accounts: HashMap<Pubkey, ProductEntry>,
pub price_accounts: HashMap<Pubkey, PriceEntry>,
/// publisher => {their permissioned price accounts => market hours}
pub publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, WeeklySchedule>>,
pub publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, Schedule>>,
}

impl Data {
fn new(
mapping_accounts: HashMap<Pubkey, MappingAccount>,
product_accounts: HashMap<Pubkey, ProductEntry>,
price_accounts: HashMap<Pubkey, PriceEntry>,
publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, WeeklySchedule>>,
publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, Schedule>>,
) -> Self {
Data {
mapping_accounts,
Expand All @@ -139,9 +143,9 @@ impl Data {
pub type MappingAccount = pyth_sdk_solana::state::MappingAccount;
#[derive(Debug, Clone)]
pub struct ProductEntry {
pub account_data: pyth_sdk_solana::state::ProductAccount,
pub weekly_schedule: WeeklySchedule,
pub price_accounts: Vec<Pubkey>,
pub account_data: pyth_sdk_solana::state::ProductAccount,
pub schedule: Schedule,
pub price_accounts: Vec<Pubkey>,
}

// Oracle is responsible for fetching Solana account data stored in the Pyth on-chain Oracle.
Expand Down Expand Up @@ -203,7 +207,7 @@ pub fn spawn_oracle(
wss_url: &str,
rpc_timeout: Duration,
global_store_update_tx: mpsc::Sender<global::Update>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, WeeklySchedule>>>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, Schedule>>>,
key_store: KeyStore,
logger: Logger,
) -> Vec<JoinHandle<()>> {
Expand Down Expand Up @@ -418,7 +422,7 @@ struct Poller {
data_tx: mpsc::Sender<Data>,

/// Updates about permissioned price accounts from oracle to exporter
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, WeeklySchedule>>>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, Schedule>>>,

/// The RPC client to use to poll data from the RPC node
rpc_client: RpcClient,
Expand All @@ -438,7 +442,7 @@ struct Poller {
impl Poller {
pub fn new(
data_tx: mpsc::Sender<Data>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, WeeklySchedule>>>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, Schedule>>>,
rpc_url: &str,
rpc_timeout: Duration,
commitment: CommitmentLevel,
Expand Down Expand Up @@ -510,10 +514,8 @@ impl Poller {
.entry(component.publisher)
.or_insert(HashMap::new());

let weekly_schedule = if let Some(prod_entry) =
product_accounts.get(&price_entry.prod)
{
prod_entry.weekly_schedule.clone()
let schedule = if let Some(prod_entry) = product_accounts.get(&price_entry.prod) {
prod_entry.schedule.clone()
} else {
warn!(&self.logger, "Oracle: INTERNAL: could not find product from price `prod` field, market hours falling back to 24/7.";
"price" => price_key.to_string(),
Expand All @@ -522,7 +524,7 @@ impl Poller {
Default::default()
};

component_pub_entry.insert(*price_key, weekly_schedule);
component_pub_entry.insert(*price_key, schedule);
}
}

Expand Down Expand Up @@ -627,11 +629,33 @@ impl Poller {
Default::default() // No market hours specified, meaning 24/7 publishing
};

let holiday_schedule: HolidaySchedule = if let Some((_hsched_key, hsched_val)) =
product.iter().find(|(k, _v)| *k == "holidays")
{
hsched_val.parse().unwrap_or_else(|err| {
warn!(
self.logger,
"Oracle: Product has weekly_schedule defined but it could not be parsed. Falling back to 24/7 publishing.";
"product_key" => product_key.to_string(),
"holiday_schedule" => hsched_val,
);
debug!(self.logger, "parsing error context"; "context" => format!("{:?}", err));
Default::default()
})
} else {
Default::default() // No market hours specified, meaning 24/7 publishing
};

product_entries.insert(
*product_key,
ProductEntry {
account_data: *product,
weekly_schedule,
account_data: *product,
schedule: {
Schedule {
market_hours: weekly_schedule,
holiday_hours: holiday_schedule,
}
},
price_accounts: vec![],
},
);
Expand Down

0 comments on commit 574a444

Please sign in to comment.