Skip to content

Commit

Permalink
refactor: store horizon receipts
Browse files Browse the repository at this point in the history
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio committed Jan 31, 2025
1 parent 1e4ea11 commit f9fb070
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 34 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ bip39.workspace = true
tower = "0.5.1"
pin-project = "1.1.7"
tonic.workspace = true
itertools = "0.14.0"

[dev-dependencies]
hex-literal = "0.4.1"
Expand Down
210 changes: 176 additions & 34 deletions crates/service/src/tap/receipt_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use anyhow::anyhow;
use bigdecimal::num_bigint::BigInt;
use itertools::{Either, Itertools};
use sqlx::{types::BigDecimal, PgPool};
use tap_core::{manager::adapters::ReceiptStore, receipt::WithValueAndTimestamp};
use thegraph_core::alloy::{hex::ToHexExt, sol_types::Eip712Domain};
Expand All @@ -17,7 +18,25 @@ pub struct InnerContext {
}

impl InnerContext {
async fn store_receipts(&self, receipts: Vec<DatabaseReceipt>) -> Result<(), AdapterError> {
async fn process_db_receipts(&self, buffer: Vec<DatabaseReceipt>) {
let (v1_receipts, v2_receipts): (Vec<_>, Vec<_>) =
buffer.into_iter().partition_map(|r| match r {
DatabaseReceipt::V1(db_receipt_v1) => Either::Left(db_receipt_v1),
DatabaseReceipt::V2(db_receipt_v2) => Either::Right(db_receipt_v2),
});
let (insert_v1, insert_v2) = tokio::join!(
self.store_receipts_v1(v1_receipts),
self.store_receipts_v2(v2_receipts)
);
if let Err(e) = insert_v1 {
tracing::error!("Failed to store v1 receipts: {}", e);
}
if let Err(e) = insert_v2 {
tracing::error!("Failed to store v2 receipts: {}", e);
}
}

async fn store_receipts_v1(&self, receipts: Vec<DbReceiptV1>) -> Result<(), AdapterError> {
let receipts_len = receipts.len();
let mut signers = Vec::with_capacity(receipts_len);
let mut signatures = Vec::with_capacity(receipts_len);
Expand Down Expand Up @@ -66,6 +85,71 @@ impl InnerContext {

Ok(())
}

async fn store_receipts_v2(&self, receipts: Vec<DbReceiptV2>) -> Result<(), AdapterError> {
let receipts_len = receipts.len();
let mut signers = Vec::with_capacity(receipts_len);
let mut signatures = Vec::with_capacity(receipts_len);
let mut allocation_ids = Vec::with_capacity(receipts_len);
let mut payers = Vec::with_capacity(receipts_len);
let mut data_services = Vec::with_capacity(receipts_len);
let mut service_providers = Vec::with_capacity(receipts_len);
let mut timestamps = Vec::with_capacity(receipts_len);
let mut nonces = Vec::with_capacity(receipts_len);
let mut values = Vec::with_capacity(receipts_len);

for receipt in receipts {
signers.push(receipt.signer_address);
signatures.push(receipt.signature);
allocation_ids.push(receipt.allocation_id);
payers.push(receipt.payer);
data_services.push(receipt.data_service);
service_providers.push(receipt.service_provider);
timestamps.push(receipt.timestamp_ns);
nonces.push(receipt.nonce);
values.push(receipt.value);
}
sqlx::query!(
r#"INSERT INTO tap_horizon_receipts (
signer_address,
signature,
allocation_id,
payer,
data_service,
service_provider,
timestamp_ns,
nonce,
value
) SELECT * FROM UNNEST(
$1::CHAR(40)[],
$2::BYTEA[],
$3::CHAR(40)[],
$4::CHAR(40)[],
$5::CHAR(40)[],
$6::CHAR(40)[],
$7::NUMERIC(20)[],
$8::NUMERIC(20)[],
$9::NUMERIC(40)[]
)"#,
&signers,
&signatures,
&allocation_ids,
&payers,
&data_services,
&service_providers,
&timestamps,
&nonces,
&values,
)
.execute(&self.pgpool)
.await
.map_err(|e| {
tracing::error!("Failed to store receipt: {}", e);
anyhow!(e)
})?;

Ok(())
}
}

impl IndexerTapContext {
Expand All @@ -81,9 +165,7 @@ impl IndexerTapContext {
tokio::select! {
biased;
_ = receiver.recv_many(&mut buffer, BUFFER_SIZE) => {
if let Err(e) = inner_context.store_receipts(buffer).await {
tracing::error!("Failed to store receipts: {}", e);
}
inner_context.process_db_receipts(buffer).await;
}
_ = cancelation_token.cancelled() => { break },
}
Expand All @@ -108,7 +190,21 @@ impl ReceiptStore<TapReceipt> for IndexerTapContext {
}
}

pub struct DatabaseReceipt {
pub enum DatabaseReceipt {
V1(DbReceiptV1),
V2(DbReceiptV2),
}

impl DatabaseReceipt {
fn from_receipt(receipt: CheckingReceipt, separator: &Eip712Domain) -> anyhow::Result<Self> {
Ok(match receipt.signed_receipt() {
TapReceipt::V1(receipt) => Self::V1(DbReceiptV1::from_receipt(receipt, separator)?),
TapReceipt::V2(receipt) => Self::V2(DbReceiptV2::from_receipt(receipt, separator)?),
})
}
}

pub struct DbReceiptV1 {
signer_address: String,
signature: Vec<u8>,
allocation_id: String,
Expand All @@ -117,34 +213,80 @@ pub struct DatabaseReceipt {
value: BigDecimal,
}

impl DatabaseReceipt {
fn from_receipt(receipt: CheckingReceipt, separator: &Eip712Domain) -> anyhow::Result<Self> {
match receipt.signed_receipt() {
TapReceipt::V1(receipt) => {
let allocation_id = receipt.message.allocation_id.encode_hex();
let signature = receipt.signature.as_bytes().to_vec();

let signer_address = receipt
.recover_signer(separator)
.map_err(|e| {
tracing::error!("Failed to recover receipt signer: {}", e);
anyhow!(e)
})?
.encode_hex();

let timestamp_ns = BigDecimal::from(receipt.timestamp_ns());
let nonce = BigDecimal::from(receipt.message.nonce);
let value = BigDecimal::from(BigInt::from(receipt.value()));
Ok(Self {
allocation_id,
nonce,
signature,
signer_address,
timestamp_ns,
value,
})
}
TapReceipt::V2(_) => unimplemented!("Horizon Receipts are not supported yet."),
}
impl DbReceiptV1 {
fn from_receipt(
receipt: &tap_graph::SignedReceipt,
separator: &Eip712Domain,
) -> anyhow::Result<Self> {
let allocation_id = receipt.message.allocation_id.encode_hex();
let signature = receipt.signature.as_bytes().to_vec();

let signer_address = receipt
.recover_signer(separator)
.map_err(|e| {
tracing::error!("Failed to recover receipt signer: {}", e);
anyhow!(e)
})?
.encode_hex();

let timestamp_ns = BigDecimal::from(receipt.timestamp_ns());
let nonce = BigDecimal::from(receipt.message.nonce);
let value = BigDecimal::from(BigInt::from(receipt.value()));
Ok(Self {
allocation_id,
nonce,
signature,
signer_address,
timestamp_ns,
value,
})
}
}

pub struct DbReceiptV2 {
signer_address: String,
signature: Vec<u8>,
allocation_id: String,
payer: String,
data_service: String,
service_provider: String,
timestamp_ns: BigDecimal,
nonce: BigDecimal,
value: BigDecimal,
}

impl DbReceiptV2 {
fn from_receipt(
receipt: &tap_graph::v2::SignedReceipt,
separator: &Eip712Domain,
) -> anyhow::Result<Self> {
let allocation_id = receipt.message.allocation_id.encode_hex();
let payer = receipt.message.payer.encode_hex();
let data_service = receipt.message.data_service.encode_hex();
let service_provider = receipt.message.service_provider.encode_hex();
let signature = receipt.signature.as_bytes().to_vec();

let signer_address = receipt
.recover_signer(separator)
.map_err(|e| {
tracing::error!("Failed to recover receipt signer: {}", e);
anyhow!(e)
})?
.encode_hex();

let timestamp_ns = BigDecimal::from(receipt.timestamp_ns());
let nonce = BigDecimal::from(receipt.message.nonce);
let value = BigDecimal::from(BigInt::from(receipt.value()));
Ok(Self {
allocation_id,
payer,
data_service,
service_provider,
nonce,
signature,
signer_address,
timestamp_ns,
value,
})
}
}
2 changes: 2 additions & 0 deletions migrations/20250131122241_tap_horizon_receipts.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Add down migration script here
DROP TABLE IF EXISTS scalar_tap_receipts CASCADE;
18 changes: 18 additions & 0 deletions migrations/20250131122241_tap_horizon_receipts.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Add up migration script here
CREATE TABLE IF NOT EXISTS tap_horizon_receipts (
id BIGSERIAL PRIMARY KEY, -- id being SERIAL is important for the function of tap-agent
signer_address CHAR(40) NOT NULL,

-- Values below are the individual fields of the EIP-712 receipt
signature BYTEA NOT NULL,
allocation_id CHAR(40) NOT NULL,
payer CHAR(40) NOT NULL,
data_service CHAR(40) NOT NULL,
service_provider CHAR(40) NOT NULL,
timestamp_ns NUMERIC(20) NOT NULL,
nonce NUMERIC(20) NOT NULL,
value NUMERIC(39) NOT NULL
);

CREATE INDEX IF NOT EXISTS scalar_tap_receipts_allocation_id_idx ON scalar_tap_receipts (allocation_id);
CREATE INDEX IF NOT EXISTS scalar_tap_receipts_timestamp_ns_idx ON scalar_tap_receipts (timestamp_ns);

0 comments on commit f9fb070

Please sign in to comment.