Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Update config periodically #16

Merged
merged 2 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 49 additions & 21 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{collections::HashMap, str::FromStr, sync::Arc};
use std::{
collections::HashMap,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};

use arc_swap::{ArcSwap, Guard};
use starknet::{
Expand All @@ -13,6 +18,8 @@ use strum::{Display, EnumString, IntoStaticStr};
use tokio::sync::OnceCell;
use url::Url;

use crate::constants::CONFIG_UPDATE_INTERVAL;

#[derive(Debug, Clone, EnumString, IntoStaticStr)]
pub enum NetworkName {
#[strum(ascii_case_insensitive)]
Expand Down Expand Up @@ -102,6 +109,22 @@ impl Config {
}
}

pub async fn create_from_env() -> Config {
let network = std::env::var("NETWORK").expect("NETWORK must be set");
let oracle_address = std::env::var("ORACLE_ADDRESS").expect("ORACLE_ADDRESS must be set");
let spot_pairs = std::env::var("SPOT_PAIRS").expect("SPOT_PAIRS must be set");
let future_pairs = std::env::var("FUTURE_PAIRS").expect("FUTURE_PAIRS must be set");

Config::new(ConfigInput {
network: NetworkName::from_str(&network).expect("Invalid network name"),
oracle_address: FieldElement::from_hex_be(&oracle_address)
.expect("Invalid oracle address"),
spot_pairs: parse_pairs(&spot_pairs),
future_pairs: parse_pairs(&future_pairs),
})
.await
}

pub fn sources(&self, data_type: DataType) -> &HashMap<String, Vec<String>> {
&self.data_info.get(&data_type).unwrap().sources
}
Expand Down Expand Up @@ -135,7 +158,7 @@ impl Config {
}
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ConfigInput {
pub network: NetworkName,
pub oracle_address: FieldElement,
Expand All @@ -148,31 +171,36 @@ pub async fn get_config(config_input: Option<ConfigInput>) -> Guard<Arc<Config>>
.get_or_init(|| async {
match config_input {
Some(config_input) => ArcSwap::from_pointee(Config::new(config_input).await),
None => {
let network = std::env::var("NETWORK").expect("NETWORK must be set");
let oracle_address =
std::env::var("ORACLE_ADDRESS").expect("ORACLE_ADDRESS must be set");
let spot_pairs = std::env::var("SPOT_PAIRS").expect("SPOT_PAIRS must be set");
let future_pairs =
std::env::var("FUTURE_PAIRS").expect("FUTURE_PAIRS must be set");

ArcSwap::from_pointee(
Config::new(ConfigInput {
network: NetworkName::from_str(&network).expect("Invalid network name"),
oracle_address: FieldElement::from_hex_be(&oracle_address)
.expect("Invalid oracle address"),
spot_pairs: parse_pairs(&spot_pairs),
future_pairs: parse_pairs(&future_pairs),
})
.await,
)
}
None => ArcSwap::from_pointee(Config::create_from_env().await),
}
})
.await;
cfg.load()
}

/// This function is used to periodically update the configuration settings
/// from the environment variables. This is useful when we want to update the
/// configuration settings without restarting the service.
pub async fn periodic_config_update() {
let interval = Duration::from_secs(CONFIG_UPDATE_INTERVAL); // Set the update interval as needed (3 hours in this example)

let mut next_update = Instant::now() + interval;

loop {
let new_config = Config::create_from_env().await;
let updated_config = ArcSwap::from_pointee(new_config.clone());

let current_config_cell = CONFIG.get_or_init(|| async { updated_config }).await;

// Store the updated config in the ArcSwap
current_config_cell.store(new_config.into());

tokio::time::sleep_until(next_update.into()).await;

next_update += interval;
}
}

/// OnceCell only allows us to initialize the config once and that's how it should be on production.
/// However, when running tests, we often want to reinitialize because we want to clear the DB and
/// set it up again for reuse in new tests. By calling `config_force_init` we replace the already
Expand Down
2 changes: 2 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,5 @@ lazy_static! {
pub const FEE_TOKEN_DECIMALS: i32 = 18;
pub const FEE_TOKEN_ADDRESS: &str =
"0x049d36570d4e46f48e99674bd3fcc84644ddd6b96f7c741b1562b82f9e004dc7";

pub const CONFIG_UPDATE_INTERVAL: u64 = 3 * 3600;
10 changes: 8 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
extern crate diesel;
extern crate dotenv;

use config::{get_config, DataType};
use config::{get_config, periodic_config_update, DataType};
use diesel_async::pooled_connection::deadpool::*;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::AsyncPgConnection;
Expand Down Expand Up @@ -44,7 +44,6 @@ async fn main() {
dotenv().ok();

// Define the pairs to monitor

let monitoring_config = get_config(None).await;

log::info!("Successfully fetched config: {:?}", monitoring_config);
Expand All @@ -62,12 +61,15 @@ async fn main() {

let api_monitoring = tokio::spawn(monitor_api());

let config_update = tokio::spawn(periodic_config_update());

// Wait for the monitoring to finish
let results = futures::future::join_all(vec![
spot_monitoring,
future_monitoring,
api_monitoring,
publisher_monitoring,
config_update,
])
.await;

Expand All @@ -84,6 +86,10 @@ async fn main() {
if let Err(e) = &results[3] {
log::error!("[PUBLISHERS] Monitoring failed: {:?}", e);
}

if let Err(e) = &results[4] {
log::error!("[CONFIG] Config Update failed: {:?}", e);
}
}

pub(crate) async fn monitor_api() {
Expand Down
Loading