Skip to content

Commit

Permalink
feat(scalar): add scalar TAP receipt signing
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Nov 10, 2023
1 parent a0eb6b4 commit a81a2f4
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 51 deletions.
76 changes: 73 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions graph-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ serde_json = { version = "1.0", features = ["raw_value"] }
serde_with = "3.1"
serde_yaml = "0.9"
simple-rate-limiter = "1.0"
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", tag = "tap_core-v0.6.0" }
thiserror = "1.0.40"
tokio.workspace = true
toolshed.workspace = true
Expand Down
10 changes: 4 additions & 6 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::{
},
};

use alloy_primitives::Address;
use alloy_sol_types::Eip712Domain;
use anyhow::{anyhow, bail, Context as _};
use axum::extract::OriginalUri;
Expand Down Expand Up @@ -51,7 +50,7 @@ use crate::chains::BlockCache;
use crate::indexer_client::{check_block_error, IndexerClient, IndexerError, ResponsePayload};
use crate::indexing::IndexingStatus;
use crate::metrics::{with_metric, METRICS};
use crate::receipts::{ReceiptSigner, ReceiptStatus};
use crate::receipts::{ReceiptSigner, ReceiptStatus, ScalarReceipt};
use crate::reports::{self, serialize_attestation, KafkaClient};
use crate::topology::{Deployment, GraphNetwork, Subgraph};
use crate::unattestable_errors::{miscategorized_attestable, miscategorized_unattestable};
Expand Down Expand Up @@ -747,7 +746,7 @@ async fn handle_indexer_query(
.receipt_signer
.create_receipt(&selection.indexing, selection.fee)
.await
.map_err(|_| IndexerError::NoAllocation);
.ok_or(IndexerError::NoAllocation);

let result = match receipt.as_ref() {
Ok(receipt) => {
Expand Down Expand Up @@ -803,7 +802,7 @@ async fn handle_indexer_query_inner(
ctx: &mut IndexerQueryContext,
selection: Selection,
deterministic_query: String,
receipt: &[u8],
receipt: &ScalarReceipt,
) -> Result<ResponsePayload, IndexerError> {
let start_time = Instant::now();
let result = ctx
Expand All @@ -816,8 +815,7 @@ async fn handle_indexer_query_inner(
hist.observe(ctx.response_time.as_millis() as f64)
});

let allocation = Address::from_slice(&receipt[0..20]);

let allocation = receipt.allocation();
tracing::info!(target: reports::INDEXER_QUERY_TARGET, ?allocation);

let response = result?;
Expand Down
19 changes: 15 additions & 4 deletions graph-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::str::FromStr;
use std::time::Duration;
use std::{collections::BTreeMap, fmt, path::PathBuf};

use alloy_primitives::Address;
use alloy_primitives::{Address, U256};
use ethers::signers::coins_bip39::English;
use ethers::signers::MnemonicBuilder;
use graph_subscriptions::subscription_tier::{SubscriptionTier, SubscriptionTiers};
Expand Down Expand Up @@ -67,9 +67,8 @@ pub struct Config {
pub port_metrics: u16,
/// Target for indexer fees paid per query
pub query_fees_target: f64,
/// Mnemonic for voucher signing
#[serde_as(as = "DisplayFromStr")]
pub signer_key: SignerKey,
/// Scalar TAP config (receipt signing)
pub scalar: Scalar,
/// API keys that won't be blocked for non-payment
#[serde(default)]
pub special_api_keys: Vec<String>,
Expand Down Expand Up @@ -150,6 +149,18 @@ impl From<KafkaConfig> for rdkafka::config::ClientConfig {
}
}

#[serde_as]
#[derive(Debug, Deserialize)]
pub struct Scalar {
/// Mnemonic for voucher signing
#[serde_as(as = "DisplayFromStr")]
pub signer_key: SignerKey,
/// Scalar TAP verifier contract chain
pub chain_id: U256,
/// Scalar TAP verifier contract address
pub verifier: Address,
}

pub struct SignerKey(pub SecretKey);

impl fmt::Debug for SignerKey {
Expand Down
8 changes: 4 additions & 4 deletions graph-gateway/src/indexer_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use indexer_selection::Selection;
use serde::Deserialize;
use toolshed::thegraph::attestation::Attestation;

use crate::receipts::ScalarReceipt;

#[derive(Debug)]
pub struct IndexerResponse {
pub status: u16,
Expand Down Expand Up @@ -55,10 +57,8 @@ impl IndexerClient {
&self,
selection: &Selection,
query: String,
receipt: &[u8],
receipt: &ScalarReceipt,
) -> Result<IndexerResponse, IndexerError> {
let receipt = hex::encode(receipt);
let receipt = &receipt[0..(receipt.len() - 64)];
let url = selection
.url
.join(&format!("subgraphs/id/{:?}", selection.indexing.deployment))
Expand All @@ -67,7 +67,7 @@ impl IndexerClient {
.client
.post(url)
.header("Content-Type", "application/json")
.header("Scalar-Receipt", receipt)
.header("Scalar-Receipt", &receipt.serialize())
.body(query)
.send()
.await
Expand Down
9 changes: 8 additions & 1 deletion graph-gateway/src/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct IndexingStatus {
pub block: BlockPointer,
pub min_block: Option<u64>,
pub cost_model: Option<Ptr<CostModel>>,
pub legacy_scalar: bool,
}

pub async fn indexing_statuses(
Expand Down Expand Up @@ -132,7 +133,7 @@ async fn update_indexer(
apply_geoblocking(&mut locked_actor, &url).await?;
drop(locked_actor);

query_status(actor, &client, indexer, url)
query_status(actor, &client, indexer, url, version)
.await
.map_err(|err| format!("IndexerStatusError({err})"))
}
Expand Down Expand Up @@ -193,6 +194,7 @@ async fn query_status(
client: &reqwest::Client,
indexer: Address,
url: Url,
version: Version,
) -> Result<Vec<(Indexing, IndexingStatus)>, String> {
let status_url = url.join("status").map_err(|err| err.to_string())?;
let statuses = query_indexer_for_indexing_statuses(client.clone(), status_url.into()).await?;
Expand Down Expand Up @@ -240,6 +242,10 @@ async fn query_status(
.collect::<HashMap<DeploymentId, Ptr<CostModel>>>();
drop(actor);

// TODO: Minimum indexer version supporting Scalar TAP (temporary, as non-TAP Scalar is deprecated)
let min_scalar_tap_version: Version = "100.0.0".parse().unwrap();
let legacy_scalar = version < min_scalar_tap_version;

Ok(statuses
.into_iter()
.filter_map(|status| {
Expand All @@ -261,6 +267,7 @@ async fn query_status(
.as_ref()
.and_then(|b| b.number.parse::<u64>().ok()),
cost_model,
legacy_scalar,
};
Some((indexing, status))
})
Expand Down
22 changes: 19 additions & 3 deletions graph-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async fn main() {
})
.collect::<HashMap<String, BlockCache>>();
let block_caches: &'static HashMap<String, BlockCache> = Box::leak(Box::new(block_caches));
let signer_key = config.signer_key.0;
let signer_key = config.scalar.signer_key.0;

let http_client = reqwest::Client::builder()
.timeout(Duration::from_secs(20))
Expand Down Expand Up @@ -141,8 +141,6 @@ async fn main() {
))
.unwrap();

let receipt_signer: &'static ReceiptSigner =
Box::leak(Box::new(ReceiptSigner::new(signer_key)));
let attestation_domain: &'static Eip712Domain =
Box::leak(Box::new(attestation::eip712_domain(
U256::from_str_radix(&config.attestations.chain_id, 10)
Expand Down Expand Up @@ -184,6 +182,24 @@ async fn main() {
)
.await;

let legacy_indexers = indexing_statuses.clone().map(|statuses| async move {
let legacy_indexers: HashSet<Address> = statuses
.iter()
.filter(|(_, status)| status.legacy_scalar)
.map(|(indexing, _)| indexing.indexer)
.collect();
Ptr::new(legacy_indexers)
});
let receipt_signer: &'static ReceiptSigner = Box::leak(Box::new(
ReceiptSigner::new(
signer_key,
legacy_indexers,
config.scalar.chain_id,
config.scalar.verifier,
)
.await,
));

{
let update_writer = update_writer.clone();
let indexing_statuses = indexing_statuses.clone();
Expand Down
Loading

0 comments on commit a81a2f4

Please sign in to comment.