Skip to content

Commit

Permalink
✨ Update config periodically (#16)
Browse files Browse the repository at this point in the history
* init

* fix: cleanup code

---------

Co-authored-by: 0xevolve <[email protected]>
  • Loading branch information
JordyRo1 and EvolveArt authored Feb 6, 2024
1 parent 1a4b87a commit 1f7c567
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 23 deletions.
70 changes: 49 additions & 21 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{collections::HashMap, str::FromStr, sync::Arc};
use std::{
collections::HashMap,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};

use arc_swap::{ArcSwap, Guard};
use starknet::{
Expand All @@ -13,6 +18,8 @@ use strum::{Display, EnumString, IntoStaticStr};
use tokio::sync::OnceCell;
use url::Url;

use crate::constants::CONFIG_UPDATE_INTERVAL;

#[derive(Debug, Clone, EnumString, IntoStaticStr)]
pub enum NetworkName {
#[strum(ascii_case_insensitive)]
Expand Down Expand Up @@ -102,6 +109,22 @@ impl Config {
}
}

pub async fn create_from_env() -> Config {
let network = std::env::var("NETWORK").expect("NETWORK must be set");
let oracle_address = std::env::var("ORACLE_ADDRESS").expect("ORACLE_ADDRESS must be set");
let spot_pairs = std::env::var("SPOT_PAIRS").expect("SPOT_PAIRS must be set");
let future_pairs = std::env::var("FUTURE_PAIRS").expect("FUTURE_PAIRS must be set");

Config::new(ConfigInput {
network: NetworkName::from_str(&network).expect("Invalid network name"),
oracle_address: FieldElement::from_hex_be(&oracle_address)
.expect("Invalid oracle address"),
spot_pairs: parse_pairs(&spot_pairs),
future_pairs: parse_pairs(&future_pairs),
})
.await
}

pub fn sources(&self, data_type: DataType) -> &HashMap<String, Vec<String>> {
&self.data_info.get(&data_type).unwrap().sources
}
Expand Down Expand Up @@ -135,7 +158,7 @@ impl Config {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ConfigInput {
pub network: NetworkName,
pub oracle_address: FieldElement,
Expand All @@ -148,31 +171,36 @@ pub async fn get_config(config_input: Option<ConfigInput>) -> Guard<Arc<Config>>
.get_or_init(|| async {
match config_input {
Some(config_input) => ArcSwap::from_pointee(Config::new(config_input).await),
None => {
let network = std::env::var("NETWORK").expect("NETWORK must be set");
let oracle_address =
std::env::var("ORACLE_ADDRESS").expect("ORACLE_ADDRESS must be set");
let spot_pairs = std::env::var("SPOT_PAIRS").expect("SPOT_PAIRS must be set");
let future_pairs =
std::env::var("FUTURE_PAIRS").expect("FUTURE_PAIRS must be set");

ArcSwap::from_pointee(
Config::new(ConfigInput {
network: NetworkName::from_str(&network).expect("Invalid network name"),
oracle_address: FieldElement::from_hex_be(&oracle_address)
.expect("Invalid oracle address"),
spot_pairs: parse_pairs(&spot_pairs),
future_pairs: parse_pairs(&future_pairs),
})
.await,
)
}
None => ArcSwap::from_pointee(Config::create_from_env().await),
}
})
.await;
cfg.load()
}

/// This function is used to periodically update the configuration settings
/// from the environment variables. This is useful when we want to update the
/// configuration settings without restarting the service.
pub async fn periodic_config_update() {
let interval = Duration::from_secs(CONFIG_UPDATE_INTERVAL); // Set the update interval as needed (3 hours in this example)

let mut next_update = Instant::now() + interval;

loop {
let new_config = Config::create_from_env().await;
let updated_config = ArcSwap::from_pointee(new_config.clone());

let current_config_cell = CONFIG.get_or_init(|| async { updated_config }).await;

// Store the updated config in the ArcSwap
current_config_cell.store(new_config.into());

tokio::time::sleep_until(next_update.into()).await;

next_update += interval;
}
}

/// OnceCell only allows us to initialize the config once and that's how it should be on production.
/// However, when running tests, we often want to reinitialize because we want to clear the DB and
/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already
Expand Down
2 changes: 2 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,5 @@ lazy_static! {
pub const FEE_TOKEN_DECIMALS: i32 = 18;
pub const FEE_TOKEN_ADDRESS: &str =
"0x049d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7";

pub const CONFIG_UPDATE_INTERVAL: u64 = 3 * 3600;
10 changes: 8 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
extern crate diesel;
extern crate dotenv;

use config::{get_config, DataType};
use config::{get_config, periodic_config_update, DataType};
use diesel_async::pooled_connection::deadpool::*;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::AsyncPgConnection;
Expand Down Expand Up @@ -44,7 +44,6 @@ async fn main() {
dotenv().ok();

// Define the pairs to monitor

let monitoring_config = get_config(None).await;

log::info!("Successfully fetched config: {:?}", monitoring_config);
Expand All @@ -62,12 +61,15 @@ async fn main() {

let api_monitoring = tokio::spawn(monitor_api());

let config_update = tokio::spawn(periodic_config_update());

// Wait for the monitoring to finish
let results = futures::future::join_all(vec![
spot_monitoring,
future_monitoring,
api_monitoring,
publisher_monitoring,
config_update,
])
.await;

Expand All @@ -84,6 +86,10 @@ async fn main() {
if let Err(e) = &results[3] {
log::error!("[PUBLISHERS] Monitoring failed: {:?}", e);
}

if let Err(e) = &results[4] {
log::error!("[CONFIG] Config Update failed: {:?}", e);
}
}

pub(crate) async fn monitor_api() {
Expand Down

0 comments on commit 1f7c567

Please sign in to comment.