From f9fb0705614110204b1a5cd94a645129dfacf30d Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Fri, 31 Jan 2025 14:08:44 +0100 Subject: [PATCH] refactor: store horizon receipts Signed-off-by: Gustavo Inacio --- Cargo.lock | 10 + crates/service/Cargo.toml | 1 + crates/service/src/tap/receipt_store.rs | 210 +++++++++++++++--- ...250131122241_tap_horizon_receipts.down.sql | 2 + ...20250131122241_tap_horizon_receipts.up.sql | 18 ++ 5 files changed, 207 insertions(+), 34 deletions(-) create mode 100644 migrations/20250131122241_tap_horizon_receipts.down.sql create mode 100644 migrations/20250131122241_tap_horizon_receipts.up.sql diff --git a/Cargo.lock b/Cargo.lock index 86b73546..c6d7e204 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3784,6 +3784,7 @@ dependencies = [ "indexer-query", "indexer-receipt", "insta", + "itertools 0.14.0", "lazy_static", "pin-project 1.1.8", "prometheus", @@ -3964,6 +3965,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.14" diff --git a/crates/service/Cargo.toml b/crates/service/Cargo.toml index 870b04e2..a38ee774 100644 --- a/crates/service/Cargo.toml +++ b/crates/service/Cargo.toml @@ -59,6 +59,7 @@ bip39.workspace = true tower = "0.5.1" pin-project = "1.1.7" tonic.workspace = true +itertools = "0.14.0" [dev-dependencies] hex-literal = "0.4.1" diff --git a/crates/service/src/tap/receipt_store.rs b/crates/service/src/tap/receipt_store.rs index d8569f7e..5994e020 100644 --- a/crates/service/src/tap/receipt_store.rs +++ b/crates/service/src/tap/receipt_store.rs @@ -3,6 +3,7 @@ use anyhow::anyhow; use bigdecimal::num_bigint::BigInt; +use itertools::{Either, Itertools}; use sqlx::{types::BigDecimal, PgPool}; use tap_core::{manager::adapters::ReceiptStore, receipt::WithValueAndTimestamp}; use thegraph_core::alloy::{hex::ToHexExt, sol_types::Eip712Domain}; @@ -17,7 +18,25 @@ pub struct InnerContext { } impl InnerContext { - async fn store_receipts(&self, receipts: Vec) -> Result<(), AdapterError> { + async fn process_db_receipts(&self, buffer: Vec) { + let (v1_receipts, v2_receipts): (Vec<_>, Vec<_>) = + buffer.into_iter().partition_map(|r| match r { + DatabaseReceipt::V1(db_receipt_v1) => Either::Left(db_receipt_v1), + DatabaseReceipt::V2(db_receipt_v2) => Either::Right(db_receipt_v2), + }); + let (insert_v1, insert_v2) = tokio::join!( + self.store_receipts_v1(v1_receipts), + self.store_receipts_v2(v2_receipts) + ); + if let Err(e) = insert_v1 { + tracing::error!("Failed to store v1 receipts: {}", e); + } + if let Err(e) = insert_v2 { + tracing::error!("Failed to store v2 receipts: {}", e); + } + } + + async fn store_receipts_v1(&self, receipts: Vec) -> Result<(), AdapterError> { let receipts_len = receipts.len(); let mut signers = Vec::with_capacity(receipts_len); let mut signatures = Vec::with_capacity(receipts_len); @@ -66,6 +85,71 @@ impl InnerContext { Ok(()) } + + async fn store_receipts_v2(&self, receipts: Vec) -> Result<(), AdapterError> { + let receipts_len = receipts.len(); + let mut signers = Vec::with_capacity(receipts_len); + let mut signatures = Vec::with_capacity(receipts_len); + let mut allocation_ids = Vec::with_capacity(receipts_len); + let mut payers = Vec::with_capacity(receipts_len); + let mut data_services = Vec::with_capacity(receipts_len); + let mut service_providers = Vec::with_capacity(receipts_len); + let mut timestamps = Vec::with_capacity(receipts_len); + let mut nonces = Vec::with_capacity(receipts_len); + let mut values = Vec::with_capacity(receipts_len); + + for receipt in receipts { + signers.push(receipt.signer_address); + signatures.push(receipt.signature); + allocation_ids.push(receipt.allocation_id); + payers.push(receipt.payer); + data_services.push(receipt.data_service); + service_providers.push(receipt.service_provider); + timestamps.push(receipt.timestamp_ns); + nonces.push(receipt.nonce); + values.push(receipt.value); + } + sqlx::query!( + r#"INSERT INTO tap_horizon_receipts ( + signer_address, + signature, + allocation_id, + payer, + data_service, + service_provider, + timestamp_ns, + nonce, + value + ) SELECT * FROM UNNEST( + $1::CHAR(40)[], + $2::BYTEA[], + $3::CHAR(40)[], + $4::CHAR(40)[], + $5::CHAR(40)[], + $6::CHAR(40)[], + $7::NUMERIC(20)[], + $8::NUMERIC(20)[], + $9::NUMERIC(40)[] + )"#, + &signers, + &signatures, + &allocation_ids, + &payers, + &data_services, + &service_providers, + ×tamps, + &nonces, + &values, + ) + .execute(&self.pgpool) + .await + .map_err(|e| { + tracing::error!("Failed to store receipt: {}", e); + anyhow!(e) + })?; + + Ok(()) + } } impl IndexerTapContext { @@ -81,9 +165,7 @@ impl IndexerTapContext { tokio::select! { biased; _ = receiver.recv_many(&mut buffer, BUFFER_SIZE) => { - if let Err(e) = inner_context.store_receipts(buffer).await { - tracing::error!("Failed to store receipts: {}", e); - } + inner_context.process_db_receipts(buffer).await; } _ = cancelation_token.cancelled() => { break }, } @@ -108,7 +190,21 @@ impl ReceiptStore for IndexerTapContext { } } -pub struct DatabaseReceipt { +pub enum DatabaseReceipt { + V1(DbReceiptV1), + V2(DbReceiptV2), +} + +impl DatabaseReceipt { + fn from_receipt(receipt: CheckingReceipt, separator: &Eip712Domain) -> anyhow::Result { + Ok(match receipt.signed_receipt() { + TapReceipt::V1(receipt) => Self::V1(DbReceiptV1::from_receipt(receipt, separator)?), + TapReceipt::V2(receipt) => Self::V2(DbReceiptV2::from_receipt(receipt, separator)?), + }) + } +} + +pub struct DbReceiptV1 { signer_address: String, signature: Vec, allocation_id: String, @@ -117,34 +213,80 @@ pub struct DatabaseReceipt { value: BigDecimal, } -impl DatabaseReceipt { - fn from_receipt(receipt: CheckingReceipt, separator: &Eip712Domain) -> anyhow::Result { - match receipt.signed_receipt() { - TapReceipt::V1(receipt) => { - let allocation_id = receipt.message.allocation_id.encode_hex(); - let signature = receipt.signature.as_bytes().to_vec(); - - let signer_address = receipt - .recover_signer(separator) - .map_err(|e| { - tracing::error!("Failed to recover receipt signer: {}", e); - anyhow!(e) - })? - .encode_hex(); - - let timestamp_ns = BigDecimal::from(receipt.timestamp_ns()); - let nonce = BigDecimal::from(receipt.message.nonce); - let value = BigDecimal::from(BigInt::from(receipt.value())); - Ok(Self { - allocation_id, - nonce, - signature, - signer_address, - timestamp_ns, - value, - }) - } - TapReceipt::V2(_) => unimplemented!("Horizon Receipts are not supported yet."), - } +impl DbReceiptV1 { + fn from_receipt( + receipt: &tap_graph::SignedReceipt, + separator: &Eip712Domain, + ) -> anyhow::Result { + let allocation_id = receipt.message.allocation_id.encode_hex(); + let signature = receipt.signature.as_bytes().to_vec(); + + let signer_address = receipt + .recover_signer(separator) + .map_err(|e| { + tracing::error!("Failed to recover receipt signer: {}", e); + anyhow!(e) + })? + .encode_hex(); + + let timestamp_ns = BigDecimal::from(receipt.timestamp_ns()); + let nonce = BigDecimal::from(receipt.message.nonce); + let value = BigDecimal::from(BigInt::from(receipt.value())); + Ok(Self { + allocation_id, + nonce, + signature, + signer_address, + timestamp_ns, + value, + }) + } +} + +pub struct DbReceiptV2 { + signer_address: String, + signature: Vec, + allocation_id: String, + payer: String, + data_service: String, + service_provider: String, + timestamp_ns: BigDecimal, + nonce: BigDecimal, + value: BigDecimal, +} + +impl DbReceiptV2 { + fn from_receipt( + receipt: &tap_graph::v2::SignedReceipt, + separator: &Eip712Domain, + ) -> anyhow::Result { + let allocation_id = receipt.message.allocation_id.encode_hex(); + let payer = receipt.message.payer.encode_hex(); + let data_service = receipt.message.data_service.encode_hex(); + let service_provider = receipt.message.service_provider.encode_hex(); + let signature = receipt.signature.as_bytes().to_vec(); + + let signer_address = receipt + .recover_signer(separator) + .map_err(|e| { + tracing::error!("Failed to recover receipt signer: {}", e); + anyhow!(e) + })? + .encode_hex(); + + let timestamp_ns = BigDecimal::from(receipt.timestamp_ns()); + let nonce = BigDecimal::from(receipt.message.nonce); + let value = BigDecimal::from(BigInt::from(receipt.value())); + Ok(Self { + allocation_id, + payer, + data_service, + service_provider, + nonce, + signature, + signer_address, + timestamp_ns, + value, + }) } } diff --git a/migrations/20250131122241_tap_horizon_receipts.down.sql b/migrations/20250131122241_tap_horizon_receipts.down.sql new file mode 100644 index 00000000..0e3dbeb5 --- /dev/null +++ b/migrations/20250131122241_tap_horizon_receipts.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +DROP TABLE IF EXISTS scalar_tap_receipts CASCADE; diff --git a/migrations/20250131122241_tap_horizon_receipts.up.sql b/migrations/20250131122241_tap_horizon_receipts.up.sql new file mode 100644 index 00000000..90860730 --- /dev/null +++ b/migrations/20250131122241_tap_horizon_receipts.up.sql @@ -0,0 +1,18 @@ +-- Add up migration script here +CREATE TABLE IF NOT EXISTS tap_horizon_receipts ( + id BIGSERIAL PRIMARY KEY, -- id being SERIAL is important for the function of tap-agent + signer_address CHAR(40) NOT NULL, + + -- Values below are the individual fields of the EIP-712 receipt + signature BYTEA NOT NULL, + allocation_id CHAR(40) NOT NULL, + payer CHAR(40) NOT NULL, + data_service CHAR(40) NOT NULL, + service_provider CHAR(40) NOT NULL, + timestamp_ns NUMERIC(20) NOT NULL, + nonce NUMERIC(20) NOT NULL, + value NUMERIC(39) NOT NULL +); + +CREATE INDEX IF NOT EXISTS scalar_tap_receipts_allocation_id_idx ON scalar_tap_receipts (allocation_id); +CREATE INDEX IF NOT EXISTS scalar_tap_receipts_timestamp_ns_idx ON scalar_tap_receipts (timestamp_ns);