Skip to content

Commit

Permalink
Query current deployments
Browse files Browse the repository at this point in the history
This is a small step toward making all network subgraph queries directly
from the gateway rather than relying on the gateway-agent.
  • Loading branch information
Theodus committed May 13, 2022
1 parent b4c9631 commit 110e6d6
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 64 deletions.
9 changes: 0 additions & 9 deletions graphql/current_deployments.gql

This file was deleted.

37 changes: 0 additions & 37 deletions src/agent_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ pub fn create(
indexings,
..
},
current_deployments,
deployment_indexers,
..
} = inputs;
Expand Down Expand Up @@ -84,15 +83,6 @@ pub fn create(
accept_empty,
),
);
create_sync_client::<CurrentDeployments, _, _>(
agent_url.clone(),
poll_interval,
current_deployments::OPERATION_NAME,
current_deployments::QUERY,
parse_current_deployments,
current_deployments,
accept_empty,
);
handle_indexers(
indexers,
deployment_indexers,
Expand Down Expand Up @@ -507,33 +497,6 @@ fn parse_cost_models(
Some(Ptr::new(parsed))
}

/// graphql_client-generated query type for the agent's `currentDeployments`
/// sync query; the derive expands a `current_deployments` module containing
/// `OPERATION_NAME`, `QUERY`, and the `ResponseData` types used below.
#[derive(GraphQLQuery)]
#[graphql(
    schema_path = "graphql/sync_agent_schema.gql",
    query_path = "graphql/current_deployments.gql",
    response_derives = "Debug"
)]
struct CurrentDeployments;

/// Parse the agent's `currentDeployments` response into a
/// subgraph ID -> current deployment ID map.
///
/// Returns `None` when the response carries no data. Individual entries whose
/// subgraph ID fails to parse or whose deployment is not a valid IPFS hash are
/// silently dropped (`filter_map`), not treated as errors.
fn parse_current_deployments(
    data: current_deployments::ResponseData,
) -> Option<Ptr<HashMap<SubgraphID, SubgraphDeploymentID>>> {
    use current_deployments::{CurrentDeploymentsData, ResponseData};
    // Unwrap the nested response envelope; bail out if data is absent.
    let values = match data {
        ResponseData {
            data: Some(CurrentDeploymentsData { value, .. }),
        } => value,
        _ => return None,
    };
    let parsed = values.into_iter().filter_map(|value| {
        Some((
            value.subgraph.parse::<SubgraphID>().ok()?,
            SubgraphDeploymentID::from_ipfs_hash(&value.deployment)?,
        ))
    });
    Some(Ptr::new(HashMap::from_iter(parsed)))
}

#[derive(GraphQLQuery)]
#[graphql(
schema_path = "graphql/sync_agent_schema.gql",
Expand Down
14 changes: 9 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod agent_client;
mod block_resolver;
mod ethereum_client;
mod fisherman_client;
Expand All @@ -6,12 +7,12 @@ mod indexer_selection;
mod ipfs_client;
mod kafka_client;
mod manifest_client;
mod network_subgraph;
mod opt;
mod prelude;
mod query_engine;
mod rate_limiter;
mod stats_db;
mod subgraph_client;
mod vouchers;
mod ws_client;
use crate::{
Expand Down Expand Up @@ -130,6 +131,8 @@ async fn main() {
let fisherman_client = opt
.fisherman
.map(|url| Arc::new(FishermanClient::new(http_client.clone(), url)));
let network_subgraph_data =
network_subgraph::Client::create(http_client.clone(), opt.network_subgraph.clone());
let subgraph_query_data = SubgraphQueryData {
config: query_engine::Config {
indexer_selection_retry_limit: opt.indexer_selection_retry_limit,
Expand All @@ -144,6 +147,7 @@ async fn main() {
},
block_resolvers: block_resolvers.clone(),
subgraph_info,
current_deployments: network_subgraph_data.current_deployments,
inputs: inputs.clone(),
api_keys,
stats_db,
Expand Down Expand Up @@ -300,7 +304,7 @@ async fn handle_ready(
#[derive(Clone)]
struct NetworkSubgraphQueryData {
http_client: reqwest::Client,
network_subgraph: String,
network_subgraph: Url,
network_subgraph_auth_token: String,
}

Expand All @@ -314,7 +318,7 @@ async fn handle_network_query(
let post_request = |body: String| async {
let response = data
.http_client
.post(&data.network_subgraph)
.post(data.network_subgraph.clone())
.body(body)
.header(header::CONTENT_TYPE.as_str(), "application/json")
.header(
Expand Down Expand Up @@ -352,6 +356,7 @@ struct SubgraphQueryData {
fisherman_client: Option<Arc<FishermanClient>>,
block_resolvers: Arc<HashMap<String, BlockResolver>>,
subgraph_info: SubgraphInfoMap,
current_deployments: Eventual<Ptr<HashMap<SubgraphID, SubgraphDeploymentID>>>,
inputs: Inputs,
api_keys: Eventual<Ptr<HashMap<String, Arc<APIKey>>>>,
stats_db: mpsc::UnboundedSender<stats_db::Msg>,
Expand All @@ -376,8 +381,7 @@ impl SubgraphQueryData {
let subgraph = id
.parse::<SubgraphID>()
.map_err(|_| SubgraphResolutionError::InvalidSubgraphID(id.to_string()))?;
self.inputs
.current_deployments
self.current_deployments
.value_immediate()
.and_then(|map| map.get(&subgraph).cloned())
.ok_or_else(|| SubgraphResolutionError::SubgraphNotFound(id.to_string()))?
Expand Down
176 changes: 176 additions & 0 deletions src/network_subgraph.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use crate::prelude::*;
use eventuals::{self, EventualExt as _};
use serde::Deserialize;
use serde_json::json;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use url::Url;

/// Read-side outputs of the network subgraph client, handed to consumers.
pub struct Data {
    /// Latest mapping of each subgraph ID to its current deployment ID,
    /// refreshed by the client's background polling task.
    pub current_deployments: Eventual<Ptr<HashMap<SubgraphID, SubgraphDeploymentID>>>,
}

/// Polls the network subgraph and publishes results through eventual writers.
/// Constructed only via [`Client::create`], which owns it behind a mutex in a
/// background task.
pub struct Client {
    /// Endpoint of the network subgraph's GraphQL API.
    network_subgraph: Url,
    http_client: reqwest::Client,
    /// Highest block number a successful paginated query completed at; used as
    /// the `number_gte` lower bound for the next poll. Starts at 0.
    latest_block: u64,
    /// Write side of [`Data::current_deployments`].
    current_deployments: EventualWriter<Ptr<HashMap<SubgraphID, SubgraphDeploymentID>>>,
}

impl Client {
    /// Start a polling client against the network subgraph.
    ///
    /// Spawns a detached background task that re-queries the subgraph every
    /// 5 seconds and publishes the results through the eventuals in the
    /// returned [`Data`]. Poll failures are logged and retried on the next
    /// tick; they never tear down the task.
    pub fn create(http_client: reqwest::Client, network_subgraph: Url) -> Data {
        let (current_deployments_tx, current_deployments_rx) = Eventual::new();
        let client = Arc::new(Mutex::new(Client {
            network_subgraph,
            http_client,
            latest_block: 0,
            current_deployments: current_deployments_tx,
        }));
        // `forever` detaches the pipe so polling runs for the process lifetime.
        eventuals::timer(Duration::from_secs(5))
            .pipe_async(move |_| {
                let client = client.clone();
                async move {
                    let mut client = client.lock().await;
                    if let Err(poll_subgraphs_err) = client.poll_subgraphs().await {
                        tracing::error!(%poll_subgraphs_err);
                    }
                }
            })
            .forever();
        Data {
            current_deployments: current_deployments_rx,
        }
    }

    /// Fetch all active subgraphs and publish the
    /// subgraph ID -> current deployment ID map to `current_deployments`.
    async fn poll_subgraphs(&mut self) -> Result<(), String> {
        let response = self
            .paginated_query::<Subgraph>(
                r#"
                subgraphs(
                    block: $block, skip: $skip, first: $first
                    where: { active: true }
                ) {
                    id
                    currentVersion { subgraphDeployment { ipfsHash } }
                }
                "#,
            )
            .await?;
        let current_deployments = response
            .into_iter()
            .map(|subgraph| (subgraph.id, subgraph.current_version.subgraph_deployment.id))
            .collect();
        self.current_deployments
            .write(Ptr::new(current_deployments));
        Ok(())
    }

    /// Page through `query` in batches of 1000, pinning every page after the
    /// first to the block hash returned by the first page so the combined
    /// result set is a consistent snapshot. On success, `latest_block`
    /// advances so later polls are served at or after that block
    /// (`number_gte`). On a detected reorg the whole query restarts against a
    /// fresh block.
    ///
    /// Returns the concatenated page results, or the subgraph's error
    /// messages joined with `", "`.
    async fn paginated_query<T: for<'de> Deserialize<'de>>(
        &mut self,
        query: &'static str,
    ) -> Result<Vec<T>, String> {
        let batch_size: u32 = 1000;
        let mut index: u32 = 0;
        let mut query_block: Option<BlockPointer> = None;
        let mut results = Vec::new();
        loop {
            // First page: any block at or past the last completed poll.
            // Subsequent pages: pinned to the first page's block hash.
            let block = query_block
                .as_ref()
                .map(|block| json!({ "hash": block.hash }))
                .unwrap_or_else(|| json!({ "number_gte": self.latest_block }));
            let response = self
                .http_client
                .post(self.network_subgraph.clone())
                .json(&json!({
                    "query": format!(r#"
                        query q($block: Block_height!, $skip: Int!, $first: Int!) {{
                            meta: _meta(block: $block) {{ block {{ number hash }} }}
                            results: {}
                        }}"#,
                        query,
                    ),
                    "variables": {
                        "block": block,
                        "skip": index * batch_size,
                        "first": batch_size,
                    },
                }))
                .send()
                .await
                .map_err(|err| err.to_string())?
                .json::<GraphQLResponse<PaginatedQueryResponse<T>>>()
                .await
                .map_err(|err| err.to_string())?;
            let errors = response
                .errors
                .unwrap_or_default()
                .into_iter()
                .map(|err| err.message)
                .collect::<Vec<String>>();
            if errors
                .iter()
                .any(|err| err.contains("no block with that hash found"))
            {
                tracing::info!("Reorg detected. Restarting query to try a new block.");
                index = 0;
                query_block = None;
                // Fix: drop pages already fetched against the orphaned block.
                // Without this, the restarted query appends onto stale pages
                // and returns duplicated/inconsistent entries.
                results.clear();
                continue;
            }
            if !errors.is_empty() {
                return Err(errors.join(", "));
            }
            // An empty page (or missing data) means pagination is complete.
            let mut data = match response.data {
                Some(data) if !data.results.is_empty() => data,
                _ => break,
            };
            index += 1;
            query_block = Some(data.meta.block);
            results.append(&mut data.results);
        }
        if let Some(block) = query_block {
            self.latest_block = block.number;
        }
        Ok(results)
    }
}

/// Standard GraphQL response envelope: `data` and `errors` are each optional
/// and may both be present.
#[derive(Deserialize)]
struct GraphQLResponse<T> {
    data: Option<T>,
    errors: Option<Vec<Error>>,
}

/// A single GraphQL error; only the message text is retained.
#[derive(Deserialize)]
struct Error {
    message: String,
}

/// Deserialized `_meta` field: the block the query was executed against.
#[derive(Deserialize)]
struct Meta {
    block: BlockPointer,
}

/// One page of a paginated query: the `_meta` block pointer (aliased `meta`)
/// plus the page's `results` (aliased in the query built by
/// `paginated_query`).
#[derive(Deserialize)]
struct PaginatedQueryResponse<T> {
    meta: Meta,
    results: Vec<T>,
}

/// One entry of the `subgraphs` query result.
/// `rename_all = "camelCase"` maps the JSON `currentVersion` field onto
/// `current_version`.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct Subgraph {
    id: SubgraphID,
    current_version: CurrentVersion,
}

/// The `currentVersion` object; only its `subgraphDeployment` is kept.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct CurrentVersion {
    subgraph_deployment: SubgraphDeployment,
}

/// The `subgraphDeployment` object; the JSON `ipfsHash` field is deserialized
/// directly into a `SubgraphDeploymentID` via the field-level rename.
#[derive(Deserialize)]
struct SubgraphDeployment {
    #[serde(rename = "ipfsHash")]
    id: SubgraphDeploymentID,
}
2 changes: 1 addition & 1 deletion src/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct Opt {
long = "--network-subgraph",
env = "NETWORK_SUBGRAPH"
)]
pub network_subgraph: String,
pub network_subgraph: Url,
#[structopt(
help = "Network subgraph auth token",
long = "--network-subgraph-auth-token",
Expand Down
3 changes: 2 additions & 1 deletion src/prelude/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub use prometheus::{
self,
core::{MetricVec, MetricVecBuilder},
};
use serde::Deserialize;
use siphasher::sip::SipHasher24;
use std::hash::{Hash, Hasher as _};
pub use std::{cmp::Ordering, fmt, str::FromStr};
Expand Down Expand Up @@ -131,7 +132,7 @@ pub type GRT = UDecimal<18>;
/// Decimal GRT Wei (10^-18 GRT)
pub type GRTWei = UDecimal<0>;

#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct BlockPointer {
pub number: u64,
pub hash: Bytes32,
Expand Down
5 changes: 0 additions & 5 deletions src/query_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,33 +177,28 @@ pub struct Config {
/// Read side of the query engine's shared inputs. Cheap to clone: eventuals
/// and the `Arc` share underlying state with the matching [`InputWriters`].
#[derive(Clone)]
pub struct Inputs {
    pub indexers: Arc<Indexers>,
    /// For each deployment, the indexers currently allocated to it.
    pub deployment_indexers: Eventual<Ptr<HashMap<SubgraphDeploymentID, Vec<Address>>>>,
}

/// Write side of the query engine's shared inputs; values written here become
/// visible through the corresponding [`Inputs`] eventuals.
pub struct InputWriters {
    pub indexer_inputs: indexer_selection::InputWriters,
    pub indexers: Arc<Indexers>,
    pub deployment_indexers: EventualWriter<Ptr<HashMap<SubgraphDeploymentID, Vec<Address>>>>,
}

impl Inputs {
pub fn new() -> (InputWriters, Self) {
let (indexer_input_writers, indexer_inputs) = Indexers::inputs();
let indexers = Arc::new(Indexers::new(indexer_inputs));
let (current_deployments_writer, current_deployments) = Eventual::new();
let (deployment_indexers_writer, deployment_indexers) = Eventual::new();
(
InputWriters {
indexer_inputs: indexer_input_writers,
indexers: indexers.clone(),
current_deployments: current_deployments_writer,
deployment_indexers: deployment_indexers_writer,
},
Inputs {
indexers,
current_deployments,
deployment_indexers,
},
)
Expand Down
6 changes: 0 additions & 6 deletions src/query_engine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,6 @@ impl Topology {
});
}
}
self.inputs.current_deployments.write(Ptr::new(
self.subgraphs
.iter()
.filter_map(|(_, subgraph)| Some((subgraph.id, subgraph.deployments.last()?.id)))
.collect(),
));
self.inputs.deployment_indexers.write(Ptr::new(
self.deployments()
.iter()
Expand Down

0 comments on commit 110e6d6

Please sign in to comment.