Skip to content

Commit

Permalink
refactor: flatten rpc server types
Browse files Browse the repository at this point in the history
  • Loading branch information
Reisen committed Apr 17, 2024
1 parent 00d31d1 commit ca9c5fb
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 156 deletions.
6 changes: 3 additions & 3 deletions src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,12 @@ impl Agent {
));

// Spawn the Pythd API Server
jhs.push(rpc::spawn_server(
jhs.push(tokio::spawn(rpc::run(
self.config.pythd_api_server.clone(),
logger.clone(),
pythd_adapter_tx,
shutdown_rx,
logger.clone(),
));
)));

// Spawn the metrics server
jhs.push(tokio::spawn(metrics::MetricsServer::spawn(
Expand Down
170 changes: 83 additions & 87 deletions src/agent/market_schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ use {
Utc,
},
chrono_tz::Tz,
proptest::{
arbitrary::any,
prop_compose,
proptest,
},
std::{
fmt::Display,
str::FromStr,
Expand All @@ -42,7 +37,6 @@ use {
},
};


/// Helper time value representing 24:00:00 as 00:00:00 minus 1
/// nanosecond (underflowing to 23:59:59.999(...) ). While chrono
/// has this value internally exposed as NaiveTime::MAX, it is not
Expand All @@ -51,7 +45,6 @@ const MAX_TIME_INSTANT: NaiveTime = NaiveTime::MIN
.overflowing_sub_signed(Duration::nanoseconds(1))
.0;


#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MarketSchedule {
pub timezone: Tz,
Expand Down Expand Up @@ -154,7 +147,6 @@ impl From<LegacySchedule> for MarketSchedule {
}
}


#[derive(Clone, Debug, Eq, PartialEq)]
pub struct HolidayDaySchedule {
pub month: u32,
Expand Down Expand Up @@ -198,7 +190,6 @@ impl Display for HolidayDaySchedule {
}
}


#[derive(Clone, Debug, Eq, PartialEq, Copy)]
pub enum ScheduleDayKind {
Open,
Expand Down Expand Up @@ -286,6 +277,11 @@ mod tests {
use {
super::*,
chrono::NaiveDateTime,
proptest::{
arbitrary::any,
prop_compose,
proptest,
},
};

#[test]
Expand Down Expand Up @@ -478,98 +474,98 @@ mod tests {

Ok(())
}
}

prop_compose! {
fn schedule_day_kind()(
r in any::<u8>(),
t1 in any::<u32>(),
t2 in any::<u32>(),
) -> ScheduleDayKind {
match r % 3 {
0 => ScheduleDayKind::Open,
1 => ScheduleDayKind::Closed,
_ => ScheduleDayKind::TimeRange(
NaiveTime::from_hms_opt(t1 % 24, t1 / 24 % 60, 0).unwrap(),
NaiveTime::from_hms_opt(t2 % 24, t2 / 24 % 60, 0).unwrap(),
),
prop_compose! {
fn schedule_day_kind()(
r in any::<u8>(),
t1 in any::<u32>(),
t2 in any::<u32>(),
) -> ScheduleDayKind {
match r % 3 {
0 => ScheduleDayKind::Open,
1 => ScheduleDayKind::Closed,
_ => ScheduleDayKind::TimeRange(
NaiveTime::from_hms_opt(t1 % 24, t1 / 24 % 60, 0).unwrap(),
NaiveTime::from_hms_opt(t2 % 24, t2 / 24 % 60, 0).unwrap(),
),
}
}
}
}

prop_compose! {
fn holiday_day_schedule()(
m in 1..=12u32,
d in 1..=31u32,
s in schedule_day_kind(),
) -> HolidayDaySchedule {
HolidayDaySchedule {
month: m,
day: d,
kind: s,
prop_compose! {
fn holiday_day_schedule()(
m in 1..=12u32,
d in 1..=31u32,
s in schedule_day_kind(),
) -> HolidayDaySchedule {
HolidayDaySchedule {
month: m,
day: d,
kind: s,
}
}
}
}

prop_compose! {
fn market_schedule()(
tz in proptest::sample::select(vec![
Tz::UTC,
Tz::America__New_York,
Tz::America__Los_Angeles,
Tz::America__Chicago,
Tz::Singapore,
Tz::Australia__Sydney,
]),
weekly_schedule in proptest::collection::vec(schedule_day_kind(), 7..=7),
holidays in proptest::collection::vec(holiday_day_schedule(), 0..12),
) -> MarketSchedule {
MarketSchedule {
timezone: tz,
weekly_schedule,
holidays,
prop_compose! {
fn market_schedule()(
tz in proptest::sample::select(vec![
Tz::UTC,
Tz::America__New_York,
Tz::America__Los_Angeles,
Tz::America__Chicago,
Tz::Singapore,
Tz::Australia__Sydney,
]),
weekly_schedule in proptest::collection::vec(schedule_day_kind(), 7..=7),
holidays in proptest::collection::vec(holiday_day_schedule(), 0..12),
) -> MarketSchedule {
MarketSchedule {
timezone: tz,
weekly_schedule,
holidays,
}
}
}
}

// Matches C or O or hhmm-hhmm with 24-hour time
const VALID_SCHEDULE_DAY_KIND_REGEX: &str =
"C|O|([01][1-9]|2[0-3])([0-5][0-9])-([01][1-9]|2[0-3])([0-5][0-9])";
// Matches C or O or hhmm-hhmm with 24-hour time
const VALID_SCHEDULE_DAY_KIND_REGEX: &str =
"C|O|([01][1-9]|2[0-3])([0-5][0-9])-([01][1-9]|2[0-3])([0-5][0-9])";

// Matches MMDD with MM and DD being 01-12 and 01-31 respectively
const VALID_MONTH_DAY_REGEX: &str = "(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])";
// Matches MMDD with MM and DD being 01-12 and 01-31 respectively
const VALID_MONTH_DAY_REGEX: &str = "(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])";

proptest!(
#[test]
fn doesnt_crash(s in "\\PC*") {
_ = s.parse::<MarketSchedule>();
_ = s.parse::<HolidayDaySchedule>();
_ = s.parse::<ScheduleDayKind>();
}
proptest!(
#[test]
fn doesnt_crash(s in "\\PC*") {
_ = s.parse::<MarketSchedule>();
_ = s.parse::<HolidayDaySchedule>();
_ = s.parse::<ScheduleDayKind>();
}

#[test]
fn parse_valid_schedule_day_kind(s in VALID_SCHEDULE_DAY_KIND_REGEX) {
assert!(s.parse::<ScheduleDayKind>().is_ok());
}
#[test]
fn parse_valid_schedule_day_kind(s in VALID_SCHEDULE_DAY_KIND_REGEX) {
assert!(s.parse::<ScheduleDayKind>().is_ok());
}

#[test]
fn test_valid_schedule_day_kind(s in schedule_day_kind()) {
assert_eq!(s, s.to_string().parse::<ScheduleDayKind>().unwrap());
}
#[test]
fn test_valid_schedule_day_kind(s in schedule_day_kind()) {
assert_eq!(s, s.to_string().parse::<ScheduleDayKind>().unwrap());
}

#[test]
fn parse_valid_holiday_day_schedule(s in VALID_SCHEDULE_DAY_KIND_REGEX, d in VALID_MONTH_DAY_REGEX) {
let valid_holiday_day = format!("{}/{}", d, s);
assert!(valid_holiday_day.parse::<HolidayDaySchedule>().is_ok());
}
#[test]
fn parse_valid_holiday_day_schedule(s in VALID_SCHEDULE_DAY_KIND_REGEX, d in VALID_MONTH_DAY_REGEX) {
let valid_holiday_day = format!("{}/{}", d, s);
assert!(valid_holiday_day.parse::<HolidayDaySchedule>().is_ok());
}

#[test]
fn test_valid_holiday_day_schedule(s in holiday_day_schedule()) {
assert_eq!(s, s.to_string().parse::<HolidayDaySchedule>().unwrap());
}
#[test]
fn test_valid_holiday_day_schedule(s in holiday_day_schedule()) {
assert_eq!(s, s.to_string().parse::<HolidayDaySchedule>().unwrap());
}

#[test]
fn test_valid_market_schedule(s in market_schedule()) {
assert_eq!(s, s.to_string().parse::<MarketSchedule>().unwrap());
}
);
#[test]
fn test_valid_market_schedule(s in market_schedule()) {
assert_eq!(s, s.to_string().parse::<MarketSchedule>().unwrap());
}
);
}
97 changes: 35 additions & 62 deletions src/agent/pythd/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,10 @@ pub mod rpc {
fmt::Debug,
net::SocketAddr,
},
tokio::{
sync::{
broadcast,
mpsc,
oneshot,
},
task::JoinHandle,
tokio::sync::{
broadcast,
mpsc,
oneshot,
},
warp::{
ws::{
Expand Down Expand Up @@ -584,53 +581,33 @@ pub mod rpc {
}
}

pub fn spawn_server(
pub async fn run(
config: Config,
logger: Logger,
adapter_tx: mpsc::Sender<adapter::Message>,
shutdown_rx: broadcast::Receiver<()>,
logger: Logger,
) -> JoinHandle<()> {
tokio::spawn(async move {
Server::new(adapter_tx, config, logger)
.run(shutdown_rx)
.await
})
) {
if let Err(err) = serve(config, &logger, adapter_tx, shutdown_rx).await {
error!(logger, "{}", err);
debug!(logger, "error context"; "context" => format!("{:?}", err));
}
}

pub struct Server {
async fn serve(
config: Config,
logger: &Logger,
adapter_tx: mpsc::Sender<adapter::Message>,
config: Config,
logger: Logger,
}

impl Server {
pub fn new(
adapter_tx: mpsc::Sender<adapter::Message>,
config: Config,
logger: Logger,
) -> Self {
Server {
adapter_tx,
config,
logger,
}
}

pub async fn run(&self, shutdown_rx: broadcast::Receiver<()>) {
if let Err(err) = self.serve(shutdown_rx).await {
error!(self.logger, "{}", err);
debug!(self.logger, "error context"; "context" => format!("{:?}", err));
}
}

async fn serve(&self, mut shutdown_rx: broadcast::Receiver<()>) -> Result<()> {
let adapter_tx = self.adapter_tx.clone();
let config = self.config.clone();
let with_logger = WithLogger {
logger: self.logger.clone(),
};
mut shutdown_rx: broadcast::Receiver<()>,
) -> Result<()> {
let adapter_tx = adapter_tx.clone();
let config = config.clone();
let with_logger = WithLogger {
logger: logger.clone(),
};

let index = warp::path::end()
let index = {
let config = config.clone();
warp::path::end()
.and(warp::ws())
.and(warp::any().map(move || adapter_tx.clone()))
.and(warp::any().map(move || with_logger.clone()))
Expand All @@ -654,19 +631,19 @@ pub mod rpc {
.await
})
},
);
)
};

let (_, serve) = warp::serve(index).bind_with_graceful_shutdown(
self.config.listen_address.as_str().parse::<SocketAddr>()?,
async move {
let _ = shutdown_rx.recv().await;
},
);
let (_, serve) = warp::serve(index).bind_with_graceful_shutdown(
config.listen_address.as_str().parse::<SocketAddr>()?,
async move {
let _ = shutdown_rx.recv().await;
},
);

info!(self.logger, "starting api server"; "listen address" => self.config.listen_address.clone());
info!(logger, "starting api server"; "listen address" => config.listen_address.clone());

tokio::task::spawn(serve).await.map_err(|e| e.into())
}
tokio::task::spawn(serve).await.map_err(|e| e.into())
}

#[cfg(test)]
Expand All @@ -685,7 +662,6 @@ pub mod rpc {
SubscriptionID,
},
Config,
Server,
},
crate::agent::pythd::{
adapter,
Expand Down Expand Up @@ -819,10 +795,7 @@ pub mod rpc {
listen_address: format!("127.0.0.1:{:}", listen_port),
..Default::default()
};
let server = Server::new(adapter_tx, config, logger);
let jh = tokio::spawn(async move {
server.run(shutdown_rx).await;
});
let jh = tokio::spawn(super::run(config, logger, adapter_tx, shutdown_rx));
let test_server = TestServer { shutdown_tx, jh };

// Create a test client to interact with the server
Expand Down
Loading

0 comments on commit ca9c5fb

Please sign in to comment.