diff --git a/Cargo.lock b/Cargo.lock index 9aa25c53..32fafbad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -340,7 +340,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "strum", + "strum 0.25.0", "syn 2.0.38", "thiserror", ] @@ -1265,7 +1265,7 @@ dependencies = [ "rlp", "serde", "serde_json", - "strum", + "strum 0.25.0", "syn 2.0.38", "tempfile", "thiserror", @@ -1730,6 +1730,7 @@ dependencies = [ "serde_with 3.4.0", "serde_yaml", "simple-rate-limiter", + "tap_core", "thiserror", "tokio", "toolshed", @@ -3315,6 +3316,32 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "rstest" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de1bb486a691878cd320c2f0d319ba91eeaa2e894066d8b5f8f117c000e9d962" +dependencies = [ + "futures", + "futures-timer", + "rstest_macros", + "rustc_version 0.4.0", +] + +[[package]] +name = "rstest_macros" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290ca1a1c8ca7edb7c3283bd44dc35dd54fdec6253a3912e201ba1072018fca8" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "rustc_version 0.4.0", + "syn 1.0.109", + "unicode-ident", +] + [[package]] name = "ruint" version = "1.10.1" @@ -3910,13 +3937,32 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strum" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" + [[package]] name = "strum" version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" dependencies = [ - "strum_macros", + "strum_macros 0.25.3", +] + +[[package]] +name = "strum_macros" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 1.0.109", ] [[package]] @@ -4017,6 +4063,30 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tap_core" +version = "0.6.0" +source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?tag=tap_core-v0.6.0#ca5f0a97b63a6a607a265d006b30e89595505e64" +dependencies = [ + "alloy-primitives", + "alloy-sol-types", + "anyhow", + "async-trait", + "ethereum-types", + "ethers", + "ethers-contract", + "ethers-contract-derive", + "ethers-core", + "rand", + "rand_core", + "rstest", + "serde", + "strum 0.24.1", + "strum_macros 0.24.3", + "thiserror", + "tokio", +] + [[package]] name = "tempfile" version = "3.8.0" diff --git a/graph-gateway/Cargo.toml b/graph-gateway/Cargo.toml index 08ed8479..6ee588b3 100644 --- a/graph-gateway/Cargo.toml +++ b/graph-gateway/Cargo.toml @@ -40,6 +40,7 @@ serde_json = { version = "1.0", features = ["raw_value"] } serde_with = "3.1" serde_yaml = "0.9" simple-rate-limiter = "1.0" +tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", tag = "tap_core-v0.6.0" } thiserror = "1.0.40" tokio.workspace = true toolshed.workspace = true diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs index 5d485a8d..306354dc 100644 --- a/graph-gateway/src/client_query.rs +++ b/graph-gateway/src/client_query.rs @@ -9,7 +9,6 @@ use std::{ }, }; -use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; use anyhow::{anyhow, bail, Context as _}; use axum::extract::OriginalUri; @@ -51,7 +50,7 @@ use crate::chains::BlockCache; use crate::indexer_client::{check_block_error, IndexerClient, IndexerError, ResponsePayload}; use crate::indexing::IndexingStatus; use crate::metrics::{with_metric, METRICS}; -use crate::receipts::{ReceiptSigner, ReceiptStatus}; +use crate::receipts::{ReceiptSigner, ReceiptStatus, ScalarReceipt}; use crate::reports::{self, serialize_attestation, KafkaClient}; use crate::topology::{Deployment, GraphNetwork, Subgraph}; use crate::unattestable_errors::{miscategorized_attestable, miscategorized_unattestable}; @@ -747,7 +746,7 @@ async fn handle_indexer_query( .receipt_signer .create_receipt(&selection.indexing, selection.fee) .await - .map_err(|_| IndexerError::NoAllocation); + .ok_or(IndexerError::NoAllocation); let result = match receipt.as_ref() { Ok(receipt) => { @@ -803,7 +802,7 @@ async fn handle_indexer_query_inner( ctx: &mut IndexerQueryContext, selection: Selection, deterministic_query: String, - receipt: &[u8], + receipt: &ScalarReceipt, ) -> Result { let start_time = Instant::now(); let result = ctx @@ -816,8 +815,7 @@ async fn handle_indexer_query_inner( hist.observe(ctx.response_time.as_millis() as f64) }); - let allocation = Address::from_slice(&receipt[0..20]); - + let allocation = receipt.allocation(); tracing::info!(target: reports::INDEXER_QUERY_TARGET, ?allocation); let response = result?; diff --git a/graph-gateway/src/config.rs b/graph-gateway/src/config.rs index cdac0cb4..5053b9dc 100644 --- a/graph-gateway/src/config.rs +++ b/graph-gateway/src/config.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::time::Duration; use std::{collections::BTreeMap, fmt, path::PathBuf}; -use alloy_primitives::Address; +use alloy_primitives::{Address, U256}; use ethers::signers::coins_bip39::English; use ethers::signers::MnemonicBuilder; use graph_subscriptions::subscription_tier::{SubscriptionTier, SubscriptionTiers}; @@ -67,9 +67,8 @@ pub struct Config { pub port_metrics: u16, /// Target for indexer fees paid per query pub query_fees_target: f64, - /// Mnemonic for voucher signing - #[serde_as(as = "DisplayFromStr")] - pub signer_key: SignerKey, + /// Scalar TAP config (receipt signing) + pub scalar: Scalar, /// API keys that won't be blocked for non-payment #[serde(default)] pub special_api_keys: Vec, @@ -150,6 +149,18 @@ impl From for rdkafka::config::ClientConfig { } } +#[serde_as] +#[derive(Debug, Deserialize)] +pub struct Scalar { + /// Mnemonic for voucher signing + #[serde_as(as = "DisplayFromStr")] + pub signer_key: SignerKey, + /// Scalar TAP verifier contract chain + pub chain_id: U256, + /// Scalar TAP verifier contract address + pub verifier: Address, +} + pub struct SignerKey(pub SecretKey); impl fmt::Debug for SignerKey { diff --git a/graph-gateway/src/indexer_client.rs b/graph-gateway/src/indexer_client.rs index a460263c..4640e803 100644 --- a/graph-gateway/src/indexer_client.rs +++ b/graph-gateway/src/indexer_client.rs @@ -6,6 +6,8 @@ use indexer_selection::Selection; use serde::Deserialize; use toolshed::thegraph::attestation::Attestation; +use crate::receipts::ScalarReceipt; + #[derive(Debug)] pub struct IndexerResponse { pub status: u16, @@ -55,10 +57,8 @@ impl IndexerClient { &self, selection: &Selection, query: String, - receipt: &[u8], + receipt: &ScalarReceipt, ) -> Result { - let receipt = hex::encode(receipt); - let receipt = &receipt[0..(receipt.len() - 64)]; let url = selection .url .join(&format!("subgraphs/id/{:?}", selection.indexing.deployment)) @@ -67,7 +67,7 @@ impl IndexerClient { .client .post(url) .header("Content-Type", "application/json") - .header("Scalar-Receipt", receipt) + .header("Scalar-Receipt", &receipt.serialize()) .body(query) .send() .await diff --git a/graph-gateway/src/indexing.rs b/graph-gateway/src/indexing.rs index 041b6574..e5c73cdf 100644 --- a/graph-gateway/src/indexing.rs +++ b/graph-gateway/src/indexing.rs @@ -25,6 +25,7 @@ pub struct IndexingStatus { pub block: BlockPointer, pub min_block: Option, pub cost_model: Option>, + pub legacy_scalar: bool, } pub async fn indexing_statuses( @@ -132,7 +133,7 @@ async fn update_indexer( apply_geoblocking(&mut locked_actor, &url).await?; drop(locked_actor); - query_status(actor, &client, indexer, url) + query_status(actor, &client, indexer, url, version) .await .map_err(|err| format!("IndexerStatusError({err})")) } @@ -193,6 +194,7 @@ async fn query_status( client: &reqwest::Client, indexer: Address, url: Url, + version: Version, ) -> Result, String> { let status_url = url.join("status").map_err(|err| err.to_string())?; let statuses = query_indexer_for_indexing_statuses(client.clone(), status_url.into()).await?; @@ -240,6 +242,10 @@ async fn query_status( .collect::>>(); drop(actor); + // TODO: Minimum indexer version supporting Scalar TAP (temporary, as non-TAP Scalar is deprecated) + let min_scalar_tap_version: Version = "100.0.0".parse().unwrap(); + let legacy_scalar = version < min_scalar_tap_version; + Ok(statuses .into_iter() .filter_map(|status| { @@ -261,6 +267,7 @@ async fn query_status( .as_ref() .and_then(|b| b.number.parse::().ok()), cost_model, + legacy_scalar, }; Some((indexing, status)) }) diff --git a/graph-gateway/src/main.rs b/graph-gateway/src/main.rs index 2ba91d0e..c039f6af 100644 --- a/graph-gateway/src/main.rs +++ b/graph-gateway/src/main.rs @@ -111,7 +111,7 @@ async fn main() { }) .collect::>(); let block_caches: &'static HashMap = Box::leak(Box::new(block_caches)); - let signer_key = config.signer_key.0; + let signer_key = config.scalar.signer_key.0; let http_client = reqwest::Client::builder() .timeout(Duration::from_secs(20)) @@ -141,8 +141,6 @@ async fn main() { )) .unwrap(); - let receipt_signer: &'static ReceiptSigner = - Box::leak(Box::new(ReceiptSigner::new(signer_key))); let attestation_domain: &'static Eip712Domain = Box::leak(Box::new(attestation::eip712_domain( U256::from_str_radix(&config.attestations.chain_id, 10) @@ -184,6 +182,24 @@ async fn main() { ) .await; + let legacy_indexers = indexing_statuses.clone().map(|statuses| async move { + let legacy_indexers: HashSet
= statuses + .iter() + .filter(|(_, status)| status.legacy_scalar) + .map(|(indexing, _)| indexing.indexer) + .collect(); + Ptr::new(legacy_indexers) + }); + let receipt_signer: &'static ReceiptSigner = Box::leak(Box::new( + ReceiptSigner::new( + signer_key, + legacy_indexers, + config.scalar.chain_id, + config.scalar.verifier, + ) + .await, + )); + { let update_writer = update_writer.clone(); let indexing_statuses = indexing_statuses.clone(); diff --git a/graph-gateway/src/receipts.rs b/graph-gateway/src/receipts.rs index 722781a1..f55b679d 100644 --- a/graph-gateway/src/receipts.rs +++ b/graph-gateway/src/receipts.rs @@ -1,56 +1,137 @@ -use std::{ - collections::{hash_map::Entry, HashMap}, - sync::Arc, -}; +use std::collections::HashSet; +use std::collections::{hash_map::Entry, HashMap}; +use std::sync::Arc; +use std::time::SystemTime; -use alloy_primitives::Address; +use alloy_primitives::{Address, U256}; +use alloy_sol_types::Eip712Domain; +use ethers::signers::Wallet; +use eventuals::{Eventual, Ptr}; +use rand::RngCore; +use tap_core::eip_712_signed_message::EIP712SignedMessage; +use tap_core::tap_receipt::Receipt; use tokio::sync::{Mutex, RwLock}; pub use indexer_selection::receipts::QueryStatus as ReceiptStatus; -use indexer_selection::{ - receipts::{BorrowFail, ReceiptPool}, - Indexing, SecretKey, -}; +use indexer_selection::receipts::ReceiptPool; +use indexer_selection::{Indexing, SecretKey}; use prelude::GRT; pub struct ReceiptSigner { // TODO: When legacy (non-TAP) Scalar is removed, this should contain the only owned reference // to the SignerKey. This will resolve https://github.com/edgeandnode/graph-gateway/issues/13. signer_key: SecretKey, + domain: Eip712Domain, + allocations: RwLock>, + legacy_indexers: Eventual>>, legacy_pools: RwLock>>>, } +pub enum ScalarReceipt { + Legacy(Vec), + TAP(EIP712SignedMessage), +} + +impl ScalarReceipt { + pub fn allocation(&self) -> Address { + match self { + ScalarReceipt::Legacy(receipt) => Address::from_slice(&receipt[0..20]), + ScalarReceipt::TAP(receipt) => receipt.message.allocation_id, + } + } + + pub fn serialize(&self) -> String { + match self { + ScalarReceipt::Legacy(receipt) => hex::encode(&receipt[..(receipt.len() - 32)]), + ScalarReceipt::TAP(receipt) => serde_json::to_string(&receipt).unwrap(), + } + } +} + impl ReceiptSigner { - pub fn new(signer_key: SecretKey) -> Self { + pub async fn new( + signer_key: SecretKey, + legacy_indexers: Eventual>>, + chain_id: U256, + verifier: Address, + ) -> Self { + let _ = legacy_indexers.value().await; Self { signer_key, + domain: Eip712Domain { + name: Some("Scalar TAP".into()), + version: Some("1".into()), + chain_id: Some(chain_id), + verifying_contract: Some(verifier), + salt: None, + }, + allocations: RwLock::default(), + legacy_indexers, legacy_pools: RwLock::default(), } } - pub async fn create_receipt( - &self, - indexing: &Indexing, - fee: GRT, - ) -> Result, BorrowFail> { - let legacy_pools = self.legacy_pools.read().await; - let legacy_pool = legacy_pools.get(indexing).ok_or(BorrowFail::NoAllocation)?; - let mut legacy_pool = legacy_pool.lock().await; - legacy_pool.commit(fee.shift::<0>().as_u256()) - } + pub async fn create_receipt(&self, indexing: &Indexing, fee: GRT) -> Option { + if self + .legacy_indexers + .value_immediate() + .unwrap_or_default() + .contains(&indexing.indexer) + { + let legacy_pools = self.legacy_pools.read().await; + let legacy_pool = legacy_pools.get(indexing)?; + let mut legacy_pool = legacy_pool.lock().await; + let receipt = legacy_pool.commit(fee.shift::<0>().as_u256()).ok()?; + return Some(ScalarReceipt::Legacy(receipt)); + } - pub async fn record_receipt(&self, indexing: &Indexing, receipt: &[u8], status: ReceiptStatus) { - let legacy_pool = self.legacy_pools.read().await; - let mut legacy_pool = match legacy_pool.get(indexing) { - Some(legacy_pool) => legacy_pool.lock().await, - None => return, + let allocation = *self.allocations.read().await.get(indexing)?; + // TODO: risk management (cap on outstanding debts that proactively prevents sending receipt) + let nonce = rand::thread_rng().next_u64(); + let timestamp_ns = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() + .try_into() + .unwrap(); + let receipt = Receipt { + allocation_id: allocation.0 .0.into(), + timestamp_ns, + nonce, + value: fee.shift::<0>().as_u256().as_u128(), }; - legacy_pool.release(receipt, status); + let wallet = + Wallet::from_bytes(self.signer_key.as_ref()).expect("failed to prepare receipt wallet"); + let signed = EIP712SignedMessage::new(&self.domain, receipt, &wallet) + .await + .expect("failed to sign receipt"); + Some(ScalarReceipt::TAP(signed)) + } + + pub async fn record_receipt( + &self, + indexing: &Indexing, + receipt: &ScalarReceipt, + status: ReceiptStatus, + ) { + match receipt { + ScalarReceipt::Legacy(receipt) => { + let legacy_pool = self.legacy_pools.read().await; + let mut legacy_pool = match legacy_pool.get(indexing) { + Some(legacy_pool) => legacy_pool.lock().await, + None => return, + }; + legacy_pool.release(receipt, status); + } + ScalarReceipt::TAP(_) => { + // TODO: TAP collateral management + } + } } pub async fn update_allocations(&self, indexings: HashMap) { - for (indexing, allocation) in indexings { - let legacy_pool = self.get(&indexing).await; + for (indexing, allocation) in &indexings { + let legacy_pool = self.get(indexing).await; let mut legacy_pool = legacy_pool.lock().await; // remove stale allocations for old_allocation in legacy_pool @@ -61,10 +142,18 @@ impl ReceiptSigner { legacy_pool.remove_allocation(&old_allocation); } // add allocation, if not already present - if !legacy_pool.contains_allocation(&allocation) { - legacy_pool.add_allocation(self.signer_key, allocation.into()); + if !legacy_pool.contains_allocation(allocation) { + legacy_pool.add_allocation(self.signer_key, *allocation.0); } } + + let mut allocations = self.allocations.write().await; + // remove stale allocations + allocations.retain(|k, _| indexings.contains_key(k)); + // update allocations + for (indexing, allocation) in indexings { + allocations.insert(indexing, allocation); + } } async fn get(&self, indexing: &Indexing) -> Arc> {