From c3ede8f8bfe1820dd0bdc8876af083a8971d39ff Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Fri, 24 Jan 2025 15:15:10 -0300 Subject: [PATCH] feat: grpc client for tap aggregator (#583) * feat: remove jsonrpc client in favor of grpc client Co-authored-by: Gustavo Inacio Signed-off-by: pedro bufulin Signed-off-by: Gustavo Inacio * refactor: adjust endpoint in test * chore: cargo fmt * refactor: remove commented code and unnecessary stop of handler in test * test: handle error from failed connection * feat: log out endpoint in case of failed connection attempt * ci: add TapAggregator server mock to minimize changes to tests * ci: fix line indent * ci: force ri rerun * refactor: cargo fmt * ci: move mock server start to test-and-coverage * test: test aggregator server helper function * test: instantiate server instead of using mock * ci: remove unnecessary step * refactor: correct typo * refactor: correct typo * refactor: replace space * refactor: remove unnecessary message * test: fix hanging test Signed-off-by: Gustavo Inacio * refactor: change expect with context Signed-off-by: Gustavo Inacio * test: use get_grpc_url() to spawn a grpc Signed-off-by: Gustavo Inacio * test: use server_url directly Signed-off-by: Gustavo Inacio * test: update all tests to use get_grpc_url Signed-off-by: Gustavo Inacio * chore: add comment about compression Signed-off-by: Gustavo Inacio --------- Signed-off-by: pedro bufulin Signed-off-by: Gustavo Inacio Co-authored-by: pedro bufulin --- Cargo.lock | 150 +++++++------ Cargo.toml | 6 +- crates/tap-agent/Cargo.toml | 2 + crates/tap-agent/src/agent/sender_account.rs | 29 ++- .../src/agent/sender_accounts_manager.rs | 15 +- .../tap-agent/src/agent/sender_allocation.rs | 198 +++++++----------- crates/tap-agent/src/tap/context/rav.rs | 4 +- crates/tap-agent/src/test.rs | 34 ++- 8 files changed, 231 insertions(+), 207 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 03147fa9..788b3948 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,9 +68,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "alloy" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02b0561294ccedc6181e5528b850b4579e3fbde696507baa00109bfd9054c5bb" +checksum = "59febb24956a41c29bb5f450978fbe825bd6456b3f80586c8bd558dc882e7b6a" dependencies = [ "alloy-consensus", "alloy-contract", @@ -107,9 +107,9 @@ dependencies = [ [[package]] name = "alloy-consensus" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a101d4d016f47f13890a74290fdd17b05dd175191d9337bc600791fb96e4dea8" +checksum = "e88e1edea70787c33e11197d3f32ae380f3db19e6e061e539a5bcf8184a6b326" dependencies = [ "alloy-eips", "alloy-primitives", @@ -125,9 +125,9 @@ dependencies = [ [[package]] name = "alloy-consensus-any" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa60357dda9a3d0f738f18844bd6d0f4a5924cc5cf00bfad2ff1369897966123" +checksum = "57b1bb53f40c0273cd1975573cd457b39213e68584e36d1401d25fd0398a1d65" dependencies = [ "alloy-consensus", "alloy-eips", @@ -139,9 +139,9 @@ dependencies = [ [[package]] name = "alloy-contract" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2869e4fb31331d3b8c58c7db567d1e4e4e94ef64640beda3b6dd9b7045690941" +checksum = "1b668c78c4b1f12f474ede5a85e8ce550d0aa1ef7d49fd1d22855a43b960e725" dependencies = [ "alloy-dyn-abi", "alloy-json-abi", @@ -160,9 +160,9 @@ dependencies = [ [[package]] name = "alloy-core" -version = "0.8.13" +version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8d22df68fa7d9744be0b1a9be3260e9aa089fbf41903ab182328333061ed186" +checksum = "c618bd382f0bc2ac26a7e4bfae01c9b015ca8f21b37ca40059ae35a7e62b3dc6" dependencies = [ "alloy-dyn-abi", "alloy-json-abi", @@ -173,9 +173,9 @@ dependencies = [ [[package]] name = "alloy-dyn-abi" -version = "0.8.13" +version = "0.8.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cf633ae9a1f0c82fdb9e559ed2be1c8e415c3e48fc47e1feaf32c6078ec0cdd" +checksum = "41056bde53ae10ffbbf11618efbe1e0290859e5eab0fe9ef82ebdb62f12a866f" dependencies = [ "alloy-json-abi", "alloy-primitives", @@ -215,9 +215,9 @@ dependencies = [ [[package]] name = "alloy-eips" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6755b093afef5925f25079dd5a7c8d096398b804ba60cb5275397b06b31689" +checksum = "5f9fadfe089e9ccc0650473f2d4ef0a28bc015bbca5631d9f0f09e49b557fdb3" dependencies = [ "alloy-eip2930", "alloy-eip7702", @@ -233,9 +233,9 @@ dependencies = [ [[package]] name = "alloy-genesis" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aeec8e6eab6e52b7c9f918748c9b811e87dbef7312a2e3a2ca1729a92966a6af" +checksum = "2b2a4cf7b70f3495788e74ce1c765260ffe38820a2a774ff4aacb62e31ea73f9" dependencies = [ "alloy-primitives", "alloy-serde", @@ -257,9 +257,9 @@ dependencies = [ [[package]] name = "alloy-json-rpc" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fa077efe0b834bcd89ff4ba547f48fb081e4fdc3673dd7da1b295a2cf2bb7b7" +checksum = "e29040b9d5fe2fb70415531882685b64f8efd08dfbd6cc907120650504821105" dependencies = [ "alloy-primitives", "alloy-sol-types", @@ -271,9 +271,9 @@ dependencies = [ [[package]] name = "alloy-network" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "209a1882a08e21aca4aac6e2a674dc6fcf614058ef8cb02947d63782b1899552" +checksum = "510cc00b318db0dfccfdd2d032411cfae64fc144aef9679409e014145d3dacc4" dependencies = [ "alloy-consensus", "alloy-consensus-any", @@ -296,9 +296,9 @@ dependencies = [ [[package]] name = "alloy-network-primitives" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20219d1ad261da7a6331c16367214ee7ded41d001fabbbd656fbf71898b2773" +checksum = "9081c099e798b8a2bba2145eb82a9a146f01fc7a35e9ab6e7b43305051f97550" dependencies = [ "alloy-consensus", "alloy-eips", @@ -337,9 +337,9 @@ dependencies = [ [[package]] name = "alloy-provider" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9eefa6f4c798ad01f9b4202d02cea75f5ec11fa180502f4701e2b47965a8c0bb" +checksum = "dc2dfaddd9a30aa870a78a4e1316e3e115ec1e12e552cbc881310456b85c1f24" dependencies = [ "alloy-chains", "alloy-consensus", @@ -377,9 +377,9 @@ dependencies = [ [[package]] name = "alloy-pubsub" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aac9a7210e0812b1d814118f426f57eb7fc260a419224dd1c76d169879c06907" +checksum = "695809e743628d54510c294ad17a4645bd9f465aeb0d20ee9ce9877c9712dc9c" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -418,9 +418,9 @@ dependencies = [ [[package]] name = "alloy-rpc-client" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed30bf1041e84cabc5900f52978ca345dd9969f2194a945e6fdec25b0620705c" +checksum = "531137b283547d5b9a5cafc96b006c64ef76810c681d606f28be9781955293b6" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -444,9 +444,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ab686b0fa475d2a4f5916c5f07797734a691ec58e44f0f55d4746ea39cbcefb" +checksum = "3410a472ce26c457e9780f708ee6bd540b30f88f1f31fdab7a11d00bd6aa1aee" dependencies = [ "alloy-primitives", "alloy-rpc-types-engine", @@ -457,9 +457,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-any" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "200661999b6e235d9840be5d60a6e8ae2f0af9eb2a256dd378786744660e36ec" +checksum = "ed98e1af55a7d856bfa385f30f63d8d56be2513593655c904a8f4a7ec963aa3e" dependencies = [ "alloy-consensus-any", "alloy-rpc-types-eth", @@ -468,9 +468,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-engine" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d297268357e3eae834ddd6888b15f764cbc0f4b3be9265f5f6ec239013f3d68" +checksum = "03bd16fa4959255ebf4a7702df08f325e5631df5cdca07c8a8e58bdc10fe02e3" dependencies = [ "alloy-consensus", "alloy-eips", @@ -484,9 +484,9 @@ dependencies = [ [[package]] name = "alloy-rpc-types-eth" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0600b8b5e2dc0cab12cbf91b5a885c35871789fb7b3a57b434bd4fced5b7a8b" +checksum = "8737d7a6e37ca7bba9c23e9495c6534caec6760eb24abc9d5ffbaaba147818e1" dependencies = [ "alloy-consensus", "alloy-consensus-any", @@ -504,9 +504,9 @@ dependencies = [ [[package]] name = "alloy-serde" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9afa753a97002a33b2ccb707d9f15f31c81b8c1b786c95b73cc62bb1d1fd0c3f" +checksum = "5851bf8d5ad33014bd0c45153c603303e730acc8a209450a7ae6b4a12c2789e2" dependencies = [ "alloy-primitives", "serde", @@ -515,9 +515,9 @@ dependencies = [ [[package]] name = "alloy-signer" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2cbff01a673936c2efd7e00d4c0e9a4dbbd6d600e2ce298078d33efbb19cd7" +checksum = "7e10ca565da6500cca015ba35ee424d59798f2e1b85bc0dd8f81dafd401f029a" dependencies = [ "alloy-dyn-abi", "alloy-primitives", @@ -531,9 +531,9 @@ dependencies = [ [[package]] name = "alloy-signer-aws" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71ce77227fdb9059fd7a3b38a8679c0dae95d81886ee8c13ef8ad99d74866bbd" +checksum = "1e774d4203ad7dbeba06876c8528a169b7cb56770bd900bc061e6a2c2756a736" dependencies = [ "alloy-consensus", "alloy-network", @@ -549,9 +549,9 @@ dependencies = [ [[package]] name = "alloy-signer-gcp" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7622438a51e1fa6379cad6bff52e0cde88b0d4e5e3f2f15e5feebdee527ef5f2" +checksum = "9843facd50077d2010ac0ef9e9176f8a06f2e2c8e653d83d82859803c623c6fc" dependencies = [ "alloy-consensus", "alloy-network", @@ -567,9 +567,9 @@ dependencies = [ [[package]] name = "alloy-signer-ledger" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7b56789cbd13bace37acd7afd080aa7002ed65ab84f0220cd0c32e162b0afd6" +checksum = "08367716d2eee6f15f0f7ee2e855decbfedd12be12fe5f490a2d2717deda95bf" dependencies = [ "alloy-consensus", "alloy-dyn-abi", @@ -587,9 +587,9 @@ dependencies = [ [[package]] name = "alloy-signer-local" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd6d988cb6cd7d2f428a74476515b1a6e901e08c796767f9f93311ab74005c8b" +checksum = "47fababf5a745133490cde927d48e50267f97d3d1209b9fc9f1d1d666964d172" dependencies = [ "alloy-consensus", "alloy-network", @@ -678,9 +678,9 @@ dependencies = [ [[package]] name = "alloy-transport" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d69d36982b9e46075ae6b792b0f84208c6c2c15ad49f6c500304616ef67b70e0" +checksum = "538a04a37221469cac0ce231b737fd174de2fdfcdd843bdd068cb39ed3e066ad" dependencies = [ "alloy-json-rpc", "base64 0.22.1", @@ -698,9 +698,9 @@ dependencies = [ [[package]] name = "alloy-transport-http" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e02ffd5d93ffc51d72786e607c97de3b60736ca3e636ead0ec1f7dce68ea3fd" +checksum = "2ed40eb1e1265b2911512f6aa1dcece9702d078f5a646730c45e39e2be00ac1c" dependencies = [ "alloy-json-rpc", "alloy-transport", @@ -713,9 +713,9 @@ dependencies = [ [[package]] name = "alloy-transport-ipc" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b6f8b87cb84bae6d81ae6604b37741c8116f84f9784a0ecc6038c302e679d23" +checksum = "a7a172a59d24706b26a79a837f86d51745cb26ca6f8524712acd0208a14cff95" dependencies = [ "alloy-json-rpc", "alloy-pubsub", @@ -732,9 +732,9 @@ dependencies = [ [[package]] name = "alloy-transport-ws" -version = "0.7.3" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c085c4e1e7680b723ffc558f61a22c061ed3f70eb3436f93f3936779c59cec1" +checksum = "fba0e39d181d13c266dbb8ca54ed584a2c66d6e9279afca89c7a6b1825e98abb" dependencies = [ "alloy-pubsub", "alloy-transport", @@ -3331,7 +3331,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.8", "tokio", "tower-service", "tracing", @@ -3818,9 +3818,11 @@ dependencies = [ "thegraph-core", "thiserror 1.0.69", "tokio", + "tonic", "tracing", "tracing-subscriber", "wiremock", + "wiremock-grpc", ] [[package]] @@ -5195,7 +5197,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", - "heck 0.4.1", + "heck 0.5.0", "itertools 0.13.0", "log", "multimap", @@ -6909,24 +6911,29 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tap_aggregator" -version = "0.3.2" -source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=1c6e29f#1c6e29f56fc1672087070c7e8e710bac0564e273" +version = "0.3.3" +source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=6af1add#6af1add4a326c77100491ba353050cb38f319631" dependencies = [ "alloy", "anyhow", "axum", "clap", "futures-util", + "hyper 1.5.1", "jsonrpsee", "lazy_static", "log", "prometheus", + "prost", + "rayon", "ruint", "serde", "serde_json", "strum", "tap_core", "tokio", + "tonic", + "tonic-build", "tower 0.4.13", "tracing-subscriber", ] @@ -6934,7 +6941,7 @@ dependencies = [ [[package]] name = "tap_core" version = "2.0.0" -source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=1c6e29f#1c6e29f56fc1672087070c7e8e710bac0564e273" +source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol?rev=6af1add#6af1add4a326c77100491ba353050cb38f319631" dependencies = [ "alloy", "anyhow", @@ -7015,9 +7022,9 @@ dependencies = [ [[package]] name = "thegraph-core" -version = "0.9.6" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e669ad507b7afcf8b2d303e98de2d8bcd7af56042a8626cd708838349cc4d928" +checksum = "0bcee8212008d3f49e00b4d052b6c3c7b422d8fd7c8c92efe9aa608762473293" dependencies = [ "alloy", "bs58", @@ -7344,6 +7351,7 @@ dependencies = [ "tower-layer", "tower-service", "tracing", + "zstd", ] [[package]] @@ -8020,7 +8028,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -8318,6 +8326,20 @@ dependencies = [ "url", ] +[[package]] +name = "wiremock-grpc" +version = "0.0.3-alpha3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba47a86473b3cf4c25d09048bc65ecc26b24d921fb927708b66169dff92dd3a" +dependencies = [ + "http-body 1.0.1", + "log", + "prost", + "rand", + "tokio", + "tonic", +] + [[package]] name = "write16" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 35855d4a..fa9784de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,14 +52,14 @@ uuid = { version = "1.11.0", features = ["v7"] } tracing = { version = "0.1.40", default-features = false } bigdecimal = "0.4.3" build-info = "0.0.39" -tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1c6e29f", default-features = false } -tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1c6e29f", default-features = false } +tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "6af1add", default-features = false } +tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "6af1add", default-features = false } tracing-subscriber = { version = "0.3", features = [ "json", "env-filter", "ansi", ], default-features = false } -thegraph-core = { version = "0.9.6", features = [ +thegraph-core = { version = "0.10.0", features = [ "attestation", "alloy-eip712", "alloy-sol-types", diff --git a/crates/tap-agent/Cargo.toml b/crates/tap-agent/Cargo.toml index 93f5263b..b4be9171 100644 --- a/crates/tap-agent/Cargo.toml +++ b/crates/tap-agent/Cargo.toml @@ -31,6 +31,7 @@ lazy_static.workspace = true thegraph-core.workspace = true clap.workspace = true tracing-subscriber.workspace = true +tonic.workspace = true bigdecimal = { workspace = true, features = ["serde"] } graphql_client.workspace = true @@ -48,5 +49,6 @@ futures = { version = "0.3.30", default-features = false } [dev-dependencies] tempfile = "3.8.0" wiremock.workspace = true +wiremock-grpc = "0.0.3-alpha3" test-assets = { path = "../test-assets" } test-log = { version = "0.2.12", default-features = false } diff --git a/crates/tap-agent/src/agent/sender_account.rs b/crates/tap-agent/src/agent/sender_account.rs index 26eef826..de8219ca 100644 --- a/crates/tap-agent/src/agent/sender_account.rs +++ b/crates/tap-agent/src/agent/sender_account.rs @@ -7,6 +7,7 @@ use std::{ time::Duration, }; +use anyhow::Context; use bigdecimal::{num_bigint::ToBigInt, ToPrimitive}; use futures::{stream, StreamExt}; use indexer_monitor::{EscrowAccounts, SubgraphClient}; @@ -15,12 +16,12 @@ use indexer_query::{ unfinalized_transactions, UnfinalizedTransactions, }; use indexer_watcher::watch_pipe; -use jsonrpsee::http_client::HttpClientBuilder; use lazy_static::lazy_static; use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec}; use ractor::{Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent}; use reqwest::Url; use sqlx::PgPool; +use tap_aggregator::grpc::tap_aggregator_client::TapAggregatorClient; use tap_core::rav::SignedRAV; use thegraph_core::alloy::{ hex::ToHexExt, @@ -28,6 +29,7 @@ use thegraph_core::alloy::{ sol_types::Eip712Domain, }; use tokio::{sync::watch::Receiver, task::JoinHandle}; +use tonic::transport::{Channel, Endpoint}; use tracing::Level; use super::sender_allocation::{ @@ -171,7 +173,7 @@ pub struct State { domain_separator: Eip712Domain, pgpool: PgPool, - sender_aggregator: jsonrpsee::http_client::HttpClient, + sender_aggregator: TapAggregatorClient, // Backoff info backoff_info: BackoffInfo, @@ -603,10 +605,21 @@ impl Actor for SenderAccount { .with_label_values(&[&sender_id.to_string()]) .set(config.trigger_value as f64); - let sender_aggregator = HttpClientBuilder::default() - .request_timeout(config.rav_request_timeout) - .build(&sender_aggregator_endpoint)?; + let endpoint = Endpoint::new(sender_aggregator_endpoint.to_string()) + .context("Failed to create an endpoint for the sender aggregator")?; + let sender_aggregator = TapAggregatorClient::connect(endpoint.clone()) + .await + .with_context(|| { + format!( + "Failed to connect to the TapAggregator endpoint '{}'", + endpoint.uri() + ) + })?; + // wiremock_grpc used for tests doesn't support Zstd compression + #[cfg(not(test))] + let sender_aggregator = + sender_aggregator.send_compressed(tonic::codec::CompressionEncoding::Zstd); let state = State { prefix, sender_fee_tracker: SenderFeeTracker::new(config.rav_request_buffer), @@ -1070,7 +1083,7 @@ pub mod tests { assert_not_triggered, assert_triggered, test::{ actors::{create_mock_sender_allocation, MockSenderAllocation, TestableActor}, - create_rav, store_rav_with_options, INDEXER, TAP_EIP712_DOMAIN_SEPARATOR, + create_rav, get_grpc_url, store_rav_with_options, INDEXER, TAP_EIP712_DOMAIN_SEPARATOR, }, }; @@ -1192,6 +1205,8 @@ pub mod tests { )) .expect("Failed to update escrow_accounts channel"); + // Start a new mock aggregator server for this test + let prefix = format!( "test-{}", PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst) @@ -1206,7 +1221,7 @@ pub mod tests { escrow_subgraph, network_subgraph, domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), - sender_aggregator_endpoint: Url::parse(DUMMY_URL).unwrap(), + sender_aggregator_endpoint: Url::parse(&get_grpc_url().await).unwrap(), allocation_ids: HashSet::new(), prefix: Some(prefix.clone()), retry_interval: RETRY_DURATION, diff --git a/crates/tap-agent/src/agent/sender_accounts_manager.rs b/crates/tap-agent/src/agent/sender_accounts_manager.rs index 615633fd..94797979 100644 --- a/crates/tap-agent/src/agent/sender_accounts_manager.rs +++ b/crates/tap-agent/src/agent/sender_accounts_manager.rs @@ -609,7 +609,7 @@ mod tests { use reqwest::Url; use ruint::aliases::U256; use sqlx::{postgres::PgListener, PgPool}; - use test_assets::{flush_messages, TAP_SENDER as SENDER}; + use test_assets::{flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER}; use thegraph_core::alloy::hex::ToHexExt; use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Notify}; @@ -624,8 +624,8 @@ mod tests { }, test::{ actors::{DummyActor, MockSenderAccount, MockSenderAllocation, TestableActor}, - create_rav, create_received_receipt, store_rav, store_receipt, ALLOCATION_ID_0, - ALLOCATION_ID_1, INDEXER, SENDER_2, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, + create_rav, create_received_receipt, get_grpc_url, store_rav, store_receipt, + ALLOCATION_ID_0, ALLOCATION_ID_1, INDEXER, SENDER_2, TAP_EIP712_DOMAIN_SEPARATOR, }, }; @@ -670,6 +670,7 @@ mod tests { let (_, escrow_accounts_rx) = watch::channel(EscrowAccounts::default()); + // Start a new mock aggregator server for this test let prefix = format!( "test-{}", PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst) @@ -683,8 +684,8 @@ mod tests { escrow_subgraph, network_subgraph, sender_aggregator_endpoints: HashMap::from([ - (SENDER.1, Url::parse("http://localhost:8000").unwrap()), - (SENDER_2.1, Url::parse("http://localhost:8000").unwrap()), + (SENDER.1, Url::parse(&get_grpc_url().await).unwrap()), + (SENDER_2.1, Url::parse(&get_grpc_url().await).unwrap()), ]), prefix: Some(prefix.clone()), }; @@ -727,8 +728,8 @@ mod tests { escrow_subgraph: get_subgraph_client().await, network_subgraph: get_subgraph_client().await, sender_aggregator_endpoints: HashMap::from([ - (SENDER.1, Url::parse("http://localhost:8000").unwrap()), - (SENDER_2.1, Url::parse("http://localhost:8000").unwrap()), + (SENDER.1, Url::parse(&get_grpc_url().await).unwrap()), + (SENDER_2.1, Url::parse(&get_grpc_url().await).unwrap()), ]), prefix: Some(prefix), }, diff --git a/crates/tap-agent/src/agent/sender_allocation.rs b/crates/tap-agent/src/agent/sender_allocation.rs index decb5c4c..0c0f938a 100644 --- a/crates/tap-agent/src/agent/sender_allocation.rs +++ b/crates/tap-agent/src/agent/sender_allocation.rs @@ -9,11 +9,10 @@ use std::{ use anyhow::{anyhow, ensure}; use bigdecimal::{num_bigint::BigInt, ToPrimitive}; use indexer_monitor::{EscrowAccounts, SubgraphClient}; -use jsonrpsee::{core::client::ClientT, rpc_params}; use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec}; use ractor::{Actor, ActorProcessingErr, ActorRef}; use sqlx::{types::BigDecimal, PgPool}; -use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse; +use tap_aggregator::grpc::{tap_aggregator_client::TapAggregatorClient, RavRequest}; use tap_core::{ manager::adapters::RAVRead, rav::{RAVRequest, ReceiptAggregateVoucher, SignedRAV}, @@ -27,6 +26,7 @@ use tap_core::{ use thegraph_core::alloy::{hex::ToHexExt, primitives::Address, sol_types::Eip712Domain}; use thiserror::Error; use tokio::sync::watch::Receiver; +use tonic::{transport::Channel, Code, Status}; use super::sender_account::SenderAccountConfig; use crate::{ @@ -81,7 +81,7 @@ pub enum RavError { TapCore(#[from] tap_core::Error), #[error(transparent)] - JsonRpsee(#[from] jsonrpsee::core::ClientError), + Grpc(#[from] tonic::Status), #[error("All receipts are invalid")] AllReceiptsInvalid, @@ -106,9 +106,7 @@ pub struct SenderAllocationState { escrow_accounts: Receiver, domain_separator: Eip712Domain, sender_account_ref: ActorRef, - - sender_aggregator: jsonrpsee::http_client::HttpClient, - + sender_aggregator: TapAggregatorClient, //config timestamp_buffer_ns: u64, rav_request_receipt_limit: u64, @@ -141,7 +139,7 @@ pub struct SenderAllocationArgs { pub escrow_subgraph: &'static SubgraphClient, pub domain_separator: Eip712Domain, pub sender_account_ref: ActorRef, - pub sender_aggregator: jsonrpsee::http_client::HttpClient, + pub sender_aggregator: TapAggregatorClient, //config pub config: AllocationConfig, @@ -383,7 +381,6 @@ impl SenderAllocationState { sender, escrow_accounts, domain_separator, - sender_account_ref: sender_account_ref.clone(), unaggregated_fees: UnaggregatedReceipts::default(), invalid_receipts_fees: UnaggregatedReceipts::default(), @@ -592,20 +589,17 @@ impl SenderAllocationState { .into_iter() .map(|r| r.signed_receipt().clone()) .collect(); + + let rav_request = RavRequest::new(valid_receipts, previous_rav); + let rav_response_time_start = Instant::now(); - let response: JsonRpcResponse> = self + + let response = self .sender_aggregator - .request( - "aggregate_receipts", - rpc_params!( - "0.0", // TODO: Set the version in a smarter place. - valid_receipts, - previous_rav - ), - ) + .aggregate_receipts(rav_request) .await - .inspect_err(|err| { - if let jsonrpsee::core::ClientError::RequestTimeout = &err { + .inspect_err(|status: &Status| { + if status.code() == Code::DeadlineExceeded { tracing::warn!( "Rav request is timing out, maybe request_timeout_secs is too \ low in your config file, try adding more secs to the value. \ @@ -634,13 +628,11 @@ impl SenderAllocationState { self.store_invalid_receipts(invalid_receipts.as_slice()) .await?; } + let signed_rav = response.into_inner().signed_rav()?; - if let Some(warnings) = response.warnings { - tracing::warn!("Warnings from sender's TAP aggregator: {:?}", warnings); - } match self .tap_manager - .verify_and_store_rav(expected_rav.clone(), response.data.clone()) + .verify_and_store_rav(expected_rav.clone(), signed_rav.clone()) .await { Ok(_) => {} @@ -662,7 +654,7 @@ impl SenderAllocationState { | e @ tap_core::Error::SignatureError(_) | e @ tap_core::Error::InvalidRecoveredSigner { address: _ }, ) => { - Self::store_failed_rav(self, &expected_rav, &response.data, &e.to_string()) + Self::store_failed_rav(self, &expected_rav, &signed_rav, &e.to_string()) .await?; return Err(anyhow::anyhow!( "Invalid RAV, sender could be malicious: {:?}.", @@ -681,7 +673,7 @@ impl SenderAllocationState { .into()); } } - Ok(response.data) + Ok(signed_rav) } (Err(tap_core::Error::NoValidReceiptsForRAVRequest), true, true) => Err(anyhow!( "It looks like there are no valid receipts for the RAV request.\ @@ -874,12 +866,11 @@ pub mod tests { use futures::future::join_all; use indexer_monitor::{DeploymentDetails, EscrowAccounts, SubgraphClient}; - use jsonrpsee::http_client::HttpClientBuilder; use ractor::{call, cast, Actor, ActorRef, ActorStatus}; use ruint::aliases::U256; use serde_json::json; use sqlx::PgPool; - use tap_aggregator::{jsonrpsee_helpers::JsonRpcResponse, server::run_server}; + use tap_aggregator::grpc::{tap_aggregator_client::TapAggregatorClient, RavResponse}; use tap_core::receipt::{ checks::{Check, CheckError, CheckList, CheckResult}, state::Checking, @@ -890,10 +881,12 @@ pub mod tests { TAP_SENDER as SENDER, TAP_SIGNER as SIGNER, }; use tokio::sync::{watch, Notify}; + use tonic::{transport::Endpoint, Code}; use wiremock::{ matchers::{body_string_contains, method}, - Mock, MockGuard, MockServer, Respond, ResponseTemplate, + Mock, MockGuard, MockServer, ResponseTemplate, }; + use wiremock_grpc::{MockBuilder, Then}; use super::{ SenderAllocation, SenderAllocationArgs, SenderAllocationMessage, SenderAllocationState, @@ -906,13 +899,11 @@ pub mod tests { }, test::{ actors::{create_mock_sender_account, TestableActor}, - create_rav, create_received_receipt, store_invalid_receipt, store_rav, store_receipt, - INDEXER, + create_rav, create_received_receipt, get_grpc_url, store_invalid_receipt, store_rav, + store_receipt, INDEXER, }, }; - const DUMMY_URL: &str = "http://localhost:1234"; - async fn mock_escrow_subgraph() -> (MockServer, MockGuard) { let mock_ecrow_subgraph_server: MockServer = MockServer::start().await; let _mock_ecrow_subgraph = mock_ecrow_subgraph_server @@ -956,9 +947,17 @@ pub mod tests { None => create_mock_sender_account().await.1, }; - let sender_aggregator = HttpClientBuilder::default() - .build(&sender_aggregator_endpoint) - .unwrap(); + let endpoint = Endpoint::new(sender_aggregator_endpoint.to_string()).unwrap(); + + let sender_aggregator = TapAggregatorClient::connect(endpoint.clone()) + .await + .unwrap_or_else(|err| { + panic!( + "Failed to connect to the TapAggregator endpoint '{}': Err: {err:?}", + endpoint.uri() + ) + }); + SenderAllocationArgs { pgpool: pgpool.clone(), allocation_id: ALLOCATION_ID_0, @@ -1012,7 +1011,7 @@ pub mod tests { let (sender_allocation, _notify) = create_sender_allocation( pgpool.clone(), - DUMMY_URL.to_string(), + get_grpc_url().await, &mock_escrow_subgraph_server.uri(), Some(sender_account), ) @@ -1055,7 +1054,7 @@ pub mod tests { let (sender_allocation, _notify) = create_sender_allocation( pgpool.clone(), - DUMMY_URL.to_string(), + get_grpc_url().await, &mock_escrow_subgraph_server.uri(), Some(sender_account), ) @@ -1091,15 +1090,15 @@ pub mod tests { // Check that the unaggregated fees are correct. assert_eq!(total_unaggregated_fees.value, 0u128); } - #[sqlx::test(migrations = "../../migrations")] async fn test_receive_new_receipt(pgpool: PgPool) { let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; + let (mut message_receiver, sender_account) = create_mock_sender_account().await; let (sender_allocation, notify) = create_sender_allocation( pgpool.clone(), - DUMMY_URL.to_string(), + get_grpc_url().await, &mock_escrow_subgraph_server.uri(), Some(sender_account), ) @@ -1160,19 +1159,6 @@ pub mod tests { #[sqlx::test(migrations = "../../migrations")] async fn test_trigger_rav_request(pgpool: PgPool) { - // Start a TAP aggregator server. - let (handle, aggregator_endpoint) = run_server( - 0, - SIGNER.0.clone(), - vec![SIGNER.1].into_iter().collect(), - TAP_EIP712_DOMAIN_SEPARATOR.clone(), - 100 * 1024, - 100 * 1024, - 1, - ) - .await - .unwrap(); - // Start a mock graphql server using wiremock let mock_server = MockServer::start().await; @@ -1206,7 +1192,7 @@ pub mod tests { // Create a sender_allocation. let (sender_allocation, notify) = create_sender_allocation( pgpool.clone(), - "http://".to_owned() + &aggregator_endpoint.to_string(), + get_grpc_url().await, &mock_server.uri(), Some(sender_account), ) @@ -1256,10 +1242,6 @@ pub mod tests { message_receiver.recv().await.unwrap(), SenderAccountMessage::UpdateReceiptFees(_, ReceiptFees::RavRequestResponse(_)) )); - - // Stop the TAP aggregator server. - handle.stop().unwrap(); - handle.stopped().await; } #[sqlx::test(migrations = "../../migrations")] @@ -1270,7 +1252,7 @@ pub mod tests { // create allocation let (sender_allocation, _notify) = create_sender_allocation( pgpool.clone(), - DUMMY_URL.to_string(), + get_grpc_url().await, &mock_escrow_subgraph_server.uri(), Some(sender_account), ) @@ -1291,46 +1273,29 @@ pub mod tests { ); } - #[sqlx::test(migrations = "../../migrations")] - async fn test_close_allocation_with_pending_fees(pgpool: PgPool) { - struct Response { - data: Arc, - } - - impl Respond for Response { - fn respond(&self, _request: &wiremock::Request) -> wiremock::ResponseTemplate { - self.data.notify_one(); - - let mock_rav = create_rav(ALLOCATION_ID_0, SIGNER.0.clone(), 10, 45); - - let json_response = JsonRpcResponse { - data: mock_rav, - warnings: None, - }; + // used for test_close_allocation_with_pending_fees(pgpool: + mod wiremock_gen { + wiremock_grpc::generate!("tap_aggregator.v1.TapAggregator", MockTapAggregator); + } - ResponseTemplate::new(200).set_body_json(json! ( - { - "id": 0, - "jsonrpc": "2.0", - "result": json_response + #[test_log::test(sqlx::test(migrations = "../../migrations"))] + async fn test_close_allocation_with_pending_fees(pgpool: PgPool) { + use wiremock_gen::MockTapAggregator; + let mut mock_aggregator = MockTapAggregator::start_default().await; + + let request1 = mock_aggregator.setup( + MockBuilder::when() + // 👇 RPC prefix + .path("/tap_aggregator.v1.TapAggregator/AggregateReceipts") + .then() + .return_status(Code::Ok) + .return_body(|| { + let mock_rav = create_rav(ALLOCATION_ID_0, SIGNER.0.clone(), 10, 45); + RavResponse { + rav: Some(mock_rav.into()), } - )) - } - } - - let await_trigger = Arc::new(tokio::sync::Notify::new()); - // Start a TAP aggregator server. - let aggregator_server = MockServer::start().await; - - aggregator_server - .register( - Mock::given(method("POST")) - .and(body_string_contains("aggregate_receipts")) - .respond_with(Response { - data: await_trigger.clone(), - }), - ) - .await; + }), + ); // Start a mock graphql server using wiremock let mock_server = MockServer::start().await; @@ -1360,7 +1325,7 @@ pub mod tests { // create allocation let (sender_allocation, _notify) = create_sender_allocation( pgpool.clone(), - aggregator_server.uri(), + format!("http://[::1]:{}", mock_aggregator.address().port()), &mock_server.uri(), Some(sender_account), ) @@ -1368,11 +1333,9 @@ pub mod tests { sender_allocation.stop_and_wait(None, None).await.unwrap(); - // should trigger rav request - await_trigger.notified().await; - - // check if rav request is made - assert!(aggregator_server.received_requests().await.is_some()); + // check if rav request was made + assert!(mock_aggregator.find_request_count() > 0); + assert!(mock_aggregator.find(&request1).is_some()); // check if the actor is actually stopped assert_eq!(sender_allocation.get_status(), ActorStatus::Stopped); @@ -1381,9 +1344,10 @@ pub mod tests { #[sqlx::test(migrations = "../../migrations")] async fn should_return_unaggregated_fees_without_rav(pgpool: PgPool) { let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; + let args = create_sender_allocation_args( pgpool.clone(), - DUMMY_URL.to_string(), + get_grpc_url().await, &mock_escrow_subgraph_server.uri(), None, ) @@ -1410,7 +1374,7 @@ pub mod tests { let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; let args = create_sender_allocation_args( pgpool.clone(), - DUMMY_URL.to_string(), + get_grpc_url().await, &mock_escrow_subgraph_server.uri(), None, ) @@ -1443,7 +1407,7 @@ pub mod tests { let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; let args = create_sender_allocation_args( pgpool.clone(), - DUMMY_URL.to_string(), + get_grpc_url().await, &mock_escrow_subgraph_server.uri(), None, ) @@ -1473,9 +1437,10 @@ pub mod tests { #[sqlx::test(migrations = "../../migrations")] async fn test_store_failed_rav(pgpool: PgPool) { let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; + let args = create_sender_allocation_args( pgpool.clone(), - DUMMY_URL.to_string(), + get_grpc_url().await, &mock_escrow_subgraph_server.uri(), None, ) @@ -1510,7 +1475,7 @@ pub mod tests { let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; let args = create_sender_allocation_args( pgpool.clone(), - DUMMY_URL.to_string(), + get_grpc_url().await, &mock_escrow_subgraph_server.uri(), None, ) @@ -1552,7 +1517,7 @@ pub mod tests { let args = create_sender_allocation_args( pgpool.clone(), - DUMMY_URL.to_string(), + get_grpc_url().await, &mock_escrow_subgraph_server.uri(), None, ) @@ -1584,7 +1549,7 @@ pub mod tests { // Create a sender_allocation. let (sender_allocation, notify) = create_sender_allocation( pgpool.clone(), - DUMMY_URL.to_string(), + get_grpc_url().await, &mock_escrow_subgraph_server.uri(), Some(sender_account), ) @@ -1633,19 +1598,6 @@ pub mod tests { #[sqlx::test(migrations = "../../migrations")] async fn test_rav_request_when_all_receipts_invalid(pgpool: PgPool) { - // Start a TAP aggregator server. - let (_handle, aggregator_endpoint) = run_server( - 0, - SIGNER.0.clone(), - vec![SIGNER.1].into_iter().collect(), - TAP_EIP712_DOMAIN_SEPARATOR.clone(), - 100 * 1024, - 100 * 1024, - 1, - ) - .await - .unwrap(); - // Start a mock graphql server using wiremock let mock_server = MockServer::start().await; @@ -1684,7 +1636,7 @@ pub mod tests { let (sender_allocation, notify) = create_sender_allocation( pgpool.clone(), - "http://".to_owned() + &aggregator_endpoint.to_string(), + get_grpc_url().await, &mock_server.uri(), Some(sender_account), ) diff --git a/crates/tap-agent/src/tap/context/rav.rs b/crates/tap-agent/src/tap/context/rav.rs index 43003759..4ac2cf44 100644 --- a/crates/tap-agent/src/tap/context/rav.rs +++ b/crates/tap-agent/src/tap/context/rav.rs @@ -135,11 +135,11 @@ impl RAVStore for TapAgentContext { mod test { use indexer_monitor::EscrowAccounts; use sqlx::PgPool; - use test_assets::TAP_SENDER as SENDER; + use test_assets::{TAP_SENDER as SENDER, TAP_SIGNER as SIGNER}; use tokio::sync::watch; use super::*; - use crate::test::{create_rav, ALLOCATION_ID_0, SIGNER}; + use crate::test::{create_rav, ALLOCATION_ID_0}; #[derive(Debug)] struct TestableRav(SignedRAV); diff --git a/crates/tap-agent/src/test.rs b/crates/tap-agent/src/test.rs index 8413198d..405ea399 100644 --- a/crates/tap-agent/src/test.rs +++ b/crates/tap-agent/src/test.rs @@ -4,17 +4,21 @@ use bigdecimal::num_bigint::BigInt; use lazy_static::lazy_static; use sqlx::{types::BigDecimal, PgPool}; +use std::net::SocketAddr; +use tap_aggregator::server::run_server; use tap_core::{ rav::{ReceiptAggregateVoucher, SignedRAV}, receipt::{state::Checking, Receipt, ReceiptWithState, SignedReceipt}, signed_message::EIP712SignedMessage, tap_eip712_domain, }; +use test_assets::TAP_SIGNER as SIGNER; use thegraph_core::alloy::{ primitives::{address, hex::ToHexExt, Address}, signers::local::{coins_bip39::English, MnemonicBuilder, PrivateKeySigner}, sol_types::Eip712Domain, }; +use tokio::task::JoinHandle; pub const ALLOCATION_ID_0: Address = address!("abababababababababababababababababababab"); pub const ALLOCATION_ID_1: Address = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbc"); @@ -22,7 +26,6 @@ pub const ALLOCATION_ID_1: Address = address!("bcbcbcbcbcbcbcbcbcbcbcbcbcbcbcbcb lazy_static! { // pub static ref SENDER: (PrivateKeySigner, Address) = wallet(0); pub static ref SENDER_2: (PrivateKeySigner, Address) = wallet(1); - pub static ref SIGNER: (PrivateKeySigner, Address) = wallet(2); pub static ref INDEXER: (PrivateKeySigner, Address) = wallet(3); pub static ref TAP_EIP712_DOMAIN_SEPARATOR: Eip712Domain = tap_eip712_domain(1, Address::from([0x11u8; 20]),); @@ -147,6 +150,35 @@ pub async fn store_rav( store_rav_with_options(pgpool, signed_rav, sender, false, false).await } +// TODO use static and check for possible errors with connection refused +pub async fn get_grpc_url() -> String { + let (_, addr) = create_grpc_aggregator().await; + format!("http://{}", addr) +} + +/// Function to start a aggregator server for testing +async fn create_grpc_aggregator() -> (JoinHandle<()>, SocketAddr) { + let wallet = SIGNER.0.clone(); + let accepted_addresses = vec![SIGNER.1].into_iter().collect(); + let domain_separator = TAP_EIP712_DOMAIN_SEPARATOR.clone(); + let max_request_body_size = 1024 * 1024; // 1 MB + let max_response_body_size = 1024 * 1024; // 1 MB + let max_concurrent_connections = 255; + let port = 0; + + run_server( + port, + wallet, + accepted_addresses, + domain_separator, + max_request_body_size, + max_response_body_size, + max_concurrent_connections, + ) + .await + .unwrap() +} + pub async fn store_rav_with_options( pgpool: &PgPool, signed_rav: SignedRAV,