Skip to content

Commit

Permalink
refactor(graph-gateway): add optimistic indexer attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Nov 28, 2023
1 parent f6bf28f commit fd10161
Showing 1 changed file with 50 additions and 2 deletions.
52 changes: 50 additions & 2 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ async fn handle_client_query_inner(

let block_constraints = block_constraints(&context)
.ok_or_else(|| Error::InvalidQuery(anyhow!("Failed to determine block constraints.")))?;
let resolved_blocks = join_all(
let mut resolved_blocks = join_all(
block_constraints
.iter()
.filter_map(|constraint| constraint.clone().into_unresolved())
Expand Down Expand Up @@ -636,6 +636,17 @@ async fn handle_client_query_inner(
Error::InvalidQuery(anyhow!("Failed to set block constraints"))
})?;

let optimistic_query = optimistic_query(
context.clone(),
&mut resolved_blocks,
&latest_block,
&latest_query_block,
&utility_params.requirements,
&block_cache,
&selection,
)
.await;

let indexer_query_context = indexer_query_context.clone();
let outcome_tx = outcome_tx.clone();
// We must manually construct this span before the spawned task, since otherwise
Expand All @@ -653,6 +664,7 @@ async fn handle_client_query_inner(
selection.clone(),
deterministic_query,
latest_query_block.number,
optimistic_query,
)
.await;
let _ = outcome_tx.try_send(response.map(|response| QueryOutcome {
Expand Down Expand Up @@ -702,6 +714,7 @@ async fn handle_indexer_query(
selection: Selection,
deterministic_query: String,
latest_query_block: u64,
optimistic_query: Option<String>,
) -> Result<ResponsePayload, IndexerError> {
let indexing = selection.indexing;
let deployment = indexing.deployment.to_string();
Expand All @@ -715,7 +728,16 @@ async fn handle_indexer_query(
subgraph_chain = %ctx.deployment.manifest.network,
);

let result = handle_indexer_query_inner(&mut ctx, selection, deterministic_query).await;
let optimistic_response = match optimistic_query {
Some(query) => handle_indexer_query_inner(&mut ctx, selection.clone(), query)
.await
.ok(),
None => None,
};
let result = match optimistic_response {
Some(response) => Ok(response),
None => handle_indexer_query_inner(&mut ctx, selection, deterministic_query).await,
};
METRICS.indexer_query.check(&[&deployment], &result);

tracing::info!(
Expand Down Expand Up @@ -875,6 +897,32 @@ pub fn indexer_fee(
}
}

async fn optimistic_query(
ctx: AgoraContext<'_, String>,
resolved: &mut BTreeSet<BlockPointer>,
latest: &BlockPointer,
latest_query_block: &BlockPointer,
requirements: &BlockRequirements,
block_cache: &BlockCache,
selection: &Selection,
) -> Option<String> {
if !requirements.has_latest {
return None;
}
let blocks_per_minute = block_cache.blocks_per_minute.value_immediate()?;
if selection.blocks_behind >= blocks_per_minute {
return None;
}
let optimistic_block_number = latest.number.saturating_sub(blocks_per_minute / 30);
if optimistic_block_number == latest_query_block.number {
return None;
}
let unresolved = UnresolvedBlock::WithNumber(optimistic_block_number);
let optimistic_block = block_cache.fetch_block(unresolved).await.ok()?;
resolved.insert(optimistic_block.clone());
make_query_deterministic(ctx, resolved, &optimistic_block)
}

/// This adapter middleware extracts the authorization token from the `api_key` path parameter,
/// and adds it to the request in the `Authorization` header.
///
Expand Down

0 comments on commit fd10161

Please sign in to comment.