diff --git a/integration-tests/tests/test_integration.py b/integration-tests/tests/test_integration.py index e7d4f15..26bd8e7 100644 --- a/integration-tests/tests/test_integration.py +++ b/integration-tests/tests/test_integration.py @@ -78,6 +78,7 @@ "nasdaq_symbol": "AAPL", "symbol": "Equity.US.AAPL/USD", "base": "AAPL", + "weekly_schedule": "America/New_York,C,C,C,C,C,C,C" # Should never be published due to all-closed market hours }, "metadata": {"jump_id": "186", "jump_symbol": "AAPL", "price_exp": -5, "min_publishers": 1}, } @@ -732,3 +733,37 @@ async def test_agent_migrate_config(self, # Continue with the simple test case, which must succeed await self.test_update_price_simple(client_no_spawn) await client_no_spawn.close() + + @pytest.mark.asyncio + async def test_agent_respects_market_hours(self, client: PythAgentClient): + ''' + Similar to test_update_price_simple, but using AAPL_USD and + asserting that nothing is published due to the symbol's + all-closed market hours. + ''' + + # Fetch all products + products = {product["attr_dict"]["symbol"]: product for product in await client.get_all_products()} + + # Find the product account ID corresponding to the AAPL/USD symbol + product = products[AAPL_USD["attr_dict"]["symbol"]] + product_account = product["account"] + + # Get the price account with which to send updates + price_account = product["price_accounts"][0]["account"] + + # Send an "update_price" request + await client.update_price(price_account, 42, 2, "trading") + time.sleep(2) + + # Send another "update_price" request to trigger aggregation + await client.update_price(price_account, 81, 1, "trading") + time.sleep(2) + + # Confirm that the price account has been updated with the values from the first "update_price" request + final_product_state = await client.get_product(product_account) + + final_price_account = final_product_state["price_accounts"][0] + assert final_price_account["price"] == 0 + assert final_price_account["conf"] == 0 + assert final_price_account["status"] == "unknown" diff --git a/src/agent/market_hours.rs b/src/agent/market_hours.rs index 1a8cc84..28e7f88 100644 --- a/src/agent/market_hours.rs +++ b/src/agent/market_hours.rs @@ -9,8 +9,9 @@ use { chrono::{ naive::NaiveTime, DateTime, + Datelike, Duration, - TimeZone, + Utc, Weekday, }, chrono_tz::Tz, @@ -27,8 +28,8 @@ lazy_static! { } /// Weekly market hours schedule -#[derive(Default, Debug, Eq, PartialEq)] -pub struct MarketHours { +#[derive(Clone, Default, Debug, Eq, PartialEq)] +pub struct WeeklySchedule { pub timezone: Tz, pub mon: MHKind, pub tue: MHKind, @@ -39,7 +40,7 @@ pub struct MarketHours { pub sun: MHKind, } -impl MarketHours { +impl WeeklySchedule { pub fn all_closed() -> Self { Self { timezone: Default::default(), @@ -53,13 +54,11 @@ impl MarketHours { } } - pub fn can_publish_at(&self, when: &DateTime) -> Result { + pub fn can_publish_at(&self, when: &DateTime) -> bool { // Convert to time local to the market let when_market_local = when.with_timezone(&self.timezone); - // NOTE(2023-11-21): Strangely enough, I couldn't find a - // method that gets the programmatic Weekday from a DateTime. - let market_weekday: Weekday = when_market_local.format("%A").to_string().parse()?; + let market_weekday: Weekday = when_market_local.date_naive().weekday(); let market_time = when_market_local.time(); @@ -73,11 +72,11 @@ impl MarketHours { Weekday::Sun => self.sun.can_publish_at(market_time), }; - Ok(ret) + ret } } -impl FromStr for MarketHours { +impl FromStr for WeeklySchedule { type Err = anyhow::Error; fn from_str(s: &str) -> Result { let mut split_by_commas = s.split(","); @@ -163,7 +162,7 @@ impl FromStr for MarketHours { } /// Helper enum for denoting per-day schedules: time range, all-day open and all-day closed. -#[derive(Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub enum MHKind { Open, Closed, @@ -236,9 +235,9 @@ mod tests { // Mon-Fri 9-5, inconsistent leading space on Tuesday, leading 0 on Friday (expected to be fine) let s = "Europe/Warsaw,9:00-17:00, 9:00-17:00,9:00-17:00,9:00-17:00,09:00-17:00,C,C"; - let parsed: MarketHours = s.parse()?; + let parsed: WeeklySchedule = s.parse()?; - let expected = MarketHours { + let expected = WeeklySchedule { timezone: Tz::Europe__Warsaw, mon: MHKind::TimeRange( NaiveTime::from_hms_opt(9, 0, 0).unwrap(), @@ -274,7 +273,7 @@ mod tests { // Valid but missing a timezone let s = "O,C,O,C,O,C,O"; - let parsing_result: Result = s.parse(); + let parsing_result: Result = s.parse(); dbg!(&parsing_result); assert!(parsing_result.is_err()); @@ -285,7 +284,7 @@ mod tests { // One day short let s = "Asia/Hong_Kong,C,O,C,O,C,O"; - let parsing_result: Result = s.parse(); + let parsing_result: Result = s.parse(); dbg!(&parsing_result); assert!(parsing_result.is_err()); @@ -295,7 +294,7 @@ mod tests { fn test_parsing_gibberish_timezone_is_error() { // Pretty sure that one's extinct let s = "Pangea/New_Dino_City,O,O,O,O,O,O,O"; - let parsing_result: Result = s.parse(); + let parsing_result: Result = s.parse(); dbg!(&parsing_result); assert!(parsing_result.is_err()); @@ -304,7 +303,7 @@ mod tests { #[test] fn test_parsing_gibberish_day_schedule_is_error() { let s = "Europe/Amsterdam,mondays are alright I guess,O,O,O,O,O,O"; - let parsing_result: Result = s.parse(); + let parsing_result: Result = s.parse(); dbg!(&parsing_result); assert!(parsing_result.is_err()); @@ -314,7 +313,7 @@ mod tests { fn test_parsing_too_many_days_is_error() { // One day too many let s = "Europe/Lisbon,O,O,O,O,O,O,O,O,C"; - let parsing_result: Result = s.parse(); + let parsing_result: Result = s.parse(); dbg!(&parsing_result); assert!(parsing_result.is_err()); @@ -323,7 +322,7 @@ mod tests { #[test] fn test_market_hours_happy_path() -> Result<()> { // Prepare a schedule of narrow ranges - let mh: MarketHours = "America/New_York,00:00-1:00,1:00-2:00,2:00-3:00,3:00-4:00,4:00-5:00,5:00-6:00,6:00-7:00".parse()?; + let wsched: WeeklySchedule = "America/New_York,00:00-1:00,1:00-2:00,2:00-3:00,3:00-4:00,4:00-5:00,5:00-6:00,6:00-7:00".parse()?; // Prepare UTC datetimes that fall before, within and after market hours let format = "%Y-%m-%d %H:%M"; @@ -357,7 +356,7 @@ mod tests { NaiveDateTime::parse_from_str("2023-11-26 12:30", format)?.and_utc(), ]; - dbg!(&mh); + dbg!(&wsched); for ((before_dt, ok_dt), after_dt) in bad_datetimes_before .iter() @@ -368,9 +367,9 @@ mod tests { dbg!(&ok_dt); dbg!(&after_dt); - assert!(!mh.can_publish_at(before_dt)?); - assert!(mh.can_publish_at(ok_dt)?); - assert!(!mh.can_publish_at(after_dt)?); + assert!(!wsched.can_publish_at(before_dt)); + assert!(wsched.can_publish_at(ok_dt)); + assert!(!wsched.can_publish_at(after_dt)); } Ok(()) @@ -380,7 +379,8 @@ mod tests { #[test] fn test_market_hours_midnight_00_24() -> Result<()> { // Prepare a schedule of midnight-neighboring ranges - let mh: MarketHours = "Europe/Amsterdam,23:00-24:00,00:00-01:00,O,C,C,C,C".parse()?; + let wsched: WeeklySchedule = + "Europe/Amsterdam,23:00-24:00,00:00-01:00,O,C,C,C,C".parse()?; let format = "%Y-%m-%d %H:%M"; let ok_datetimes = vec![ @@ -408,16 +408,122 @@ mod tests { .unwrap(), ]; - dbg!(&mh); + dbg!(&wsched); for (ok_dt, bad_dt) in ok_datetimes.iter().zip(bad_datetimes.iter()) { dbg!(&ok_dt); dbg!(&bad_dt); - assert!(mh.can_publish_at(ok_dt)?); - assert!(!mh.can_publish_at(bad_dt)?); + assert!(wsched.can_publish_at(&ok_dt.with_timezone(&Utc))); + assert!(!wsched.can_publish_at(&bad_dt.with_timezone(&Utc))); } Ok(()) } + + /// Performs a scenario on 2023 autumn DST change. During that + /// time, most of the EU switched on the weekend one week earlier + /// (Oct 28-29) than most of the US (Nov 4-5). + #[test] + fn test_market_hours_dst_shenanigans() -> Result<()> { + // The Monday schedule is equivalent between Amsterdam and + // Chicago for most of 2023 (7h difference), except for two + // instances of Amsterdam/Chicago DST change lag: + // * Spring 2023: Mar12(US)-Mar26(EU) (clocks go forward 1h, + // CDT/CET 6h offset in use for 2 weeks, CDT/CEST 7h offset after) + // * Autumn 2023: Oct29(EU)-Nov5(US) (clocks go back 1h, + // CDT/CET 6h offset in use 1 week, CST/CET 7h offset after) + let wsched_eu: WeeklySchedule = "Europe/Amsterdam,9:00-17:00,O,O,O,O,O,O".parse()?; + let wsched_us: WeeklySchedule = "America/Chicago,2:00-10:00,O,O,O,O,O,O".parse()?; + + let format = "%Y-%m-%d %H:%M"; + + // Monday after EU change, before US change, from Amsterdam + // perspective. Okay for publishing Amsterdam market, outside hours for Chicago market + let dt1 = NaiveDateTime::parse_from_str("2023-10-30 16:01", format)? + .and_local_timezone(Tz::Europe__Amsterdam) + .unwrap(); + dbg!(&dt1); + + assert!(wsched_eu.can_publish_at(&dt1.with_timezone(&Utc))); + assert!(!wsched_us.can_publish_at(&dt1.with_timezone(&Utc))); + + // Same point in time, from Chicago perspective. Still okay + // for Amsterdam, still outside hours for Chicago. + let dt2 = NaiveDateTime::parse_from_str("2023-10-30 10:01", format)? + .and_local_timezone(Tz::America__Chicago) + .unwrap(); + dbg!(&dt2); + + assert!(wsched_eu.can_publish_at(&dt2.with_timezone(&Utc))); + assert!(!wsched_us.can_publish_at(&dt2.with_timezone(&Utc))); + + assert_eq!(dt1, dt2); + + // Monday after EU change, before US change, from Chicago + // perspective. Okay for publishing Chicago market, outside + // hours for publishing Amsterdam market. + let dt3 = NaiveDateTime::parse_from_str("2023-10-30 02:01", format)? + .and_local_timezone(Tz::America__Chicago) + .unwrap(); + dbg!(&dt3); + + assert!(!wsched_eu.can_publish_at(&dt3.with_timezone(&Utc))); + assert!(wsched_us.can_publish_at(&dt3.with_timezone(&Utc))); + + // Same point in time, from Amsterdam perspective. Still okay + // for Chicago, still outside hours for Amsterdam. + let dt4 = NaiveDateTime::parse_from_str("2023-10-30 08:01", format)? + .and_local_timezone(Tz::Europe__Amsterdam) + .unwrap(); + dbg!(&dt4); + + assert!(!wsched_eu.can_publish_at(&dt4.with_timezone(&Utc))); + assert!(wsched_us.can_publish_at(&dt4.with_timezone(&Utc))); + + assert_eq!(dt3, dt4); + + // Monday after both Amsterdam and Chicago get over their DST + // change, from Amsterdam perspective. Okay for publishing + // both markets. + let dt5 = NaiveDateTime::parse_from_str("2023-11-06 09:01", format)? + .and_local_timezone(Tz::Europe__Amsterdam) + .unwrap(); + dbg!(&dt5); + assert!(wsched_eu.can_publish_at(&dt5.with_timezone(&Utc))); + assert!(wsched_us.can_publish_at(&dt5.with_timezone(&Utc))); + + // Same point in time, from Chicago perspective + let dt6 = NaiveDateTime::parse_from_str("2023-11-06 02:01", format)? + .and_local_timezone(Tz::America__Chicago) + .unwrap(); + dbg!(&dt6); + assert!(wsched_eu.can_publish_at(&dt6.with_timezone(&Utc))); + assert!(wsched_us.can_publish_at(&dt6.with_timezone(&Utc))); + + assert_eq!(dt5, dt6); + + // Monday after both Amsterdam and Chicago get over their DST + // change, from Amsterdam perspective. Outside both markets' + // hours. + let dt7 = NaiveDateTime::parse_from_str("2023-11-06 17:01", format)? + .and_local_timezone(Tz::Europe__Amsterdam) + .unwrap(); + dbg!(&dt7); + assert!(!wsched_eu.can_publish_at(&dt7.with_timezone(&Utc))); + assert!(!wsched_us.can_publish_at(&dt7.with_timezone(&Utc))); + + // Same point in time, from Chicago perspective, still outside + // hours for both markets. + let dt8 = NaiveDateTime::parse_from_str("2023-11-06 10:01", format)? + .and_local_timezone(Tz::America__Chicago) + .unwrap(); + dbg!(&dt8); + assert!(!wsched_eu.can_publish_at(&dt8.with_timezone(&Utc))); + assert!(!wsched_us.can_publish_at(&dt8.with_timezone(&Utc))); + + assert_eq!(dt7, dt8); + + Ok(()) + } } diff --git a/src/agent/pythd/adapter.rs b/src/agent/pythd/adapter.rs index da1e539..db81b27 100644 --- a/src/agent/pythd/adapter.rs +++ b/src/agent/pythd/adapter.rs @@ -955,7 +955,7 @@ mod tests { ) .unwrap(), solana::oracle::ProductEntry { - account_data: pyth_sdk_solana::state::ProductAccount { + account_data: pyth_sdk_solana::state::ProductAccount { magic: 0xa1b2c3d4, ver: 6, atype: 4, @@ -992,7 +992,8 @@ mod tests { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ], }, - price_accounts: vec![ + weekly_schedule: Default::default(), + price_accounts: vec![ solana_sdk::pubkey::Pubkey::from_str( "GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU", ) @@ -1014,7 +1015,7 @@ mod tests { ) .unwrap(), solana::oracle::ProductEntry { - account_data: pyth_sdk_solana::state::ProductAccount { + account_data: pyth_sdk_solana::state::ProductAccount { magic: 0xa1b2c3d4, ver: 5, atype: 3, @@ -1051,7 +1052,8 @@ mod tests { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ], }, - price_accounts: vec![ + weekly_schedule: Default::default(), + price_accounts: vec![ solana_sdk::pubkey::Pubkey::from_str( "GG3FTE7xhc9Diy7dn9P6BWzoCrAEE4D3p5NBYrDAm5DD", ) diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index 056dc0c..f50cee2 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -9,9 +9,12 @@ use { key_store, network::Network, }, - crate::agent::remote_keypair_loader::{ - KeypairRequest, - RemoteKeypairLoader, + crate::agent::{ + market_hours::WeeklySchedule, + remote_keypair_loader::{ + KeypairRequest, + RemoteKeypairLoader, + }, }, anyhow::{ anyhow, @@ -57,7 +60,6 @@ use { collections::{ BTreeMap, HashMap, - HashSet, }, time::Duration, }, @@ -169,7 +171,7 @@ pub fn spawn_exporter( network: Network, rpc_url: &str, rpc_timeout: Duration, - publisher_permissions_rx: mpsc::Receiver>>, + publisher_permissions_rx: mpsc::Receiver>>, key_store: KeyStore, local_store_tx: Sender, global_store_tx: Sender, @@ -256,11 +258,11 @@ pub struct Exporter { // Channel on which to send inflight transactions to the transaction monitor inflight_transactions_tx: Sender, - /// Permissioned symbols as read by the oracle module - publisher_permissions_rx: mpsc::Receiver>>, + /// publisher => { permissioned_price => market hours } as read by the oracle module + publisher_permissions_rx: mpsc::Receiver>>, - /// Currently known permissioned prices of this publisher - our_prices: HashSet, + /// Currently known permissioned prices of this publisher along with their market hours + our_prices: HashMap, /// Interval to update the dynamic price (if enabled) dynamic_compute_unit_price_update_interval: Interval, @@ -284,7 +286,7 @@ impl Exporter { global_store_tx: Sender, network_state_rx: watch::Receiver, inflight_transactions_tx: Sender, - publisher_permissions_rx: mpsc::Receiver>>, + publisher_permissions_rx: mpsc::Receiver>>, keypair_request_tx: mpsc::Sender, logger: Logger, ) -> Self { @@ -301,7 +303,7 @@ impl Exporter { network_state_rx, inflight_transactions_tx, publisher_permissions_rx, - our_prices: HashSet::new(), + our_prices: HashMap::new(), dynamic_compute_unit_price_update_interval: tokio::time::interval( time::Duration::from_secs(1), ), @@ -468,13 +470,26 @@ impl Exporter { "publish_pubkey" => publish_keypair.pubkey().to_string(), ); + // Get a fresh system time + let now = Utc::now(); + // Filter out price accounts we're not permissioned to update Ok(fresh_updates .into_iter() .filter(|(id, _data)| { let key_from_id = Pubkey::from((*id).clone().to_bytes()); - if self.our_prices.contains(&key_from_id) { - true + if let Some(weekly_schedule) = self.our_prices.get(&key_from_id) { + let ret = weekly_schedule.can_publish_at(&now); + + if !ret { + debug!(self.logger, "Exporter: Attempted to publish price outside market hours"; + "price_account" => key_from_id.to_string(), + "weekly_schedule" => format!("{:?}", weekly_schedule), + "utc_time" => now.format("%c").to_string(), + ); + } + + ret } else { // Note: This message is not an error. Some // publishers have different permissions on @@ -570,7 +585,7 @@ impl Exporter { "Exporter: No permissioned prices were found for the publishing keypair on-chain. This is expected only on startup."; "publish_pubkey" => publish_pubkey.to_string(), ); - HashSet::new() + HashMap::new() }); trace!( self.logger, diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs index 52fe389..c4df8b4 100644 --- a/src/agent/solana/oracle.rs +++ b/src/agent/solana/oracle.rs @@ -1,3 +1,4 @@ +use crate::agent::market_hours::WeeklySchedule; // This module is responsible for loading the current state of the // on-chain Oracle program accounts from Solana. use { @@ -47,8 +48,8 @@ pub struct Data { pub mapping_accounts: HashMap, pub product_accounts: HashMap, pub price_accounts: HashMap, - /// publisher => {their permissioned price accounts} - pub publisher_permissions: HashMap>, + /// publisher => {their permissioned price accounts => market hours} + pub publisher_permissions: HashMap>, } impl Data { @@ -56,7 +57,7 @@ impl Data { mapping_accounts: HashMap, product_accounts: HashMap, price_accounts: HashMap, - publisher_permissions: HashMap>, + publisher_permissions: HashMap>, ) -> Self { Data { mapping_accounts, @@ -70,8 +71,9 @@ impl Data { pub type MappingAccount = pyth_sdk_solana::state::MappingAccount; #[derive(Debug, Clone)] pub struct ProductEntry { - pub account_data: pyth_sdk_solana::state::ProductAccount, - pub price_accounts: Vec, + pub account_data: pyth_sdk_solana::state::ProductAccount, + pub weekly_schedule: WeeklySchedule, + pub price_accounts: Vec, } pub type PriceEntry = pyth_sdk_solana::state::PriceAccount; @@ -134,7 +136,7 @@ pub fn spawn_oracle( wss_url: &str, rpc_timeout: Duration, global_store_update_tx: mpsc::Sender, - publisher_permissions_tx: mpsc::Sender>>, + publisher_permissions_tx: mpsc::Sender>>, key_store: KeyStore, logger: Logger, ) -> Vec> { @@ -349,7 +351,7 @@ struct Poller { data_tx: mpsc::Sender, /// Updates about permissioned price accounts from oracle to exporter - publisher_permissions_tx: mpsc::Sender>>, + publisher_permissions_tx: mpsc::Sender>>, /// The RPC client to use to poll data from the RPC node rpc_client: RpcClient, @@ -369,7 +371,7 @@ struct Poller { impl Poller { pub fn new( data_tx: mpsc::Sender, - publisher_permissions_tx: mpsc::Sender>>, + publisher_permissions_tx: mpsc::Sender>>, rpc_url: &str, rpc_timeout: Duration, commitment: CommitmentLevel, @@ -435,9 +437,21 @@ impl Poller { for component in price_entry.comp { let component_pub_entry = publisher_permissions .entry(component.publisher) - .or_insert(HashSet::new()); + .or_insert(HashMap::new()); - component_pub_entry.insert(*price_key); + let weekly_schedule = if let Some(prod_entry) = + product_accounts.get(&price_entry.prod) + { + prod_entry.weekly_schedule.clone() + } else { + warn!(&self.logger, "Oracle: INTERNAL: could not find product from price `prod` field, market hours falling back to 24/7."; + "price" => price_key.to_string(), + "missing_product" => price_entry.prod.to_string(), + ); + Default::default() + }; + + component_pub_entry.insert(*price_key, weekly_schedule); } } @@ -525,10 +539,29 @@ impl Poller { let product = load_product_account(prod_acc.data.as_slice()) .context(format!("Could not parse product account {}", product_key))?; + let weekly_schedule: WeeklySchedule = if let Some((_wsched_key, wsched_val)) = + product.iter().find(|(k, _v)| *k == "weekly_schedule") + { + wsched_val.parse().unwrap_or_else(|err| { + warn!( + self.logger, + "Oracle: Product has weekly_schedule defined but it could not be parsed. Falling back to 24/7 publishing."; + "product_key" => product_key.to_string(), + "weekly_schedule" => wsched_val, + ); + debug!(self.logger, "parsing error context"; "context" => format!("{:?}", err)); + Default::default() + } + ) + } else { + Default::default() // No market hours specified, meaning 24/7 publishing + }; + product_entries.insert( *product_key, ProductEntry { - account_data: *product, + account_data: *product, + weekly_schedule, price_accounts: vec![], }, );