Skip to content

Commit

Permalink
feat: use new market hours in pyth agent
Browse files Browse the repository at this point in the history
  • Loading branch information
keyvankhademi committed Apr 11, 2024
1 parent ab96151 commit 33521d9
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 39 deletions.
4 changes: 2 additions & 2 deletions integration-tests/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
"quote_currency": "USD",
"generic_symbol": "BTCUSD",
"description": "BTC/USD",
"holidays": f"{datetime.now().strftime('%m%d')}/O"
"schedule": f"America/New_York;O,O,O,O,O,O,O;{datetime.now().strftime('%m%d')}/O"
},
"metadata": {"jump_id": "78876709", "jump_symbol": "BTCUSD", "price_exp": -8, "min_publishers": 1},
}
Expand All @@ -77,7 +77,7 @@
"quote_currency": "USD",
"generic_symbol": "SOLUSD",
"description": "SOL/USD",
"holidays": f"{datetime.now().strftime('%m%d')}/C"
"schedule": f"America/New_York;O,O,O,O,O,O,O;{datetime.now().strftime('%m%d')}/C"
},
"metadata": {"jump_id": "78876711", "jump_symbol": "SOLUSD", "price_exp": -8, "min_publishers": 1},
}
Expand Down
25 changes: 23 additions & 2 deletions src/agent/market_schedule.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Holiday hours metadata parsing and evaluation logic
use {
super::schedule::market_hours::MHKind,
anyhow::{
anyhow,
Result,
Expand Down Expand Up @@ -43,6 +44,16 @@ pub struct MarketSchedule {
pub holidays: Vec<HolidayDaySchedule>,
}

impl Default for MarketSchedule {
fn default() -> Self {
Self {
timezone: Tz::UTC,
weekly_schedule: vec![ScheduleDayKind::Open; 7],
holidays: vec![],
}
}
}

impl MarketSchedule {
pub fn can_publish_at(&self, when: &DateTime<Utc>) -> bool {
let when_local = when.with_timezone(&self.timezone);
Expand Down Expand Up @@ -128,6 +139,14 @@ impl ScheduleDayKind {
Self::TimeRange(start, end) => start <= &when_local && &when_local <= end,
}
}

pub fn from_mhkind(mhkind: MHKind) -> Self {
match mhkind {
MHKind::Open => ScheduleDayKind::Open,
MHKind::Closed => ScheduleDayKind::Closed,
MHKind::TimeRange(start, end) => ScheduleDayKind::TimeRange(start, end),
}
}
}

impl Default for ScheduleDayKind {
Expand Down Expand Up @@ -168,8 +187,10 @@ impl FromStr for ScheduleDayKind {

#[cfg(test)]
mod tests {
use chrono::NaiveDateTime;
use super::*;
use {
super::*,
chrono::NaiveDateTime,
};

#[test]
fn test_parsing_schedule_day_kind() -> Result<()> {
Expand Down
16 changes: 7 additions & 9 deletions src/agent/solana/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ use {
network::Network,
},
crate::agent::{
market_schedule::MarketSchedule,
remote_keypair_loader::{
KeypairRequest,
RemoteKeypairLoader,
},
schedule::Schedule,
},
anyhow::{
anyhow,
Expand Down Expand Up @@ -172,7 +172,7 @@ pub fn spawn_exporter(
network: Network,
rpc_url: &str,
rpc_timeout: Duration,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, Schedule>>>,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
key_store: KeyStore,
local_store_tx: Sender<store::local::Message>,
global_store_tx: Sender<store::global::Lookup>,
Expand Down Expand Up @@ -260,10 +260,10 @@ pub struct Exporter {
inflight_transactions_tx: Sender<Signature>,

/// publisher => { permissioned_price => market hours } as read by the oracle module
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, Schedule>>>,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,

/// Currently known permissioned prices of this publisher along with their market hours
our_prices: HashMap<Pubkey, Schedule>,
our_prices: HashMap<Pubkey, MarketSchedule>,

/// Interval to update the dynamic price (if enabled)
dynamic_compute_unit_price_update_interval: Interval,
Expand All @@ -287,7 +287,7 @@ impl Exporter {
global_store_tx: Sender<store::global::Lookup>,
network_state_rx: watch::Receiver<NetworkState>,
inflight_transactions_tx: Sender<Signature>,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, Schedule>>>,
publisher_permissions_rx: mpsc::Receiver<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
keypair_request_tx: mpsc::Sender<KeypairRequest>,
logger: Logger,
) -> Self {
Expand Down Expand Up @@ -474,10 +474,8 @@ impl Exporter {
.into_iter()
.filter(|(id, _data)| {
let key_from_id = Pubkey::from((*id).clone().to_bytes());
if let Some(schedule) = self.our_prices.get_mut(&key_from_id) {
// let ret = schedule.market_hours.can_publish_at(&now);
schedule.holiday_hours.timezone = Some(schedule.market_hours.timezone);
let ret = schedule.market_hours.can_publish_at(&now) && schedule.holiday_hours.can_publish_at(now);
if let Some(schedule) = self.our_prices.get(&key_from_id) {
let ret = schedule.can_publish_at(&now);

if !ret {
debug!(self.logger, "Exporter: Attempted to publish price outside market hours";
Expand Down
64 changes: 38 additions & 26 deletions src/agent/solana/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use {
self::subscriber::Subscriber,
super::key_store::KeyStore,
crate::agent::{
schedule::{
holiday_hours::HolidaySchedule,
market_hours::WeeklySchedule,
Schedule,
market_schedule::{
MarketSchedule,
ScheduleDayKind,
},
schedule::market_hours::WeeklySchedule,
store::global,
},
anyhow::{
Expand Down Expand Up @@ -121,15 +121,15 @@ pub struct Data {
pub product_accounts: HashMap<Pubkey, ProductEntry>,
pub price_accounts: HashMap<Pubkey, PriceEntry>,
/// publisher => {their permissioned price accounts => market hours}
pub publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, Schedule>>,
pub publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>,
}

impl Data {
fn new(
mapping_accounts: HashMap<Pubkey, MappingAccount>,
product_accounts: HashMap<Pubkey, ProductEntry>,
price_accounts: HashMap<Pubkey, PriceEntry>,
publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, Schedule>>,
publisher_permissions: HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>,
) -> Self {
Data {
mapping_accounts,
Expand All @@ -144,7 +144,7 @@ pub type MappingAccount = pyth_sdk_solana::state::MappingAccount;
#[derive(Debug, Clone)]
pub struct ProductEntry {
pub account_data: pyth_sdk_solana::state::ProductAccount,
pub schedule: Schedule,
pub schedule: MarketSchedule,
pub price_accounts: Vec<Pubkey>,
}

Expand Down Expand Up @@ -207,7 +207,7 @@ pub fn spawn_oracle(
wss_url: &str,
rpc_timeout: Duration,
global_store_update_tx: mpsc::Sender<global::Update>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, Schedule>>>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
key_store: KeyStore,
logger: Logger,
) -> Vec<JoinHandle<()>> {
Expand Down Expand Up @@ -422,7 +422,7 @@ struct Poller {
data_tx: mpsc::Sender<Data>,

/// Updates about permissioned price accounts from oracle to exporter
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, Schedule>>>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,

/// The RPC client to use to poll data from the RPC node
rpc_client: RpcClient,
Expand All @@ -442,7 +442,7 @@ struct Poller {
impl Poller {
pub fn new(
data_tx: mpsc::Sender<Data>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, Schedule>>>,
publisher_permissions_tx: mpsc::Sender<HashMap<Pubkey, HashMap<Pubkey, MarketSchedule>>>,
rpc_url: &str,
rpc_timeout: Duration,
commitment: CommitmentLevel,
Expand Down Expand Up @@ -612,7 +612,7 @@ impl Poller {
let product = load_product_account(prod_acc.data.as_slice())
.context(format!("Could not parse product account {}", product_key))?;

let weekly_schedule: WeeklySchedule = if let Some((_wsched_key, wsched_val)) =
let legacy_schedule: WeeklySchedule = if let Some((_wsched_key, wsched_val)) =
product.iter().find(|(k, _v)| *k == "weekly_schedule")
{
wsched_val.parse().unwrap_or_else(|err| {
Expand All @@ -629,33 +629,45 @@ impl Poller {
Default::default() // No market hours specified, meaning 24/7 publishing
};

let holiday_schedule: HolidaySchedule = if let Some((_hsched_key, hsched_val)) =
product.iter().find(|(k, _v)| *k == "holidays")
let market_schedule: Option<MarketSchedule> = if let Some((
_msched_key,
msched_val,
)) =
product.iter().find(|(k, _v)| *k == "schedule")
{
hsched_val.parse().unwrap_or_else(|err| {
let schedule = msched_val.parse::<MarketSchedule>();
if schedule.is_ok() {
Some(schedule.unwrap())
} else {
warn!(
self.logger,
"Oracle: Product has weekly_schedule defined but it could not be parsed. Falling back to 24/7 publishing.";
"Oracle: Product has schedule defined but it could not be parsed. Falling back to legacy schedule.";
"product_key" => product_key.to_string(),
"holiday_schedule" => hsched_val,
"schedule" => msched_val,
);
debug!(self.logger, "parsing error context"; "context" => format!("{:?}", err));
Default::default()
})
None
}
} else {
Default::default() // No market hours specified, meaning 24/7 publishing
None
};

product_entries.insert(
*product_key,
ProductEntry {
account_data: *product,
schedule: {
Schedule {
market_hours: weekly_schedule,
holiday_hours: holiday_schedule,
}
},
schedule: market_schedule.unwrap_or_else(|| MarketSchedule {
timezone: legacy_schedule.timezone,
weekly_schedule: vec![
ScheduleDayKind::from_mhkind(legacy_schedule.mon),
ScheduleDayKind::from_mhkind(legacy_schedule.tue),
ScheduleDayKind::from_mhkind(legacy_schedule.wed),
ScheduleDayKind::from_mhkind(legacy_schedule.thu),
ScheduleDayKind::from_mhkind(legacy_schedule.fri),
ScheduleDayKind::from_mhkind(legacy_schedule.sat),
ScheduleDayKind::from_mhkind(legacy_schedule.sun),
],
holidays: vec![],
}),
price_accounts: vec![],
},
);
Expand Down

0 comments on commit 33521d9

Please sign in to comment.