Skip to content

Commit

Permalink
fix(long_tail_assets): fixes from review
Browse files Browse the repository at this point in the history
  • Loading branch information
akhercha committed Jul 26, 2024
1 parent 90f6197 commit b1842e0
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 183 deletions.
136 changes: 58 additions & 78 deletions src/processing/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,54 +35,48 @@ pub async fn process_data_by_pair(

let config = get_config(None).await;

let result: Result<FutureEntry, _> = match config.network().name {
let data: FutureEntry = match config.network().name {
NetworkName::Testnet => {
testnet_dsl::future_entry
.filter(testnet_dsl::pair_id.eq(pair.clone()))
.order(testnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
.await?
}
NetworkName::Mainnet => {
mainnet_dsl::mainnet_future_entry
.filter(mainnet_dsl::pair_id.eq(pair.clone()))
.order(mainnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
.await?
}
};

log::info!("Processing data for pair: {}", pair);

match result {
Ok(data) => {
let network_env = &config.network_str();
let data_type = "future";
let network_env = &config.network_str();
let data_type = "future";

let seconds_since_last_publish = time_since_last_update(&data);
let time_labels =
TIME_SINCE_LAST_UPDATE_PAIR_ID.with_label_values(&[network_env, &pair, data_type]);
let num_sources_labels =
NUM_SOURCES.with_label_values(&[network_env, &pair, data_type]);
let seconds_since_last_publish = time_since_last_update(&data);
let time_labels =
TIME_SINCE_LAST_UPDATE_PAIR_ID.with_label_values(&[network_env, &pair, data_type]);
let num_sources_labels = NUM_SOURCES.with_label_values(&[network_env, &pair, data_type]);

time_labels.set(seconds_since_last_publish as f64);
time_labels.set(seconds_since_last_publish as f64);

let (on_off_deviation, num_sources_aggregated) = on_off_price_deviation(
pair.clone(),
data.timestamp.timestamp() as u64,
DataType::Future,
)
.await?;
let (on_off_deviation, num_sources_aggregated) = on_off_price_deviation(
pair.clone(),
data.timestamp.timestamp() as u64,
DataType::Future,
)
.await?;

ON_OFF_PRICE_DEVIATION
.with_label_values(&[network_env, &pair.clone(), data_type])
.set(on_off_deviation);
num_sources_labels.set(num_sources_aggregated as i64);
ON_OFF_PRICE_DEVIATION
.with_label_values(&[network_env, &pair.clone(), data_type])
.set(on_off_deviation);
num_sources_labels.set(num_sources_aggregated as i64);

Ok(seconds_since_last_publish)
}
Err(e) => Err(e.into()),
}
Ok(seconds_since_last_publish)
}

pub async fn process_data_by_pair_and_sources(
Expand Down Expand Up @@ -118,56 +112,50 @@ pub async fn process_data_by_pair_and_source(

let config = get_config(None).await;

let filtered_by_source_result: Result<FutureEntry, _> = match config.network().name {
let data: FutureEntry = match config.network().name {
NetworkName::Testnet => {
testnet_dsl::future_entry
.filter(testnet_dsl::pair_id.eq(pair))
.filter(testnet_dsl::source.eq(src))
.order(testnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
.await?
}
NetworkName::Mainnet => {
mainnet_dsl::mainnet_future_entry
.filter(mainnet_dsl::pair_id.eq(pair))
.filter(mainnet_dsl::source.eq(src))
.order(mainnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
.await?
}
};

match filtered_by_source_result {
Ok(data) => {
let network_env = &config.network_str();
let data_type = "future";

// Get the labels
let price_labels = PAIR_PRICE.with_label_values(&[network_env, pair, src, data_type]);
let deviation_labels =
PRICE_DEVIATION.with_label_values(&[network_env, pair, src, data_type]);
let source_deviation_labels =
PRICE_DEVIATION_SOURCE.with_label_values(&[network_env, pair, src, data_type]);

// Compute metrics
let time = time_since_last_update(&data);
let price_as_f64 = data.price.to_f64().ok_or(MonitoringError::Price(
"Failed to convert price to f64".to_string(),
))?;
let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64;

let deviation = price_deviation(&data, normalized_price).await?;
let (source_deviation, _) = source_deviation(&data, normalized_price).await?;

// Set the metrics
price_labels.set(normalized_price);
deviation_labels.set(deviation);
source_deviation_labels.set(source_deviation);

Ok(time)
}
Err(e) => Err(e.into()),
}
let network_env = &config.network_str();
let data_type = "future";

// Get the labels
let price_labels = PAIR_PRICE.with_label_values(&[network_env, pair, src, data_type]);
let deviation_labels = PRICE_DEVIATION.with_label_values(&[network_env, pair, src, data_type]);
let source_deviation_labels =
PRICE_DEVIATION_SOURCE.with_label_values(&[network_env, pair, src, data_type]);

// Compute metrics
let time = time_since_last_update(&data);
let price_as_f64 = data.price.to_f64().ok_or(MonitoringError::Price(
"Failed to convert price to f64".to_string(),
))?;
let normalized_price = price_as_f64 / (10_u64.pow(decimals)) as f64;

let deviation = price_deviation(&data, normalized_price).await?;
let (source_deviation, _) = source_deviation(&data, normalized_price).await?;

// Set the metrics
price_labels.set(normalized_price);
deviation_labels.set(deviation);
source_deviation_labels.set(source_deviation);

Ok(time)
}

pub async fn process_data_by_publisher(
Expand All @@ -178,40 +166,32 @@ pub async fn process_data_by_publisher(

let config = get_config(None).await;

let result: Result<FutureEntry, _> = match config.network().name {
let data: FutureEntry = match config.network().name {
NetworkName::Testnet => {
testnet_dsl::future_entry
.filter(testnet_dsl::publisher.eq(publisher.clone()))
.order(testnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
.await?
}
NetworkName::Mainnet => {
mainnet_dsl::mainnet_future_entry
.filter(mainnet_dsl::publisher.eq(publisher.clone()))
.order(mainnet_dsl::block_timestamp.desc())
.first(&mut conn)
.await
.await?
}
};

log::info!("Processing data for publisher: {}", publisher);

match result {
Ok(data) => {
let network_env = &config.network_str();
let network_env = &config.network_str();

let seconds_since_last_publish = time_since_last_update(&data);
let time_labels = TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[
network_env,
&publisher,
"future",
]);
let seconds_since_last_publish = time_since_last_update(&data);
let time_labels =
TIME_SINCE_LAST_UPDATE_PUBLISHER.with_label_values(&[network_env, &publisher, "future"]);

time_labels.set(seconds_since_last_publish as f64);
time_labels.set(seconds_since_last_publish as f64);

Ok(())
}
Err(e) => Err(e.into()),
}
Ok(())
}
Loading

0 comments on commit b1842e0

Please sign in to comment.