From fd101615f03ec47b7005cba26e33dacd87c0c001 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Mon, 27 Nov 2023 20:31:11 -0500 Subject: [PATCH] refactor(graph-gateway): add optimistic indexer attempt --- graph-gateway/src/client_query.rs | 52 +++++++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index ab8b39243..3efc8378e 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -452,7 +452,7 @@ async fn handle_client_query_inner( let block_constraints = block_constraints(&context) .ok_or_else(|| Error::InvalidQuery(anyhow!("Failed to determine block constraints.")))?; - let resolved_blocks = join_all( + let mut resolved_blocks = join_all( block_constraints .iter() .filter_map(|constraint| constraint.clone().into_unresolved()) @@ -636,6 +636,17 @@ async fn handle_client_query_inner( Error::InvalidQuery(anyhow!("Failed to set block constraints")) })?; + let optimistic_query = optimistic_query( + context.clone(), + &mut resolved_blocks, + &latest_block, + &latest_query_block, + &utility_params.requirements, + &block_cache, + &selection, + ) + .await; + let indexer_query_context = indexer_query_context.clone(); let outcome_tx = outcome_tx.clone(); // We must manually construct this span before the spawned task, since otherwise @@ -653,6 +664,7 @@ async fn handle_client_query_inner( selection.clone(), deterministic_query, latest_query_block.number, + optimistic_query, ) .await; let _ = outcome_tx.try_send(response.map(|response| QueryOutcome { @@ -702,6 +714,7 @@ async fn handle_indexer_query( selection: Selection, deterministic_query: String, latest_query_block: u64, + optimistic_query: Option, ) -> Result { let indexing = selection.indexing; let deployment = indexing.deployment.to_string(); @@ -715,7 +728,16 @@ async fn handle_indexer_query( subgraph_chain = %ctx.deployment.manifest.network, ); - let result = handle_indexer_query_inner(&mut ctx, selection, deterministic_query).await; + let optimistic_response = match optimistic_query { + Some(query) => handle_indexer_query_inner(&mut ctx, selection.clone(), query) + .await + .ok(), + None => None, + }; + let result = match optimistic_response { + Some(response) => Ok(response), + None => handle_indexer_query_inner(&mut ctx, selection, deterministic_query).await, + }; METRICS.indexer_query.check(&[&deployment], &result); tracing::info!( @@ -875,6 +897,32 @@ pub fn indexer_fee( } } +async fn optimistic_query( + ctx: AgoraContext<'_, String>, + resolved: &mut BTreeSet, + latest: &BlockPointer, + latest_query_block: &BlockPointer, + requirements: &BlockRequirements, + block_cache: &BlockCache, + selection: &Selection, +) -> Option { + if !requirements.has_latest { + return None; + } + let blocks_per_minute = block_cache.blocks_per_minute.value_immediate()?; + if selection.blocks_behind >= blocks_per_minute { + return None; + } + let optimistic_block_number = latest.number.saturating_sub(blocks_per_minute / 30); + if optimistic_block_number == latest_query_block.number { + return None; + } + let unresolved = UnresolvedBlock::WithNumber(optimistic_block_number); + let optimistic_block = block_cache.fetch_block(unresolved).await.ok()?; + resolved.insert(optimistic_block.clone()); + make_query_deterministic(ctx, resolved, &optimistic_block) +} + /// This adapter middleware extracts the authorization token from the `api_key` path parameter, /// and adds it to the request in the `Authorization` header. ///