Skip to content

Commit

Permalink
Move cost model syncing from the agent (#181)
Browse files Browse the repository at this point in the history
This PR implements syncing cost models as part of indexer status polling.
  • Loading branch information
Theodus authored Jul 18, 2022
1 parent fe7300d commit b64f044
Show file tree
Hide file tree
Showing 15 changed files with 401 additions and 360 deletions.
16 changes: 0 additions & 16 deletions graphql/cost_models.gql

This file was deleted.

104 changes: 2 additions & 102 deletions src/agent_client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use crate::{
indexer_selection::{
CostModelSource, IndexerPreferences, Indexing, IndexingData, SelectionFactors,
},
prelude::{shared_lookup::SharedLookupWriter, *},
indexer_selection::IndexerPreferences,
prelude::*,
query_engine::{APIKey, VolumeEstimator},
};
use eventuals::EventualExt as _;
use graphql_client::{GraphQLQuery, Response};
use lazy_static::lazy_static;
use reqwest;
Expand All @@ -19,7 +16,6 @@ pub fn create(
poll_interval: Duration,
slashing_percentage: EventualWriter<PPM>,
usd_to_grt_conversion: EventualWriter<USD>,
indexings: Arc<Mutex<SharedLookupWriter<Indexing, SelectionFactors, IndexingData>>>,
api_keys: EventualWriter<Ptr<HashMap<String, Arc<APIKey>>>>,
accept_empty: bool,
) {
Expand Down Expand Up @@ -53,43 +49,6 @@ pub fn create(
slashing_percentage,
accept_empty,
);
handle_cost_models(
indexings.clone(),
create_sync_client_input::<CostModels, _>(
agent_url.clone(),
poll_interval,
cost_models::OPERATION_NAME,
cost_models::QUERY,
parse_cost_models,
accept_empty,
),
);
}

fn create_sync_client_input<Q, T>(
agent_url: String,
poll_interval: Duration,
operation: &'static str,
query: &'static str,
parse_data: fn(Q::ResponseData) -> Option<T>,
accept_empty: bool,
) -> Eventual<T>
where
T: 'static + Clone + Eq + Send,
Q: GraphQLQuery,
Q::ResponseData: 'static,
{
let (writer, reader) = Eventual::new();
create_sync_client::<Q, T, _>(
agent_url,
poll_interval,
operation,
query,
parse_data,
writer,
accept_empty,
);
reader
}

fn create_sync_client<Q, T, F>(
Expand Down Expand Up @@ -402,45 +361,6 @@ fn parse_conversion_rates(data: conversion_rates::ResponseData) -> Option<GRT> {
}
}

#[derive(GraphQLQuery)]
#[graphql(
schema_path = "graphql/sync_agent_schema.gql",
query_path = "graphql/cost_models.gql",
response_derives = "Debug"
)]
struct CostModels;

fn parse_cost_models(
data: cost_models::ResponseData,
) -> Option<Ptr<Vec<(Indexing, CostModelSource)>>> {
use cost_models::{CostModelsData, ResponseData};
let values = match data {
ResponseData {
data: Some(CostModelsData { value, .. }),
} => value,
_ => return None,
};
let parsed = values
.into_iter()
.flat_map(|value| {
let deployment = SubgraphDeploymentID::from_ipfs_hash(&value.deployment);
value.cost_models.into_iter().filter_map(move |model| {
Some((
Indexing {
deployment: deployment?,
indexer: model.indexer.id.parse().ok()?,
},
CostModelSource {
model: model.model?,
globals: model.variables.unwrap_or_default(),
},
))
})
})
.collect();
Some(Ptr::new(parsed))
}

#[derive(GraphQLQuery)]
#[graphql(
schema_path = "graphql/sync_agent_schema.gql",
Expand All @@ -463,26 +383,6 @@ fn parse_network_parameters(data: network_parameters::ResponseData) -> Option<PP
}
}

fn handle_cost_models(
indexings: Arc<Mutex<SharedLookupWriter<Indexing, SelectionFactors, IndexingData>>>,
cost_models: Eventual<Ptr<Vec<(Indexing, CostModelSource)>>>,
) {
cost_models
.pipe_async(move |cost_models| {
let indexings = indexings.clone();
async move {
tracing::trace!(cost_models = %cost_models.len());
let mut locked = indexings.lock().await;
for (indexing, model) in cost_models.iter() {
let writer = locked.write(&indexing).await;
writer.cost_model.write(model.clone());
}
}
.instrument(tracing::info_span!("handle_cost_models"))
})
.forever();
}

#[derive(Clone)]
pub struct Metrics {
pub queries: ResponseMetricVecs,
Expand Down
13 changes: 13 additions & 0 deletions src/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,16 @@ pub struct Response<T> {
pub struct Error {
pub message: String,
}

impl<T> Response<T> {
pub fn unpack(self) -> Result<T, String> {
self.data.ok_or_else(|| {
self.errors
.unwrap_or_default()
.into_iter()
.map(|err| err.message)
.collect::<Vec<String>>()
.join(", ")
})
}
}
3 changes: 1 addition & 2 deletions src/indexer_selection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub use crate::indexer_selection::{
allocations::{Allocations, QueryStatus, Receipt},
block_requirements::BlockRequirements,
indexers::{IndexerDataReader, IndexerDataWriter},
price_efficiency::CostModelSource,
selection_factors::{IndexingData, IndexingStatus, SelectionFactors},
};
use crate::{
Expand All @@ -32,6 +31,7 @@ use crate::{
},
};
use cost_model;
pub use cost_model::CostModel;
use lazy_static::lazy_static;
use num_traits::identities::Zero as _;
pub use ordered_float::NotNan;
Expand Down Expand Up @@ -84,7 +84,6 @@ pub enum BadIndexerReason {
MissingIndexerStake,
BehindMinimumBlock,
MissingIndexingStatus,
MissingCostModel,
QueryNotCosted,
FeeTooHigh,
NaN,
Expand Down
Loading

0 comments on commit b64f044

Please sign in to comment.