From a83082a7520caf329416095420872ad2bb66b24e Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 5 Sep 2024 13:40:06 -0700 Subject: [PATCH] implement recursive auto sync for grafts --- chain/ethereum/src/chain.rs | 12 + core/src/subgraph/registrar.rs | 218 +++++++++++++++--- graph/src/blockchain/block_stream.rs | 8 + graph/src/components/store/traits.rs | 2 + graph/src/data/subgraph/mod.rs | 44 ++++ node/resources/tests/full_config.toml | 1 + node/src/chain.rs | 1 + node/src/config.rs | 6 + node/src/opt.rs | 9 + node/src/store_builder.rs | 1 + store/postgres/src/subgraph_store.rs | 8 + .../auto-graft-sync/abis/Contract.abi | 33 +++ .../runner-tests/auto-graft-sync/config.toml | 20 ++ .../generateAndPublishManifestFromTemplate.js | 21 ++ .../auto-graft-sync/graft-base.yaml | 63 +++++ .../runner-tests/auto-graft-sync/package.json | 13 ++ .../auto-graft-sync/schema.graphql | 5 + .../auto-graft-sync/src/mapping.ts | 39 ++++ .../auto-graft-sync/template.yaml | 66 ++++++ tests/src/fixture/mod.rs | 29 ++- tests/tests/runner_tests.rs | 127 +++++++++- 21 files changed, 686 insertions(+), 40 deletions(-) create mode 100644 tests/runner-tests/auto-graft-sync/abis/Contract.abi create mode 100644 tests/runner-tests/auto-graft-sync/config.toml create mode 100644 tests/runner-tests/auto-graft-sync/generateAndPublishManifestFromTemplate.js create mode 100644 tests/runner-tests/auto-graft-sync/graft-base.yaml create mode 100644 tests/runner-tests/auto-graft-sync/package.json create mode 100644 tests/runner-tests/auto-graft-sync/schema.graphql create mode 100644 tests/runner-tests/auto-graft-sync/src/mapping.ts create mode 100644 tests/runner-tests/auto-graft-sync/template.yaml diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 1def8c483cc..6ce304636fc 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -15,6 +15,7 @@ use graph::prelude::{ EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry, }; use 
graph::schema::InputSchema; +use graph::slog::info; use graph::substreams::Clock; use graph::{ blockchain::{ @@ -451,6 +452,17 @@ impl Blockchain for Chain { logger: &Logger, number: BlockNumber, ) -> Result { + // temporary hack until we can refactor this to use the block stream + if self.block_stream_builder.can_directly_resolve_blocks() { + info!(&logger, "block_pointer_from_number - can_directly_resolve_blocks"; "number" => number); + if let Some(block_ptr) = self + .block_stream_builder + .directly_resolve_block_from_number(number) + { + return Ok(block_ptr); + } + } + info!(&logger, "block_pointer_from_number"; "number" => number); match self.client.as_ref() { ChainClient::Firehose(endpoints) => endpoints .endpoint() diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index fe80d118457..6df7777c775 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -28,6 +28,8 @@ use graph::tokio_retry::Retry; use graph::util::futures::retry_strategy; use graph::util::futures::RETRY_DEFAULT_LIMIT; +const MAX_AUTO_GRAFT_SYNC_DEPTH: u32 = 42; + pub struct SubgraphRegistrar { logger: Logger, logger_factory: LoggerFactory, @@ -303,20 +305,8 @@ where .logger_factory .subgraph_logger(&DeploymentLocator::new(DeploymentId(0), hash.clone())); - let raw: serde_yaml::Mapping = { - let file_bytes = self - .resolver - .cat(&logger, &hash.to_ipfs_link()) - .await - .map_err(|e| { - SubgraphRegistrarError::ResolveError( - SubgraphManifestResolveError::ResolveError(e), - ) - })?; - - serde_yaml::from_slice(&file_bytes) - .map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))? 
- }; + let raw: serde_yaml::Mapping = + resolve_raw_manifest(&self.resolver, &self.logger, &hash).await?; let kind = BlockchainKind::from_manifest(&raw).map_err(|e| { SubgraphRegistrarError::ResolveError(SubgraphManifestResolveError::ResolveError(e)) @@ -326,9 +316,15 @@ where let history_blocks = history_blocks.or(self.settings.for_name(&name).map(|c| c.history_blocks)); + let auto_graft_sync_depth = if self.store.auto_graft_sync() { + Some(0) + } else { + None + }; + let deployment_locator = match kind { BlockchainKind::Arweave => { - create_subgraph_version::( + create_subgraph_version::( &logger, self.store.clone(), self.chains.cheap_clone(), @@ -342,11 +338,13 @@ where self.version_switching_mode, &self.resolver, history_blocks, + auto_graft_sync_depth, + self.provider.clone(), ) .await? } BlockchainKind::Ethereum => { - create_subgraph_version::( + create_subgraph_version::( &logger, self.store.clone(), self.chains.cheap_clone(), @@ -360,11 +358,13 @@ where self.version_switching_mode, &self.resolver, history_blocks, + auto_graft_sync_depth, + self.provider.clone(), ) .await? } BlockchainKind::Near => { - create_subgraph_version::( + create_subgraph_version::( &logger, self.store.clone(), self.chains.cheap_clone(), @@ -378,11 +378,13 @@ where self.version_switching_mode, &self.resolver, history_blocks, + auto_graft_sync_depth, + self.provider.clone(), ) .await? } BlockchainKind::Cosmos => { - create_subgraph_version::( + create_subgraph_version::( &logger, self.store.clone(), self.chains.cheap_clone(), @@ -396,11 +398,13 @@ where self.version_switching_mode, &self.resolver, history_blocks, + auto_graft_sync_depth, + self.provider.clone(), ) .await? } BlockchainKind::Substreams => { - create_subgraph_version::( + create_subgraph_version::( &logger, self.store.clone(), self.chains.cheap_clone(), @@ -414,11 +418,13 @@ where self.version_switching_mode, &self.resolver, history_blocks, + auto_graft_sync_depth, + self.provider.clone(), ) .await? 
} BlockchainKind::Starknet => { - create_subgraph_version::( + create_subgraph_version::( &logger, self.store.clone(), self.chains.cheap_clone(), @@ -432,6 +438,8 @@ where self.version_switching_mode, &self.resolver, history_blocks, + auto_graft_sync_depth, + self.provider.clone(), ) .await? } @@ -555,9 +563,9 @@ async fn start_subgraph( } /// Resolves the subgraph's earliest block -async fn resolve_start_block( - manifest: &SubgraphManifest, - chain: &impl Blockchain, +async fn resolve_start_block( + manifest: &SubgraphManifest, + chain: &C, logger: &Logger, ) -> Result, SubgraphRegistrarError> { // If the minimum start block is 0 (i.e. the genesis block), @@ -591,20 +599,26 @@ async fn resolve_graft_block( chain: &impl Blockchain, logger: &Logger, ) -> Result { + debug!(&logger, "Resolve graft block"; "base" => base.base.to_string(), "block" => base.block); chain .block_pointer_from_number(logger, base.block) .await - .map_err(|_| { + .map_err(|err| { + error!(&logger, "Failed to resolve graft block"; "error" => err.to_string()); SubgraphRegistrarError::ManifestValidationError(vec![ SubgraphManifestValidationError::BlockNotFound(format!( - "graft base block {} not found", - base.block + "graft base {} block {} not found", + base.base, base.block )), ]) }) } -async fn create_subgraph_version( +async fn create_subgraph_version< + C: Blockchain, + S: SubgraphStore, + P: SubgraphAssignmentProviderTrait, +>( logger: &Logger, store: Arc, chains: Arc, @@ -618,9 +632,13 @@ async fn create_subgraph_version( version_switching_mode: SubgraphVersionSwitchingMode, resolver: &Arc, history_blocks_override: Option, + depth: Option, + provider: Arc
<P>
, ) -> Result { let raw_string = serde_yaml::to_string(&raw).unwrap(); - let unvalidated = UnvalidatedSubgraphManifest::::resolve( + + // We need to defer validation of the manifest until after we have synced the base subgraph. + let unvalidated_manifest = UnvalidatedSubgraphManifest::::resolve( deployment.clone(), raw, resolver, @@ -630,16 +648,38 @@ async fn create_subgraph_version( .map_err(SubgraphRegistrarError::ResolveError) .await?; - // Determine if the graft_base should be validated. - // Validate the graft_base if there is a pending graft, ensuring its presence. - // If the subgraph is new (indicated by DeploymentNotFound), the graft_base should be validated. - // If the subgraph already exists and there is no pending graft, graft_base validation is not required. + if let (Some(depth), Some(graft)) = (depth, unvalidated_manifest.unvalidated_graft()) { + if depth < MAX_AUTO_GRAFT_SYNC_DEPTH { + Box::pin(auto_sync_graft::( + graft, + resolver, + logger, + &store, + &chains, + &name, + &node_id, + &debug_fork, + version_switching_mode, + history_blocks_override, + depth, + provider, + )) + .await?; + } else { + warn!( + logger, + "auto-graft-sync: subgraph grafts depth limit reached"; + "depth" => depth + ); + } + } + let should_validate = match store.graft_pending(&deployment) { Ok(graft_pending) => graft_pending, Err(StoreError::DeploymentNotFound(_)) => true, Err(e) => return Err(SubgraphRegistrarError::StoreError(e)), }; - let manifest = unvalidated + let manifest = unvalidated_manifest .validate(store.cheap_clone(), should_validate) .await .map_err(SubgraphRegistrarError::ManifestValidationError)?; @@ -732,3 +772,115 @@ async fn create_subgraph_version( ) .map_err(SubgraphRegistrarError::SubgraphDeploymentError) } + +/// Automatically syncs a subgraph graft from the base subgraph. +/// This will await the syncing of the base subgraph before proceeding. 
+/// Recursively calls `create_subgraph_version` to create any grafts of +/// this graft up to `MAX_AUTO_GRAFT_SYNC_DEPTH`. +async fn auto_sync_graft( + graft: &Graft, + resolver: &Arc, + logger: &Logger, + store: &Arc, + chains: &Arc, + name: &SubgraphName, + node_id: &NodeId, + debug_fork: &Option, + version_switching_mode: SubgraphVersionSwitchingMode, + history_blocks_override: Option, + depth: u32, + provider: Arc
<P>
, +) -> Result { + info!( + logger, + "auto-graft-sync: begin graft sync"; + "subgraph" => name.to_string(), + "hash" => graft.base.to_string(), + "depth" => depth, + "block" => graft.block + ); + let subgraft_raw_manifest = resolve_raw_manifest(resolver, logger, &graft.base).await?; + + let deployment = graft.base.clone(); + + let name = &deployment[deployment.len().saturating_sub(10)..]; + let name = format!("auto-graft-sync/{}", name); + let name = + SubgraphName::new(name.clone()).map_err(|_| SubgraphRegistrarError::NameNotValid(name))?; + + info!( + logger, + "auto-graft-sync: create subgraph"; + "subgraph" => name.to_string(), + "hash" => graft.base.to_string() + ); + + let _ = store.create_subgraph(name.clone())?; + info!(logger, "Created subgraph"; "subgraph_name" => name.to_string(), "id" => deployment.to_string()); + + let locator = create_subgraph_version::( + logger, + store.clone(), + chains.clone(), + name.clone(), + graft.base.clone(), + None, + None, + subgraft_raw_manifest.clone(), + node_id.clone(), + debug_fork.clone(), + version_switching_mode, + resolver, + history_blocks_override, + Some(depth + 1), + provider.clone(), + ) + .await?; + + info!( + logger, + "auto-graft-sync: awaiting subgraph sync"; + "subgraph" => name.to_string(), + "hash" => graft.base.to_string() + ); + + info!(&logger, "auto-graft-sync: starting graft sync"; "subgraph" => name.to_string(), "hash" => graft.base.to_string()); + provider + .start(locator.clone(), Some(graft.block)) + .await + .map_err(SubgraphRegistrarError::AutoGraftSubgraphAssignmentError)?; + + info!(&logger, "auto-graft-sync: waiting for graft sync"; "subgraph" => name.to_string(), "hash" => graft.base.to_string()); + graft + .await_sync(store.clone(), Duration::from_secs(1)) + .await?; + + info!( + logger, + "auto-graft-sync: sync complete"; + "subgraph" => name.to_string(), + "graft-hash" => graft.base.to_string(), + "depth" => depth, + "hash" => graft.base.to_string() + ); + Ok(locator) +} + +async fn 
resolve_raw_manifest( + resolver: &Arc, + logger: &Logger, + deployment_hash: &DeploymentHash, +) -> Result { + let subgraft_raw_manifest: serde_yaml::Mapping = { + let file_bytes = resolver + .cat(&logger, &deployment_hash.to_ipfs_link()) + .await + .map_err(|e| { + SubgraphRegistrarError::ResolveError(SubgraphManifestResolveError::ResolveError(e)) + })?; + + serde_yaml::from_slice(&file_bytes) + .map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))? + }; + Ok(subgraft_raw_manifest) +} diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 25a923dd502..bd19cf9e723 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -118,6 +118,14 @@ pub trait BlockRefetcher: Send + Sync { /// BlockStreamBuilder is an abstraction that would separate the logic for building streams from the blockchain trait #[async_trait] pub trait BlockStreamBuilder: Send + Sync { + fn can_directly_resolve_blocks(&self) -> bool { + false + } + + fn directly_resolve_block_from_number(&self, _number: BlockNumber) -> Option { + None + } + async fn build_firehose( &self, chain: &C, diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 69ed67c16b2..c009bf227b7 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -215,6 +215,8 @@ pub trait SubgraphStore: Send + Sync + 'static { /// When this flag is set, indexing of the deployment should log /// additional diagnostic information fn instrument(&self, deployment: &DeploymentLocator) -> Result; + + fn auto_graft_sync(&self) -> bool; } pub trait ReadStore: Send + Sync + 'static { diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index 52b0f4dfed1..549eb75e9b5 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -26,6 +26,8 @@ use stable_hash_legacy::SequenceNumber; use std::{ collections::{BTreeSet, HashMap, HashSet}, 
marker::PhantomData, + time::Duration, + time::Instant, }; use thiserror::Error; use wasmparser; @@ -284,6 +286,8 @@ pub enum SubgraphRegistrarError { NameExists(String), #[error("subgraph name not found: {0}")] NameNotFound(String), + #[error("subgraph name not valid: {0}")] + NameNotValid(String), #[error("network not supported by registrar: {0}")] NetworkNotSupported(Error), #[error("deployment not found: {0}")] @@ -298,6 +302,8 @@ pub enum SubgraphRegistrarError { ManifestValidationError(Vec), #[error("subgraph deployment error: {0}")] SubgraphDeploymentError(StoreError), + #[error("auto-graft-sync subgraph assignment error: {0}")] + AutoGraftSubgraphAssignmentError(SubgraphAssignmentProviderError), #[error("subgraph registrar error: {0}")] Unknown(#[from] anyhow::Error), } @@ -522,6 +528,39 @@ impl Graft { (Some(_), _) => Ok(()), } } + + /// Awaits the target block sync for the graft. + pub async fn await_sync( + &self, + store: Arc, + interval: Duration, + ) -> Result<(), SubgraphRegistrarError> { + const MAX_WAIT_NO_BLOCKS: Duration = Duration::from_secs(10); + let start = Instant::now(); + + loop { + let maybe_latest_block = store + .least_block_ptr(&self.base) + .await + .map_err(|e| SubgraphRegistrarError::DeploymentNotFound(e.to_string()))?; + + // TODO: could we get a stream over the block pointers? + if let Some(block) = maybe_latest_block { + if block.block_number() >= self.block { + break Ok(()); + } else { + tokio::time::sleep(interval).await; + } + } else { + if start.elapsed() > MAX_WAIT_NO_BLOCKS { + return Err(SubgraphRegistrarError::ManifestValidationError(vec![SubgraphManifestValidationError::GraftBaseInvalid(format!( + "failed to graft onto `{}` at block {} since it has not processed any blocks", + self.base, self.block + ))])); + } + } + } + } } #[derive(Clone, Debug)] @@ -753,6 +792,11 @@ impl UnvalidatedSubgraphManifest { pub fn spec_version(&self) -> &Version { &self.0.spec_version } + + /// Get the graft from this unvalidated manifest. 
+ pub fn unvalidated_graft(&self) -> Option<&Graft> { + self.0.graft.as_ref() + } } impl SubgraphManifest { diff --git a/node/resources/tests/full_config.toml b/node/resources/tests/full_config.toml index 1f907539194..34f9b1d3622 100644 --- a/node/resources/tests/full_config.toml +++ b/node/resources/tests/full_config.toml @@ -1,5 +1,6 @@ [general] query = "query_node_.*" +auto_sync_grafts = false [store] [store.primary] diff --git a/node/src/chain.rs b/node/src/chain.rs index 3e87ff8295b..db03d926576 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -706,6 +706,7 @@ mod test { ethereum_ws: vec![], ethereum_ipc: vec![], unsafe_config: false, + auto_graft_sync: false, }; let metrics = Arc::new(EndpointMetrics::mock()); diff --git a/node/src/config.rs b/node/src/config.rs index 93aab34ee8c..1d4d6d63f53 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -48,6 +48,7 @@ pub struct Opt { pub ethereum_ws: Vec, pub ethereum_ipc: Vec, pub unsafe_config: bool, + pub auto_graft_sync: bool, } impl Default for Opt { @@ -64,6 +65,7 @@ impl Default for Opt { ethereum_ws: vec![], ethereum_ipc: vec![], unsafe_config: false, + auto_graft_sync: false, } } } @@ -77,6 +79,8 @@ pub struct Config { pub stores: BTreeMap, pub chains: ChainSection, pub deployment: Deployment, + #[serde(default)] + pub auto_graft_sync: bool, } fn validate_name(s: &str) -> Result<()> { @@ -192,6 +196,7 @@ impl Config { let chains = ChainSection::from_opt(opt)?; let node = NodeId::new(opt.node_id.to_string()) .map_err(|()| anyhow!("invalid node id {}", opt.node_id))?; + let auto_graft_sync = opt.auto_graft_sync; stores.insert(PRIMARY_SHARD.to_string(), Shard::from_opt(true, opt)?); Ok(Config { node, @@ -199,6 +204,7 @@ impl Config { stores, chains, deployment, + auto_graft_sync, }) } diff --git a/node/src/opt.rs b/node/src/opt.rs index c2945959514..3310387008e 100644 --- a/node/src/opt.rs +++ b/node/src/opt.rs @@ -231,6 +231,13 @@ pub struct Opt { help = "Base URL for forking subgraphs" )] pub 
fork_base: Option, + + #[clap( + long, + env = "AUTO_GRAFT_SYNC", + help = "Automatically sync grafts from the base subgraph" + )] + pub auto_graft_sync: bool, } impl From for config::Opt { @@ -247,6 +254,7 @@ impl From for config::Opt { ethereum_ws, ethereum_ipc, unsafe_config, + auto_graft_sync, .. } = opt; @@ -262,6 +270,7 @@ impl From for config::Opt { ethereum_ws, ethereum_ipc, unsafe_config, + auto_graft_sync, } } } diff --git a/node/src/store_builder.rs b/node/src/store_builder.rs index 2a39d0ea6ed..e57dad7b391 100644 --- a/node/src/store_builder.rs +++ b/node/src/store_builder.rs @@ -155,6 +155,7 @@ impl StoreBuilder { notification_sender, fork_base, registry, + config.auto_graft_sync, )); (store, pools, coord) diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 41cbef15982..6d134f54317 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -207,6 +207,8 @@ pub struct SubgraphStore { /// subgraph forks will fetch entities. 
/// Example: https://api.thegraph.com/subgraphs/ fork_base: Option, + + auto_graft_sync: bool, } impl SubgraphStore { @@ -231,12 +233,14 @@ impl SubgraphStore { sender: Arc, fork_base: Option, registry: Arc, + auto_graft_sync: bool, ) -> Self { Self { inner: Arc::new(SubgraphStoreInner::new( logger, stores, placer, sender, registry, )), fork_base, + auto_graft_sync, } } @@ -1600,4 +1604,8 @@ impl SubgraphStoreTrait for SubgraphStore { let info = store.subgraph_info(site)?; Ok(info.instrument) } + + fn auto_graft_sync(&self) -> bool { + self.auto_graft_sync + } } diff --git a/tests/runner-tests/auto-graft-sync/abis/Contract.abi b/tests/runner-tests/auto-graft-sync/abis/Contract.abi new file mode 100644 index 00000000000..02da1a9e7f3 --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/abis/Contract.abi @@ -0,0 +1,33 @@ +[ + { + "inputs": [], + "stateMutability": "nonpayable", + "type": "constructor" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "uint16", + "name": "x", + "type": "uint16" + } + ], + "name": "Trigger", + "type": "event" + }, + { + "inputs": [ + { + "internalType": "uint16", + "name": "x", + "type": "uint16" + } + ], + "name": "emitTrigger", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + } +] diff --git a/tests/runner-tests/auto-graft-sync/config.toml b/tests/runner-tests/auto-graft-sync/config.toml new file mode 100644 index 00000000000..0d2c55e11a3 --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/config.toml @@ -0,0 +1,20 @@ +# This is the config flag for enabling auto-graft-sync, which is the target of this test. 
+auto_graft_sync = true + +[store] +[store.primary] +connection = "$THEGRAPH_STORE_POSTGRES_DIESEL_URL" +pool_size = 10 + +[deployment] +[[deployment.rule]] +store = "primary" +indexers = ["default"] + +[chains] +ingestor = "default" + +# The tests do not talk to ethereum clients +[chains.test] +shard = "primary" +provider = [{ label = "penguin", url = "http://localhost:1/", features = [] }] diff --git a/tests/runner-tests/auto-graft-sync/generateAndPublishManifestFromTemplate.js b/tests/runner-tests/auto-graft-sync/generateAndPublishManifestFromTemplate.js new file mode 100644 index 00000000000..e9ae741152f --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/generateAndPublishManifestFromTemplate.js @@ -0,0 +1,21 @@ + +const fs = require('fs'); +const { execSync } = require('child_process'); + +// This takes a subgraphName, outPath, and Qm.. hash as a CLI input, which is the graft base. +const outPath = process.argv[2]; +const graftBase = process.argv[3]; +const graftBlock = process.argv[4]; + +const yamlPath = './template.yaml'; +let yamlContent = fs.readFileSync(yamlPath, 'utf-8'); +yamlContent = yamlContent.replace(/base: .+/, `base: ${graftBase}`); +yamlContent = yamlContent.replace(/block: .+/, `block: ${graftBlock}`); +fs.writeFileSync(outPath, yamlContent); +console.log("fuzzba") + +// Assuming you have your IPFS_URI exported as environment variables. +// Instead of deploy, run graph build to -only upload to ipfs-. 
+execSync('graph build ' + outPath + ' --ipfs $IPFS_URI', { + stdio: 'inherit' +}); diff --git a/tests/runner-tests/auto-graft-sync/graft-base.yaml b/tests/runner-tests/auto-graft-sync/graft-base.yaml new file mode 100644 index 00000000000..5fd468a3342 --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/graft-base.yaml @@ -0,0 +1,63 @@ +# for this test, this is the lowest level of the grafting hierarchy +specVersion: 0.0.4 +features: + - grafting +schema: + file: ./schema.graphql +dataSources: + - kind: ethereum/contract + name: Contract + network: test + source: + address: "0xCfEB869F69431e42cdB54A4F4f105C19C080A601" + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlock + file: ./src/mapping.ts + # Tests that adding a data source is possible in a graft + - kind: ethereum/contract + name: Contract2 + network: test + source: + address: "0xCfEB869F69431e42cdB54A4F4f105C19C080A601" + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + callHandlers: + - handler: handleBlock + function: emitTrigger(uint16) + file: ./src/mapping.ts +templates: + - kind: ethereum/contract + name: Template + network: test + source: + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlockTemplate + file: ./src/mapping.ts diff --git a/tests/runner-tests/auto-graft-sync/package.json b/tests/runner-tests/auto-graft-sync/package.json new file mode 100644 index 00000000000..d766288c0b8 --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/package.json @@ -0,0 +1,13 @@ +{ + "name": "auto-graft-sync", + "version": "0.1.0", + 
"scripts": { + "codegen": "graph codegen graft-base.yaml --skip-migrations", + "build-graft-root": "graph build graft-base.yaml --ipfs $IPFS_URI", + "build:test-auto-graft-sync": "node generateAndPublishManifestFromTemplate.js" + }, + "devDependencies": { + "@graphprotocol/graph-cli": "0.60.0", + "@graphprotocol/graph-ts": "0.31.0" + } +} diff --git a/tests/runner-tests/auto-graft-sync/schema.graphql b/tests/runner-tests/auto-graft-sync/schema.graphql new file mode 100644 index 00000000000..6c007b3245b --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/schema.graphql @@ -0,0 +1,5 @@ +# The `id` is the block number and `count` the handler invocations at that block. +type DataSourceCount @entity { + id: ID! + count: Int! +} diff --git a/tests/runner-tests/auto-graft-sync/src/mapping.ts b/tests/runner-tests/auto-graft-sync/src/mapping.ts new file mode 100644 index 00000000000..feb6f313bbc --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/src/mapping.ts @@ -0,0 +1,39 @@ +import { + ethereum, + DataSourceContext, + dataSource, + Address, + BigInt, +} from "@graphprotocol/graph-ts"; +import { Template } from "../generated/templates"; +import { DataSourceCount } from "../generated/schema"; + +export function handleBlock(block: ethereum.Block): void { + let context = new DataSourceContext(); + context.setBigInt("number", block.number); + context.setBytes("hash", block.hash); + + Template.createWithContext( + changetype
<Address>
(Address.fromHexString( + "0x2E645469f354BB4F5c8a05B3b30A929361cf77eC" + )), + context + ); +} + +export function handleBlockTemplate(block: ethereum.Block): void { + let count = DataSourceCount.load(block.number.toString()); + if (count == null) { + count = new DataSourceCount(block.number.toString()); + count.count = 0; + } + + let ctx = dataSource.context(); + let number = ctx.getBigInt("number"); + assert( + count.count == number.toI32(), + "wrong count, found " + BigInt.fromI32(count.count).toString() + ); + count.count += 1; + count.save(); +} diff --git a/tests/runner-tests/auto-graft-sync/template.yaml b/tests/runner-tests/auto-graft-sync/template.yaml new file mode 100644 index 00000000000..bf5c1b47a06 --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/template.yaml @@ -0,0 +1,66 @@ +specVersion: 0.0.4 +features: + - grafting +schema: + file: ./schema.graphql +graft: + # This value will be overwritten by the templates + base: QmcAL39QSKZvRssr2ToCJrav7XK9ggajxvBR7M1NNUCqdh + block: 3 +dataSources: + - kind: ethereum/contract + name: Contract + network: test + source: + address: "0xCfEB869F69431e42cdB54A4F4f105C19C080A601" + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlock + file: ./src/mapping.ts + # Tests that adding a data source is possible in a graft + - kind: ethereum/contract + name: Contract2 + network: test + source: + address: "0xCfEB869F69431e42cdB54A4F4f105C19C080A601" + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + callHandlers: + - handler: handleBlock + function: emitTrigger(uint16) + file: ./src/mapping.ts +templates: + - kind: ethereum/contract + name: Template + network: test + source: + abi: Contract + mapping: + kind: 
ethereum/events + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlockTemplate + file: ./src/mapping.ts diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index ebed1d3a115..b8360467a33 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -644,7 +644,7 @@ pub async fn wait_for_sync( return Err(fatal_error); } - if block_ptr == stop_block { + if block_ptr.number >= stop_block.number { info!(logger, "TEST: reached stop block"); return Ok(()); } @@ -679,6 +679,17 @@ pub struct MutexBlockStreamBuilder(pub Mutex BlockStreamBuilder for MutexBlockStreamBuilder { + fn can_directly_resolve_blocks(&self) -> bool { + true + } + + fn directly_resolve_block_from_number(&self, number: BlockNumber) -> Option { + self.0 + .lock() + .unwrap() + .directly_resolve_block_from_number(number) + } + async fn build_firehose( &self, chain: &C, @@ -743,6 +754,22 @@ impl BlockStreamBuilder for StaticStreamBuilder where C::TriggerData: Clone, { + fn can_directly_resolve_blocks(&self) -> bool { + true + } + + fn directly_resolve_block_from_number(&self, number: BlockNumber) -> Option { + let mut blocks = self.chain.iter().map(|b| b.ptr()); + let block = blocks.find_map(|b| { + if b.number == number { + Some(b.clone()) + } else { + None + } + }); + block + } + async fn build_substreams( &self, _chain: &C, diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs index 7da707ac7cd..20814c369fe 100644 --- a/tests/tests/runner_tests.rs +++ b/tests/tests/runner_tests.rs @@ -143,10 +143,10 @@ async fn data_source_revert() -> anyhow::Result<()> { // Test grafted version let subgraph_name = SubgraphName::new("data-source-revert-grafted").unwrap(); - let hash = build_subgraph_with_yarn_cmd_and_arg( + let hash = build_subgraph_with_yarn_cmd_and_args( "./runner-tests/data-source-revert", "deploy:test-grafted", - Some(&test_info.hash), + 
vec![&test_info.hash], ) .await; let test_info = TestInfo { @@ -1261,6 +1261,121 @@ async fn arweave_file_data_sources() { ); } +#[tokio::test] +async fn auto_graft_sync() -> anyhow::Result<()> { + let stores = fixture::stores( + "auto_graft_sync", + "./runner-tests/auto-graft-sync/config.toml", + ) + .await; + + assert_eq!( + stores.network_store.subgraph_store().auto_graft_sync(), + true, + "Auto graft sync should be enabled" + ); + + let test_name = "auto_graft_sync"; + + let blocks = { + let block0 = genesis(); + let mut last_block = block0.ptr(); + let mut blocks = vec![block0]; + for i in 1..=20 { + let block = empty_block(last_block, test_ptr(i)); + last_block = block.ptr(); + blocks.push(block); + } + blocks + }; + + let chain = chain(&test_name, blocks.clone(), &stores, None).await; + + // Root graft + // only build the subgraph and create it, but don't deploy it to be assigned + let root_deployment_hash = build_subgraph_with_yarn_cmd_and_args( + "./runner-tests/auto-graft-sync", + "build-graft-root", + vec![], + ) + .await; + + let mut prev_hash = root_deployment_hash.clone(); + + let subgraph_name = SubgraphName::new("auto-graft-sync-tip".to_string()).unwrap(); + let root_deployment = TestInfo { + test_name: test_name.to_string(), + hash: root_deployment_hash.clone(), + test_dir: "./runner-tests/auto-graft-sync".to_string(), + subgraph_name, + }; + + // our "root deployment" does not have a graft, so this should complete. 
+ let base_ctx = fixture::setup(&root_deployment, &stores, &chain, None, None).await; + base_ctx.start_and_sync_to(test_ptr(1)).await; + + let block_ptr = stores + .network_store + .subgraph_store() + .least_block_ptr(&root_deployment.hash) + .await + .unwrap() + .unwrap(); + + base_ctx + .start_and_sync_to(test_ptr(block_ptr.number + 1)) + .await; + + for i in 0..3 { + let outfile = format!("auto-graft-sync-{}.yaml", i); + // Test grafted version + let hash = build_subgraph_with_yarn_cmd_and_args( + "./runner-tests/auto-graft-sync", + "build:test-auto-graft-sync", + vec![&outfile, &prev_hash, &(block_ptr.number + 1).to_string()], + ) + .await; + prev_hash = hash; + } + + let top_graft_info = TestInfo { + test_name: test_name.to_string(), + hash: prev_hash.clone(), + test_dir: "./runner-tests/auto-graft-sync".to_string(), + subgraph_name: SubgraphName::new("auto-graft-sync-tip".to_string()).unwrap(), + }; + + let ctx = fixture::setup(&top_graft_info, &stores, &chain, None, None).await; + ctx.start_and_sync_to(test_ptr(block_ptr.number + 1)).await; + + tokio::time::sleep(Duration::from_secs(3)).await; + + for i in 0..10 { + let query_res = ctx + .query(&format!( + "{{ dataSourceCount(id: \"{}\") {{ id, count }} }}", + i + )) + .await + .unwrap(); + println!("query_res: {:?}", query_res); + } + + // TODO: The semantically correct value for `count` would be 5. But because the test fixture + // uses a `NoopTriggersAdapter` the data sources are not reprocessed in the block in which they + // are created. + let query_res = base_ctx + .query(r#"{ dataSourceCount(id: "3") { id, count } }"#) + .await + .unwrap(); + assert_eq!( + query_res, + Some(object! { dataSourceCount: object!{ id: "3", count: 3 } }) + ); + + Ok(()) +} + /// deploy_cmd is the command to run to deploy the subgraph. If it is None, the /// default `yarn deploy:test` is used. 
async fn build_subgraph(dir: &str, deploy_cmd: Option<&str>) -> DeploymentHash { @@ -1268,13 +1383,13 @@ async fn build_subgraph(dir: &str, deploy_cmd: Option<&str>) -> DeploymentHash { } async fn build_subgraph_with_yarn_cmd(dir: &str, yarn_cmd: &str) -> DeploymentHash { - build_subgraph_with_yarn_cmd_and_arg(dir, yarn_cmd, None).await + build_subgraph_with_yarn_cmd_and_args(dir, yarn_cmd, vec![]).await } -async fn build_subgraph_with_yarn_cmd_and_arg( +async fn build_subgraph_with_yarn_cmd_and_args( dir: &str, yarn_cmd: &str, - arg: Option<&str>, + mut additional_args: Vec<&str>, ) -> DeploymentHash { // Test that IPFS is up. IpfsClient::localhost() @@ -1296,7 +1411,7 @@ async fn build_subgraph_with_yarn_cmd_and_arg( run_cmd(Command::new("yarn").arg("codegen").current_dir(dir)); let mut args = vec![yarn_cmd]; - args.extend(arg); + args.append(&mut additional_args); // Run `deploy` for the side effect of uploading to IPFS, the graph node url // is fake and the actual deploy call is meant to fail.