From 920df19e5fe079f10e8fd492e36270d98a125572 Mon Sep 17 00:00:00 2001 From: Keyvan Khademi Date: Fri, 12 Apr 2024 10:55:49 -0700 Subject: [PATCH] feat: add market_schedule module (#112) * feat: add holiday_hours module * feat: integrate holiday schedule * feat: update gitignore to ignore .DS_Store * refactor: imports * fix: pre-commit * feat: wip market schedule using winnow parser * feat: wip add holiday day schedule parser * fix: format * feat: wip implement market_schedule parser * fix: format * feat: add market schedule can publish at test * feat: use new market hours in pyth agent * fix: format * refactor: rollback module restructure * rename market hours to legacy schedule * chore: add comment * feat: add support for 24:00 * fix: avoid parsing twice for verification * refactor: use match instead of if * refactor: use seq for time range parser * refactor: improve parser * refactor: improve parser * refactor: implement from trait * feat: add proptest * fix: day kind regex and add comments * refactor: improve comment * chore: increase pyth agent minor version --- .gitignore | 3 + Cargo.lock | 75 ++- Cargo.toml | 4 +- integration-tests/tests/test_integration.py | 53 +- .../agent/market_schedule.txt | 8 + src/agent.rs | 3 +- .../{market_hours.rs => legacy_schedule.rs} | 30 +- src/agent/market_schedule.rs | 561 ++++++++++++++++++ src/agent/pythd/adapter.rs | 12 +- src/agent/solana/exporter.rs | 18 +- src/agent/solana/oracle.rs | 59 +- 11 files changed, 775 insertions(+), 51 deletions(-) create mode 100644 proptest-regressions/agent/market_schedule.txt rename src/agent/{market_hours.rs => legacy_schedule.rs} (95%) create mode 100644 src/agent/market_schedule.rs diff --git a/.gitignore b/.gitignore index 41b55c9..48d41e1 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,6 @@ result **/*.rs.bk __pycache__ keystore + +# Mac OS +.DS_Store diff --git a/Cargo.lock b/Cargo.lock index f0224dd..cd1f188 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2363,6 +2363,12 @@ version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +[[package]] +name = "libm" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" + [[package]] name = "libredox" version = "0.0.1" @@ -2715,6 +2721,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" dependencies = [ "autocfg 1.2.0", + "libm", ] [[package]] @@ -3241,9 +3248,29 @@ dependencies = [ "syn 2.0.55", ] +[[package]] +name = "proptest" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31b476131c3c86cb68032fdc5cb6d5a1045e3e42d96b69fa599fd77701e1f5bf" +dependencies = [ + "bit-set", + "bit-vec", + "bitflags 2.5.0", + "lazy_static", + "num-traits", + "rand 0.8.5", + "rand_chacha 0.3.1", + "rand_xorshift 0.3.0", + "regex-syntax 0.8.3", + "rusty-fork", + "tempfile", + "unarray", +] + [[package]] name = "pyth-agent" -version = "2.5.2" +version = "2.6.0" dependencies = [ "anyhow", "async-trait", @@ -3261,6 +3288,7 @@ dependencies = [ "parking_lot", "portpicker", "prometheus-client", + "proptest", "pyth-sdk", "pyth-sdk-solana", "rand 0.8.5", @@ -3285,6 +3313,7 @@ dependencies = [ "toml_edit 0.22.9", "typed-html", "warp", + "winnow 0.6.5", ] [[package]] @@ -3338,6 +3367,12 @@ dependencies = [ "syn 2.0.55", ] +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + [[package]] name = "quinn" version = "0.10.2" @@ -3410,7 +3445,7 @@ dependencies = [ "rand_jitter", "rand_os", "rand_pcg", - "rand_xorshift", + "rand_xorshift 0.1.1", "winapi", ] @@ -3572,6 +3607,15 @@ dependencies = [ "rand_core 0.3.1", ] +[[package]] +name = "rand_xorshift" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d25bf25ec5ae4a3f1b92f929810509a2f53d7dca2f50b794ff57e3face536c8f" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "rand_xoshiro" version = "0.6.0" @@ -3914,6 +3958,18 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" +[[package]] +name = "rusty-fork" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb3dcc6e454c328bb824492db107ab7c0ae8fcffe4ad210136ef014458c1bc4f" +dependencies = [ + "fnv", + "quick-error", + "tempfile", + "wait-timeout", +] + [[package]] name = "ryu" version = "1.0.17" @@ -5941,6 +5997,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" +[[package]] +name = "unarray" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" + [[package]] name = "unicase" version = "2.7.0" @@ -6077,6 +6139,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "wait-timeout" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f200f5b12eb75f8c1ed65abd4b2db8a6e1b138a20de009dacee265a2498f3f6" +dependencies = [ + "libc", +] + [[package]] name = "want" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 9bd9deb..8c28965 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "2.5.2" +version = "2.6.0" edition = "2021" [[bin]] @@ -52,6 +52,8 @@ prometheus-client = "0.22.2" lazy_static = "1.4.0" toml_edit = "0.22.9" slog-bunyan = "2.5.0" +winnow = "0.6.5" +proptest = "1.4.0" [dev-dependencies] tokio-util = { version = "0.7.10", features = ["full"] } diff --git a/integration-tests/tests/test_integration.py b/integration-tests/tests/test_integration.py index 0b8f81d..20d6d29 100644 --- a/integration-tests/tests/test_integration.py +++ b/integration-tests/tests/test_integration.py @@ -1,4 +1,5 @@ import asyncio +from datetime import datetime import json import os import requests @@ -63,9 +64,23 @@ "quote_currency": "USD", "generic_symbol": "BTCUSD", "description": "BTC/USD", + "schedule": f"America/New_York;O,O,O,O,O,O,O;{datetime.now().strftime('%m%d')}/O" }, "metadata": {"jump_id": "78876709", "jump_symbol": "BTCUSD", "price_exp": -8, "min_publishers": 1}, } +SOL_USD = { + "account": "", + "attr_dict": { + "symbol": "Crypto.SOL/USD", + "asset_type": "Crypto", + "base": "SOL", + "quote_currency": "USD", + "generic_symbol": "SOLUSD", + "description": "SOL/USD", + "schedule": f"America/New_York;O,O,O,O,O,O,O;{datetime.now().strftime('%m%d')}/C" + }, + "metadata": {"jump_id": "78876711", "jump_symbol": "SOLUSD", "price_exp": -8, "min_publishers": 1}, +} AAPL_USD = { "account": "", "attr_dict": { @@ -95,7 +110,7 @@ }, "metadata": {"jump_id": "78876710", "jump_symbol": "ETHUSD", "price_exp": -8, "min_publishers": 1}, } -ALL_PRODUCTS=[BTC_USD, AAPL_USD, ETH_USD] +ALL_PRODUCTS=[BTC_USD, AAPL_USD, ETH_USD, SOL_USD] asyncio.set_event_loop(asyncio.new_event_loop()) @@ -277,6 +292,7 @@ def refdata_permissions(self, refdata_path): "AAPL": {"price": ["some_publisher_a"]}, "BTCUSD": {"price": ["some_publisher_b", "some_publisher_a"]}, # Reversed order helps ensure permission discovery works correctly for publisher A "ETHUSD": {"price": ["some_publisher_b"]}, + "SOLUSD": {"price": ["some_publisher_a"]}, })) f.flush() yield f.name @@ -769,3 +785,38 @@ async def test_agent_respects_market_hours(self, client: PythAgentClient): assert final_price_account["price"] == 0 assert final_price_account["conf"] == 0 assert final_price_account["status"] == "unknown" + + @pytest.mark.asyncio + async def test_agent_respects_holiday_hours(self, client: PythAgentClient): + ''' + Similar to test_agent_respects_market_hours, but using SOL_USD and + asserting that nothing is published due to the symbol's all-closed holiday. + ''' + + # Fetch all products + products = {product["attr_dict"]["symbol"]: product for product in await client.get_all_products()} + + # Find the product account ID corresponding to the AAPL/USD symbol + product = products[SOL_USD["attr_dict"]["symbol"]] + product_account = product["account"] + + # Get the price account with which to send updates + price_account = product["price_accounts"][0]["account"] + + # Send an "update_price" request + await client.update_price(price_account, 42, 2, "trading") + time.sleep(2) + + # Send another update_price request to "trigger" aggregation + # (aggregation would happen if market hours were to fail, but + # we want to catch that happening if there's a problem) + await client.update_price(price_account, 81, 1, "trading") + time.sleep(2) + + # Confirm that the price account has not been updated + final_product_state = await client.get_product(product_account) + + final_price_account = final_product_state["price_accounts"][0] + assert final_price_account["price"] == 0 + assert final_price_account["conf"] == 0 + assert final_price_account["status"] == "unknown" diff --git a/proptest-regressions/agent/market_schedule.txt b/proptest-regressions/agent/market_schedule.txt new file mode 100644 index 0000000..5d86612 --- /dev/null +++ b/proptest-regressions/agent/market_schedule.txt @@ -0,0 +1,8 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc 173b9a862e3ad1149b0fdef292a11164ecab5b67b395857178f63294c3c9c0b7 # shrinks to s = "0000-0060" +cc 6cf32e18287cb6de4b40f4326d1e9fd3be409086af3ccf75eac6f980c1f67052 # shrinks to s = TimeRange(00:00:00, 00:00:01) diff --git a/src/agent.rs b/src/agent.rs index b23db86..738e0a3 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -63,7 +63,8 @@ Note that there is an Oracle and Exporter for each network, but only one Local S ################################################################################################################################## */ pub mod dashboard; -pub mod market_hours; +pub mod legacy_schedule; +pub mod market_schedule; pub mod metrics; pub mod pythd; pub mod remote_keypair_loader; diff --git a/src/agent/market_hours.rs b/src/agent/legacy_schedule.rs similarity index 95% rename from src/agent/market_hours.rs rename to src/agent/legacy_schedule.rs index 28e7f88..8360021 100644 --- a/src/agent/market_hours.rs +++ b/src/agent/legacy_schedule.rs @@ -28,8 +28,10 @@ lazy_static! { } /// Weekly market hours schedule +/// TODO: Remove after all publishers have upgraded to support the new schedule format #[derive(Clone, Default, Debug, Eq, PartialEq)] -pub struct WeeklySchedule { +#[deprecated(note = "This struct is deprecated, use MarketSchedule instead.")] +pub struct LegacySchedule { pub timezone: Tz, pub mon: MHKind, pub tue: MHKind, @@ -40,7 +42,7 @@ pub struct WeeklySchedule { pub sun: MHKind, } -impl WeeklySchedule { +impl LegacySchedule { pub fn all_closed() -> Self { Self { timezone: Default::default(), @@ -76,7 +78,7 @@ impl WeeklySchedule { } } -impl FromStr for WeeklySchedule { +impl FromStr for LegacySchedule { type Err = anyhow::Error; fn from_str(s: &str) -> Result { let mut split_by_commas = s.split(","); @@ -235,9 +237,9 @@ mod tests { // Mon-Fri 9-5, inconsistent leading space on Tuesday, leading 0 on Friday (expected to be fine) let s = "Europe/Warsaw,9:00-17:00, 9:00-17:00,9:00-17:00,9:00-17:00,09:00-17:00,C,C"; - let parsed: WeeklySchedule = s.parse()?; + let parsed: LegacySchedule = s.parse()?; - let expected = WeeklySchedule { + let expected = LegacySchedule { timezone: Tz::Europe__Warsaw, mon: MHKind::TimeRange( NaiveTime::from_hms_opt(9, 0, 0).unwrap(), @@ -273,7 +275,7 @@ mod tests { // Valid but missing a timezone let s = "O,C,O,C,O,C,O"; - let parsing_result: Result = s.parse(); + let parsing_result: Result = s.parse(); dbg!(&parsing_result); assert!(parsing_result.is_err()); @@ -284,7 +286,7 @@ mod tests { // One day short let s = "Asia/Hong_Kong,C,O,C,O,C,O"; - let parsing_result: Result = s.parse(); + let parsing_result: Result = s.parse(); dbg!(&parsing_result); assert!(parsing_result.is_err()); @@ -294,7 +296,7 @@ mod tests { fn test_parsing_gibberish_timezone_is_error() { // Pretty sure that one's extinct let s = "Pangea/New_Dino_City,O,O,O,O,O,O,O"; - let parsing_result: Result = s.parse(); + let parsing_result: Result = s.parse(); dbg!(&parsing_result); assert!(parsing_result.is_err()); @@ -303,7 +305,7 @@ mod tests { #[test] fn test_parsing_gibberish_day_schedule_is_error() { let s = "Europe/Amsterdam,mondays are alright I guess,O,O,O,O,O,O"; - let parsing_result: Result = s.parse(); + let parsing_result: Result = s.parse(); dbg!(&parsing_result); assert!(parsing_result.is_err()); @@ -313,7 +315,7 @@ mod tests { fn test_parsing_too_many_days_is_error() { // One day too many let s = "Europe/Lisbon,O,O,O,O,O,O,O,O,C"; - let parsing_result: Result = s.parse(); + let parsing_result: Result = s.parse(); dbg!(&parsing_result); assert!(parsing_result.is_err()); @@ -322,7 +324,7 @@ mod tests { #[test] fn test_market_hours_happy_path() -> Result<()> { // Prepare a schedule of narrow ranges - let wsched: WeeklySchedule = "America/New_York,00:00-1:00,1:00-2:00,2:00-3:00,3:00-4:00,4:00-5:00,5:00-6:00,6:00-7:00".parse()?; + let wsched: LegacySchedule = "America/New_York,00:00-1:00,1:00-2:00,2:00-3:00,3:00-4:00,4:00-5:00,5:00-6:00,6:00-7:00".parse()?; // Prepare UTC datetimes that fall before, within and after market hours let format = "%Y-%m-%d %H:%M"; @@ -379,7 +381,7 @@ mod tests { #[test] fn test_market_hours_midnight_00_24() -> Result<()> { // Prepare a schedule of midnight-neighboring ranges - let wsched: WeeklySchedule = + let wsched: LegacySchedule = "Europe/Amsterdam,23:00-24:00,00:00-01:00,O,C,C,C,C".parse()?; let format = "%Y-%m-%d %H:%M"; @@ -433,8 +435,8 @@ mod tests { // CDT/CET 6h offset in use for 2 weeks, CDT/CEST 7h offset after) // * Autumn 2023: Oct29(EU)-Nov5(US) (clocks go back 1h, // CDT/CET 6h offset in use 1 week, CST/CET 7h offset after) - let wsched_eu: WeeklySchedule = "Europe/Amsterdam,9:00-17:00,O,O,O,O,O,O".parse()?; - let wsched_us: WeeklySchedule = "America/Chicago,2:00-10:00,O,O,O,O,O,O".parse()?; + let wsched_eu: LegacySchedule = "Europe/Amsterdam,9:00-17:00,O,O,O,O,O,O".parse()?; + let wsched_us: LegacySchedule = "America/Chicago,2:00-10:00,O,O,O,O,O,O".parse()?; let format = "%Y-%m-%d %H:%M"; diff --git a/src/agent/market_schedule.rs b/src/agent/market_schedule.rs new file mode 100644 index 0000000..e55c799 --- /dev/null +++ b/src/agent/market_schedule.rs @@ -0,0 +1,561 @@ +//! Holiday hours metadata parsing and evaluation logic + +use { + super::legacy_schedule::{ + LegacySchedule, + MHKind, + }, + anyhow::{ + anyhow, + Result, + }, + chrono::{ + naive::NaiveTime, + DateTime, + Datelike, + Duration, + Utc, + }, + chrono_tz::Tz, + proptest::{ + arbitrary::any, + prop_compose, + proptest, + }, + std::{ + fmt::Display, + str::FromStr, + }, + winnow::{ + combinator::{ + alt, + separated, + seq, + }, + stream::ToUsize, + token::{ + take, + take_till, + }, + PResult, + Parser, + }, +}; + + +/// Helper time value representing 24:00:00 as 00:00:00 minus 1 +/// nanosecond (underflowing to 23:59:59.999(...) ). While chrono +/// has this value internally exposed as NaiveTime::MAX, it is not +/// exposed outside the crate. +const MAX_TIME_INSTANT: NaiveTime = NaiveTime::MIN + .overflowing_sub_signed(Duration::nanoseconds(1)) + .0; + + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct MarketSchedule { + pub timezone: Tz, + pub weekly_schedule: Vec, + pub holidays: Vec, +} + +impl Default for MarketSchedule { + fn default() -> Self { + Self { + timezone: Tz::UTC, + weekly_schedule: vec![ScheduleDayKind::Open; 7], + holidays: vec![], + } + } +} + +impl Display for MarketSchedule { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{};", self.timezone)?; + for (i, day) in self.weekly_schedule.iter().enumerate() { + write!(f, "{}", day)?; + if i < 6 { + write!(f, ",")?; + } + } + write!(f, ";")?; + for (i, holiday) in self.holidays.iter().enumerate() { + write!(f, "{}", holiday)?; + if i < self.holidays.len() - 1 { + write!(f, ",")?; + } + } + Ok(()) + } +} + +impl MarketSchedule { + pub fn can_publish_at(&self, when: &DateTime) -> bool { + let when_local = when.with_timezone(&self.timezone); + + let month = when_local.date_naive().month0() + 1; + let day = when_local.date_naive().day0() + 1; + let time = when_local.time(); + let weekday = when_local.weekday().number_from_monday().to_usize(); + + for holiday in &self.holidays { + // Check if the day matches + if holiday.month == month && holiday.day == day { + return holiday.kind.can_publish_at(time); + } + } + + self.weekly_schedule[weekday].can_publish_at(time) + } +} + +fn market_schedule_parser<'s>(input: &mut &'s str) -> PResult { + seq!( + MarketSchedule { + timezone: take_till(0.., ';').verify_map(|s| Tz::from_str(s).ok()), + _: ';', + weekly_schedule: separated(7, schedule_day_kind_parser, ","), + _: ';', + holidays: separated(0.., holiday_day_schedule_parser, ","), + } + ) + .parse_next(input) +} + +impl FromStr for MarketSchedule { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result { + market_schedule_parser + .parse_next(&mut s.to_owned().as_str()) + .map_err(|e| anyhow!(e)) + } +} + +impl From for MarketSchedule { + fn from(legacy: LegacySchedule) -> Self { + Self { + timezone: legacy.timezone, + weekly_schedule: vec![ + legacy.mon.into(), + legacy.tue.into(), + legacy.wed.into(), + legacy.thu.into(), + legacy.fri.into(), + legacy.sat.into(), + legacy.sun.into(), + ], + holidays: vec![], + } + } +} + + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct HolidayDaySchedule { + pub month: u32, + pub day: u32, + pub kind: ScheduleDayKind, +} + +fn two_digit_parser<'s>(input: &mut &'s str) -> PResult { + take(2usize) + .verify_map(|s| u32::from_str(s).ok()) + .parse_next(input) +} + +fn holiday_day_schedule_parser<'s>(input: &mut &'s str) -> PResult { + // day and month are not validated to be correct dates + // if they are invalid, it will be ignored since there + // are no real dates that match the invalid input + seq!( + HolidayDaySchedule { + month: two_digit_parser, + day: two_digit_parser, + _: "/", + kind: schedule_day_kind_parser, + } + ) + .parse_next(input) +} + +impl FromStr for HolidayDaySchedule { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result { + holiday_day_schedule_parser + .parse_next(&mut s.to_owned().as_str()) + .map_err(|e| anyhow!(e)) + } +} + +impl Display for HolidayDaySchedule { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:02}{:02}/{}", self.month, self.day, self.kind) + } +} + + +#[derive(Clone, Debug, Eq, PartialEq, Copy)] +pub enum ScheduleDayKind { + Open, + Closed, + TimeRange(NaiveTime, NaiveTime), +} + +impl ScheduleDayKind { + pub fn can_publish_at(&self, when_local: NaiveTime) -> bool { + match self { + Self::Open => true, + Self::Closed => false, + Self::TimeRange(start, end) => start <= &when_local && &when_local <= end, + } + } +} + +impl Default for ScheduleDayKind { + fn default() -> Self { + Self::Open + } +} + +impl Display for ScheduleDayKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Open => write!(f, "O"), + Self::Closed => write!(f, "C"), + Self::TimeRange(start, end) => { + write!(f, "{}-{}", start.format("%H%M"), end.format("%H%M")) + } + } + } +} + +fn time_parser<'s>(input: &mut &'s str) -> PResult { + alt(("2400", take(4usize))) + .verify_map(|time_str| match time_str { + "2400" => Some(MAX_TIME_INSTANT), + _ => NaiveTime::parse_from_str(time_str, "%H%M").ok(), + }) + .parse_next(input) +} + +fn time_range_parser<'s>(input: &mut &'s str) -> PResult { + seq!( + time_parser, + _: "-", + time_parser, + ) + .map(|s| ScheduleDayKind::TimeRange(s.0, s.1)) + .parse_next(input) +} + +fn schedule_day_kind_parser<'s>(input: &mut &'s str) -> PResult { + alt(( + "C".map(|_| ScheduleDayKind::Closed), + "O".map(|_| ScheduleDayKind::Open), + time_range_parser, + )) + .parse_next(input) +} + +impl FromStr for ScheduleDayKind { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result { + schedule_day_kind_parser + .parse_next(&mut s.to_owned().as_str()) + .map_err(|e| anyhow!(e)) + } +} + +impl From for ScheduleDayKind { + fn from(mhkind: MHKind) -> Self { + match mhkind { + MHKind::Open => ScheduleDayKind::Open, + MHKind::Closed => ScheduleDayKind::Closed, + MHKind::TimeRange(start, end) => ScheduleDayKind::TimeRange(start, end), + } + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + chrono::NaiveDateTime, + }; + + #[test] + fn test_parsing_schedule_day_kind() -> Result<()> { + // Mon-Fri 9-5, inconsistent leading space on Tuesday, leading 0 on Friday (expected to be fine) + let open = "O"; + let closed = "C"; + let valid = "1234-1347"; + let valid2400 = "1234-2400"; + let invalid = "1234-5668"; + let invalid_format = "1234-56"; + + assert_eq!( + open.parse::().unwrap(), + ScheduleDayKind::Open + ); + assert_eq!( + closed.parse::().unwrap(), + ScheduleDayKind::Closed + ); + assert_eq!( + valid.parse::().unwrap(), + ScheduleDayKind::TimeRange( + NaiveTime::from_hms_opt(12, 34, 0).unwrap(), + NaiveTime::from_hms_opt(13, 47, 0).unwrap(), + ) + ); + assert_eq!( + valid2400.parse::().unwrap(), + ScheduleDayKind::TimeRange( + NaiveTime::from_hms_opt(12, 34, 0).unwrap(), + MAX_TIME_INSTANT, + ) + ); + assert!(invalid.parse::().is_err()); + assert!(invalid_format.parse::().is_err()); + + Ok(()) + } + + #[test] + fn test_parsing_holiday_day_schedule() -> Result<()> { + let input = "0412/O"; + let expected = HolidayDaySchedule { + month: 04, + day: 12, + kind: ScheduleDayKind::Open, + }; + + let parsed = input.parse::()?; + assert_eq!(parsed, expected); + + let input = "0412/C"; + let expected = HolidayDaySchedule { + month: 04, + day: 12, + kind: ScheduleDayKind::Closed, + }; + let parsed = input.parse::()?; + assert_eq!(parsed, expected); + + let input = "0412/1234-1347"; + let expected = HolidayDaySchedule { + month: 04, + day: 12, + kind: ScheduleDayKind::TimeRange( + NaiveTime::from_hms_opt(12, 34, 0).unwrap(), + NaiveTime::from_hms_opt(13, 47, 0).unwrap(), + ), + }; + let parsed = input.parse::()?; + assert_eq!(parsed, expected); + + let input = "0412/1234-5332"; + assert!(input.parse::().is_err()); + + let input = "0412/1234-53"; + assert!(input.parse::().is_err()); + + Ok(()) + } + + #[test] + fn test_parsing_market_schedule() -> Result<()> { + let input = "America/New_York;O,1234-1347,0930-2400,C,C,C,O;0412/O,0413/C,0414/1234-1347,1230/0930-2400"; + let expected = MarketSchedule { + timezone: Tz::America__New_York, + weekly_schedule: vec![ + ScheduleDayKind::Open, + ScheduleDayKind::TimeRange( + NaiveTime::from_hms_opt(12, 34, 0).unwrap(), + NaiveTime::from_hms_opt(13, 47, 0).unwrap(), + ), + ScheduleDayKind::TimeRange( + NaiveTime::from_hms_opt(09, 30, 0).unwrap(), + MAX_TIME_INSTANT, + ), + ScheduleDayKind::Closed, + ScheduleDayKind::Closed, + ScheduleDayKind::Closed, + ScheduleDayKind::Open, + ], + holidays: vec![ + HolidayDaySchedule { + month: 04, + day: 12, + kind: ScheduleDayKind::Open, + }, + HolidayDaySchedule { + month: 04, + day: 13, + kind: ScheduleDayKind::Closed, + }, + HolidayDaySchedule { + month: 04, + day: 14, + kind: ScheduleDayKind::TimeRange( + NaiveTime::from_hms_opt(12, 34, 0).unwrap(), + NaiveTime::from_hms_opt(13, 47, 0).unwrap(), + ), + }, + HolidayDaySchedule { + month: 12, + day: 30, + kind: ScheduleDayKind::TimeRange( + NaiveTime::from_hms_opt(09, 30, 0).unwrap(), + MAX_TIME_INSTANT, + ), + }, + ], + }; + + let parsed = input.parse::()?; + assert_eq!(parsed, expected); + + Ok(()) + } + + #[test] + fn invalid_timezone_is_err() { + let input = "Invalid/Timezone;O,C,C,C,C,C,O;0412/O,0413/C,0414/1234-1347"; + assert!(input.parse::().is_err()); + } + + #[test] + fn test_market_schedule_can_publish_at() -> Result<()> { + // Prepare a schedule of narrow ranges + let market_schedule: MarketSchedule = + "UTC;O,O,O,O,O,O,O;0422/0900-1700,1109/0930-1730,1201/O,1225/C,1231/0930-2400" + .parse() + .unwrap(); + + let format = "%Y-%m-%d %H:%M"; + + // Date no match + assert!(market_schedule + .can_publish_at(&NaiveDateTime::parse_from_str("2023-11-20 05:30", format)?.and_utc())); + + // Date match before range + assert!(!market_schedule + .can_publish_at(&NaiveDateTime::parse_from_str("2023-04-22 08:59", format)?.and_utc())); + + // Date match at start of range + assert!(market_schedule + .can_publish_at(&NaiveDateTime::parse_from_str("2023-04-22 09:00", format)?.and_utc())); + + // Date match in range + assert!(market_schedule + .can_publish_at(&NaiveDateTime::parse_from_str("2023-04-22 12:00", format)?.and_utc())); + + // Date match at end of range + assert!(market_schedule + .can_publish_at(&NaiveDateTime::parse_from_str("2023-04-22 17:00", format)?.and_utc())); + + // Date match after range + assert!(!market_schedule + .can_publish_at(&NaiveDateTime::parse_from_str("2023-04-22 17:01", format)?.and_utc())); + + // Date 2400 range + assert!(market_schedule + .can_publish_at(&NaiveDateTime::parse_from_str("2023-12-31 23:59", format)?.and_utc())); + Ok(()) + } +} + +prop_compose! { + fn schedule_day_kind()( + r in any::(), + t1 in any::(), + t2 in any::(), + ) -> ScheduleDayKind { + match r % 3 { + 0 => ScheduleDayKind::Open, + 1 => ScheduleDayKind::Closed, + _ => ScheduleDayKind::TimeRange( + NaiveTime::from_hms_opt(t1 % 24, t1 / 24 % 60, 0).unwrap(), + NaiveTime::from_hms_opt(t2 % 24, t2 / 24 % 60, 0).unwrap(), + ), + } + } +} + +prop_compose! { + fn holiday_day_schedule()( + m in 1..=12u32, + d in 1..=31u32, + s in schedule_day_kind(), + ) -> HolidayDaySchedule { + HolidayDaySchedule { + month: m, + day: d, + kind: s, + } + } +} + +prop_compose! { + fn market_schedule()( + tz in proptest::sample::select(vec![ + Tz::UTC, + Tz::America__New_York, + Tz::America__Los_Angeles, + Tz::America__Chicago, + Tz::Singapore, + Tz::Australia__Sydney, + ]), + weekly_schedule in proptest::collection::vec(schedule_day_kind(), 7..=7), + holidays in proptest::collection::vec(holiday_day_schedule(), 0..12), + ) -> MarketSchedule { + MarketSchedule { + timezone: tz, + weekly_schedule, + holidays, + } + } +} + +// Matches C or O or hhmm-hhmm with 24-hour time +const VALID_SCHEDULE_DAY_KIND_REGEX: &str = + "C|O|([01][1-9]|2[0-3])([0-5][0-9])-([01][1-9]|2[0-3])([0-5][0-9])"; + +// Matches MMDD with MM and DD being 01-12 and 01-31 respectively +const VALID_MONTH_DAY_REGEX: &str = "(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])"; + +proptest!( + #[test] + fn doesnt_crash(s in "\\PC*") { + _ = s.parse::(); + _ = s.parse::(); + _ = s.parse::(); + } + + #[test] + fn parse_valid_schedule_day_kind(s in VALID_SCHEDULE_DAY_KIND_REGEX) { + assert!(s.parse::().is_ok()); + } + + #[test] + fn test_valid_schedule_day_kind(s in schedule_day_kind()) { + assert_eq!(s, s.to_string().parse::().unwrap()); + } + + #[test] + fn parse_valid_holiday_day_schedule(s in VALID_SCHEDULE_DAY_KIND_REGEX, d in VALID_MONTH_DAY_REGEX) { + let valid_holiday_day = format!("{}/{}", d, s); + assert!(valid_holiday_day.parse::().is_ok()); + } + + #[test] + fn test_valid_holiday_day_schedule(s in holiday_day_schedule()) { + assert_eq!(s, s.to_string().parse::().unwrap()); + } + + #[test] + fn test_valid_market_schedule(s in market_schedule()) { + assert_eq!(s, s.to_string().parse::().unwrap()); + } +); diff --git a/src/agent/pythd/adapter.rs b/src/agent/pythd/adapter.rs index ebea11d..a4b3428 100644 --- a/src/agent/pythd/adapter.rs +++ b/src/agent/pythd/adapter.rs @@ -960,7 +960,7 @@ mod tests { ) .unwrap(), solana::oracle::ProductEntry { - account_data: pyth_sdk_solana::state::ProductAccount { + account_data: pyth_sdk_solana::state::ProductAccount { magic: 0xa1b2c3d4, ver: 6, atype: 4, @@ -997,8 +997,8 @@ mod tests { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ], }, - weekly_schedule: Default::default(), - price_accounts: vec![ + schedule: Default::default(), + price_accounts: vec![ solana_sdk::pubkey::Pubkey::from_str( "GVXRSBjFk6e6J3NbVPXohDJetcTjaeeuykUpbQF8UoMU", ) @@ -1020,7 +1020,7 @@ mod tests { ) .unwrap(), solana::oracle::ProductEntry { - account_data: pyth_sdk_solana::state::ProductAccount { + account_data: pyth_sdk_solana::state::ProductAccount { magic: 0xa1b2c3d4, ver: 5, atype: 3, @@ -1057,8 +1057,8 @@ mod tests { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ], }, - weekly_schedule: Default::default(), - price_accounts: vec![ + schedule: Default::default(), + price_accounts: vec![ solana_sdk::pubkey::Pubkey::from_str( "GG3FTE7xhc9Diy7dn9P6BWzoCrAEE4D3p5NBYrDAm5DD", ) diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index 3231a7b..5784cc6 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -10,7 +10,7 @@ use { network::Network, }, crate::agent::{ - market_hours::WeeklySchedule, + market_schedule::MarketSchedule, remote_keypair_loader::{ KeypairRequest, RemoteKeypairLoader, @@ -66,8 +66,8 @@ use { }, tokio::{ sync::{ - mpsc, mpsc::{ + self, error::TryRecvError, Sender, }, @@ -172,7 +172,7 @@ pub fn spawn_exporter( network: Network, rpc_url: &str, rpc_timeout: Duration, - publisher_permissions_rx: mpsc::Receiver>>, + publisher_permissions_rx: mpsc::Receiver>>, key_store: KeyStore, local_store_tx: Sender, global_store_tx: Sender, @@ -260,10 +260,10 @@ pub struct Exporter { inflight_transactions_tx: Sender, /// publisher => { permissioned_price => market hours } as read by the oracle module - publisher_permissions_rx: mpsc::Receiver>>, + publisher_permissions_rx: mpsc::Receiver>>, /// Currently known permissioned prices of this publisher along with their market hours - our_prices: HashMap, + our_prices: HashMap, /// Interval to update the dynamic price (if enabled) dynamic_compute_unit_price_update_interval: Interval, @@ -287,7 +287,7 @@ impl Exporter { global_store_tx: Sender, network_state_rx: watch::Receiver, inflight_transactions_tx: Sender, - publisher_permissions_rx: mpsc::Receiver>>, + publisher_permissions_rx: mpsc::Receiver>>, keypair_request_tx: mpsc::Sender, logger: Logger, ) -> Self { @@ -474,13 +474,13 @@ impl Exporter { .into_iter() .filter(|(id, _data)| { let key_from_id = Pubkey::from((*id).clone().to_bytes()); - if let Some(weekly_schedule) = self.our_prices.get(&key_from_id) { - let ret = weekly_schedule.can_publish_at(&now); + if let Some(schedule) = self.our_prices.get(&key_from_id) { + let ret = schedule.can_publish_at(&now); if !ret { debug!(self.logger, "Exporter: Attempted to publish price outside market hours"; "price_account" => key_from_id.to_string(), - "weekly_schedule" => format!("{:?}", weekly_schedule), + "schedule" => format!("{:?}", schedule), "utc_time" => now.format("%c").to_string(), ); } diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs index 20c8d43..57027ec 100644 --- a/src/agent/solana/oracle.rs +++ b/src/agent/solana/oracle.rs @@ -4,7 +4,11 @@ use { self::subscriber::Subscriber, super::key_store::KeyStore, crate::agent::{ - market_hours::WeeklySchedule, + legacy_schedule::LegacySchedule, + market_schedule::{ + MarketSchedule, + ScheduleDayKind, + }, store::global, }, anyhow::{ @@ -117,7 +121,7 @@ pub struct Data { pub product_accounts: HashMap, pub price_accounts: HashMap, /// publisher => {their permissioned price accounts => market hours} - pub publisher_permissions: HashMap>, + pub publisher_permissions: HashMap>, } impl Data { @@ -125,7 +129,7 @@ impl Data { mapping_accounts: HashMap, product_accounts: HashMap, price_accounts: HashMap, - publisher_permissions: HashMap>, + publisher_permissions: HashMap>, ) -> Self { Data { mapping_accounts, @@ -139,9 +143,9 @@ impl Data { pub type MappingAccount = pyth_sdk_solana::state::MappingAccount; #[derive(Debug, Clone)] pub struct ProductEntry { - pub account_data: pyth_sdk_solana::state::ProductAccount, - pub weekly_schedule: WeeklySchedule, - pub price_accounts: Vec, + pub account_data: pyth_sdk_solana::state::ProductAccount, + pub schedule: MarketSchedule, + pub price_accounts: Vec, } // Oracle is responsible for fetching Solana account data stored in the Pyth on-chain Oracle. @@ -203,7 +207,7 @@ pub fn spawn_oracle( wss_url: &str, rpc_timeout: Duration, global_store_update_tx: mpsc::Sender, - publisher_permissions_tx: mpsc::Sender>>, + publisher_permissions_tx: mpsc::Sender>>, key_store: KeyStore, logger: Logger, ) -> Vec> { @@ -418,7 +422,7 @@ struct Poller { data_tx: mpsc::Sender, /// Updates about permissioned price accounts from oracle to exporter - publisher_permissions_tx: mpsc::Sender>>, + publisher_permissions_tx: mpsc::Sender>>, /// The RPC client to use to poll data from the RPC node rpc_client: RpcClient, @@ -438,7 +442,7 @@ struct Poller { impl Poller { pub fn new( data_tx: mpsc::Sender, - publisher_permissions_tx: mpsc::Sender>>, + publisher_permissions_tx: mpsc::Sender>>, rpc_url: &str, rpc_timeout: Duration, commitment: CommitmentLevel, @@ -510,10 +514,8 @@ impl Poller { .entry(component.publisher) .or_insert(HashMap::new()); - let weekly_schedule = if let Some(prod_entry) = - product_accounts.get(&price_entry.prod) - { - prod_entry.weekly_schedule.clone() + let schedule = if let Some(prod_entry) = product_accounts.get(&price_entry.prod) { + prod_entry.schedule.clone() } else { warn!(&self.logger, "Oracle: INTERNAL: could not find product from price `prod` field, market hours falling back to 24/7."; "price" => price_key.to_string(), @@ -522,7 +524,7 @@ impl Poller { Default::default() }; - component_pub_entry.insert(*price_key, weekly_schedule); + component_pub_entry.insert(*price_key, schedule); } } @@ -610,7 +612,7 @@ impl Poller { let product = load_product_account(prod_acc.data.as_slice()) .context(format!("Could not parse product account {}", product_key))?; - let weekly_schedule: WeeklySchedule = if let Some((_wsched_key, wsched_val)) = + let legacy_schedule: LegacySchedule = if let Some((_wsched_key, wsched_val)) = product.iter().find(|(k, _v)| *k == "weekly_schedule") { wsched_val.parse().unwrap_or_else(|err| { @@ -627,11 +629,34 @@ impl Poller { Default::default() // No market hours specified, meaning 24/7 publishing }; + let market_schedule: Option = if let Some(( + _msched_key, + msched_val, + )) = + product.iter().find(|(k, _v)| *k == "schedule") + { + match msched_val.parse::() { + Ok(schedule) => Some(schedule), + Err(err) => { + warn!( + self.logger, + "Oracle: Product has schedule defined but it could not be parsed. Falling back to legacy schedule."; + "product_key" => product_key.to_string(), + "schedule" => msched_val, + ); + debug!(self.logger, "parsing error context"; "context" => format!("{:?}", err)); + None + } + } + } else { + None + }; + product_entries.insert( *product_key, ProductEntry { - account_data: *product, - weekly_schedule, + account_data: *product, + schedule: market_schedule.unwrap_or_else(|| legacy_schedule.into()), price_accounts: vec![], }, );