Skip to content

Commit

Permalink
refactor(graph-gateway): update subgraph client with version query (#422
Browse files Browse the repository at this point in the history
)
  • Loading branch information
LNSD authored Nov 21, 2023
1 parent 47f4dbe commit 4d74f07
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 45 deletions.
54 changes: 27 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion graph-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ rdkafka = { version = "0.36.0", features = ["gssapi", "tracing"] }
receipts = { git = "ssh://[email protected]/edgeandnode/receipts.git", rev = "89a821c" }
reqwest.workspace = true
secp256k1 = { version = "0.24", default-features = false }
semver = "1.0"
semver = { version = "1.0", features = ["serde"] }
serde.workspace = true
serde_json = { version = "1.0", features = ["raw_value"] }
serde_with = "3.1"
Expand Down
1 change: 1 addition & 0 deletions graph-gateway/src/indexers_status.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod cost_models;
pub mod indexing_statuses;
pub mod public_poi;
pub mod version;
50 changes: 50 additions & 0 deletions graph-gateway/src/indexers_status/version.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
pub use semver::Version;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
pub struct IndexerVersion {
pub version: Version,
}

pub mod client {
use toolshed::url::Url;

use super::IndexerVersion;

/// Sends a version query to the indexer and returns the version.
pub async fn send_version_query(
client: reqwest::Client,
version_url: Url,
) -> anyhow::Result<IndexerVersion> {
let version = client
.get(version_url.0)
.send()
.await
.map_err(|err| anyhow::anyhow!("IndexerVersionError({err})"))?
.json::<IndexerVersion>()
.await
.map_err(|err| anyhow::anyhow!("IndexerVersionError({err})"))?;

Ok(version)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn deserialize_indexer_version_json() {
//// Given
let json = r#"{
"version": "0.1.0"
}"#;

//// When
let version: IndexerVersion =
serde_json::from_str(json).expect("Failed to deserialize IndexerVersion");

//// Then
assert_eq!(version.version, Version::new(0, 1, 0));
}
}
31 changes: 14 additions & 17 deletions graph-gateway/src/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use futures::future::join_all;
use indexer_selection::Indexing;
use prelude::epoch_cache::EpochCache;
use semver::Version;
use serde::Deserialize;
use tokio::sync::Mutex;
use toolshed::thegraph::{BlockPointer, DeploymentId};
use toolshed::url::{url::Host, Url};
Expand All @@ -16,6 +15,7 @@ use trust_dns_resolver::TokioAsyncResolver as DNSResolver;
use crate::geoip::GeoIP;
use crate::indexers_status::cost_models::{self, CostModelQuery, CostModelSourceResponse};
use crate::indexers_status::indexing_statuses::{self, IndexingStatusResponse};
use crate::indexers_status::version;
use crate::topology::Deployment;

pub struct IndexingStatus {
Expand Down Expand Up @@ -112,17 +112,7 @@ async fn update_indexer(
let version_url = url
.join("version")
.map_err(|err| format!("IndexerVersionError({err})"))?;
let version = client
.get(version_url)
.send()
.await
.map_err(|err| format!("IndexerVersionError({err})"))?
.json::<IndexerVersion>()
.await
.map_err(|err| format!("IndexerVersionError({err})"))?
.version
.parse::<Version>()
.map_err(|err| format!("IndexerVersionError({err})"))?;
let version = query_indexer_version(client.clone(), version_url.into()).await?;

let mut locked_actor = actor.lock().await;
if version < locked_actor.min_version {
Expand Down Expand Up @@ -200,6 +190,18 @@ async fn query_indexer_for_cost_models(
.map(|res| res.cost_models)
}

/// Convenience wrapper method around [`client::send_version_query`] to map the result
/// types to the expected by [`update_indexer`] method.
async fn query_indexer_version(
client: reqwest::Client,
version_url: Url,
) -> Result<Version, String> {
version::client::send_version_query(client, version_url)
.await
.map_err(|err| err.to_string())
.map(|res| res.version)
}

async fn query_status(
actor: &'static Mutex<Actor>,
client: &reqwest::Client,
Expand Down Expand Up @@ -298,8 +300,3 @@ struct CostModelSource {
model: String,
variables: String,
}

#[derive(Deserialize)]
struct IndexerVersion {
version: String,
}
27 changes: 27 additions & 0 deletions graph-gateway/tests/it_indexers_status_version.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use std::time::Duration;

use assert_matches::assert_matches;
use tokio::time::timeout;

use graph_gateway::indexers_status::version::client;

#[tokio::test]
async fn query_indexer_public_pois() {
//// Given
let client = reqwest::Client::new();
let version_url = "https://testnet-indexer-03-europe-cent.thegraph.com/version"
.parse()
.expect("Invalid version url");

//// When
let request = client::send_version_query(client, version_url);
let response = timeout(Duration::from_secs(60), request)
.await
.expect("timeout");

//// Then
// Assert version is present and greater than 0.1.0
assert_matches!(response, Ok(resp) => {
assert!(resp.version > semver::Version::new(0, 1, 0));
});
}

0 comments on commit 4d74f07

Please sign in to comment.