Skip to content

Commit

Permalink
feat: grpc client for tap aggregator (#583)
Browse files Browse the repository at this point in the history
* feat: remove jsonrpc client in favor of grpc client

Co-authored-by: Gustavo Inacio <[email protected]>
Signed-off-by: pedro bufulin <[email protected]>
Signed-off-by: Gustavo Inacio <[email protected]>

* refactor: adjust endpoint in test

* chore: cargo fmt

* refactor: remove commented code and unnecessary stop of handler in test

* test: handle error from failed connection

* feat: log out endpoint in case of failed connection attempt

* ci: add TapAggregator server mock to minimize changes to tests

* ci: fix line indent

* ci: force ri rerun

* refactor: cargo fmt

* ci: move mock server start to test-and-coverage

* test: test aggregator server helper function

* test: instantiate server instead of using mock

* ci: remove unnecessary step

* refactor: correct typo

* refactor: correct typo

* refactor: replace space

* refactor: remove unnecessary message

* test: fix hanging test

Signed-off-by: Gustavo Inacio <[email protected]>

* refactor: change expect with context

Signed-off-by: Gustavo Inacio <[email protected]>

* test: use get_grpc_url() to spawn a grpc

Signed-off-by: Gustavo Inacio <[email protected]>

* test: use server_url directly

Signed-off-by: Gustavo Inacio <[email protected]>

* test: update all tests to use get_grpc_url

Signed-off-by: Gustavo Inacio <[email protected]>

* chore: add comment about compression

Signed-off-by: Gustavo Inacio <[email protected]>

---------

Signed-off-by: pedro bufulin <[email protected]>
Signed-off-by: Gustavo Inacio <[email protected]>
Co-authored-by: pedro bufulin <[email protected]>
  • Loading branch information
gusinacio and pedrohba1 authored Jan 24, 2025
1 parent 9f99732 commit c3ede8f
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 207 deletions.
150 changes: 86 additions & 64 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ uuid = { version = "1.11.0", features = ["v7"] }
tracing = { version = "0.1.40", default-features = false }
bigdecimal = "0.4.3"
build-info = "0.0.39"
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1c6e29f", default-features = false }
tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "1c6e29f", default-features = false }
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "6af1add", default-features = false }
tap_aggregator = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "6af1add", default-features = false }
tracing-subscriber = { version = "0.3", features = [
"json",
"env-filter",
"ansi",
], default-features = false }
thegraph-core = { version = "0.9.6", features = [
thegraph-core = { version = "0.10.0", features = [
"attestation",
"alloy-eip712",
"alloy-sol-types",
Expand Down
2 changes: 2 additions & 0 deletions crates/tap-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ lazy_static.workspace = true
thegraph-core.workspace = true
clap.workspace = true
tracing-subscriber.workspace = true
tonic.workspace = true
bigdecimal = { workspace = true, features = ["serde"] }
graphql_client.workspace = true

Expand All @@ -48,5 +49,6 @@ futures = { version = "0.3.30", default-features = false }
[dev-dependencies]
tempfile = "3.8.0"
wiremock.workspace = true
wiremock-grpc = "0.0.3-alpha3"
test-assets = { path = "../test-assets" }
test-log = { version = "0.2.12", default-features = false }
29 changes: 22 additions & 7 deletions crates/tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
time::Duration,
};

use anyhow::Context;
use bigdecimal::{num_bigint::ToBigInt, ToPrimitive};
use futures::{stream, StreamExt};
use indexer_monitor::{EscrowAccounts, SubgraphClient};
Expand All @@ -15,19 +16,20 @@ use indexer_query::{
unfinalized_transactions, UnfinalizedTransactions,
};
use indexer_watcher::watch_pipe;
use jsonrpsee::http_client::HttpClientBuilder;
use lazy_static::lazy_static;
use prometheus::{register_gauge_vec, register_int_gauge_vec, GaugeVec, IntGaugeVec};
use ractor::{Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent};
use reqwest::Url;
use sqlx::PgPool;
use tap_aggregator::grpc::tap_aggregator_client::TapAggregatorClient;
use tap_core::rav::SignedRAV;
use thegraph_core::alloy::{
hex::ToHexExt,
primitives::{Address, U256},
sol_types::Eip712Domain,
};
use tokio::{sync::watch::Receiver, task::JoinHandle};
use tonic::transport::{Channel, Endpoint};
use tracing::Level;

use super::sender_allocation::{
Expand Down Expand Up @@ -171,7 +173,7 @@ pub struct State {

domain_separator: Eip712Domain,
pgpool: PgPool,
sender_aggregator: jsonrpsee::http_client::HttpClient,
sender_aggregator: TapAggregatorClient<Channel>,

// Backoff info
backoff_info: BackoffInfo,
Expand Down Expand Up @@ -603,10 +605,21 @@ impl Actor for SenderAccount {
.with_label_values(&[&sender_id.to_string()])
.set(config.trigger_value as f64);

let sender_aggregator = HttpClientBuilder::default()
.request_timeout(config.rav_request_timeout)
.build(&sender_aggregator_endpoint)?;
let endpoint = Endpoint::new(sender_aggregator_endpoint.to_string())
.context("Failed to create an endpoint for the sender aggregator")?;

let sender_aggregator = TapAggregatorClient::connect(endpoint.clone())
.await
.with_context(|| {
format!(
"Failed to connect to the TapAggregator endpoint '{}'",
endpoint.uri()
)
})?;
// wiremock_grpc used for tests doesn't support Zstd compression
#[cfg(not(test))]
let sender_aggregator =
sender_aggregator.send_compressed(tonic::codec::CompressionEncoding::Zstd);
let state = State {
prefix,
sender_fee_tracker: SenderFeeTracker::new(config.rav_request_buffer),
Expand Down Expand Up @@ -1070,7 +1083,7 @@ pub mod tests {
assert_not_triggered, assert_triggered,
test::{
actors::{create_mock_sender_allocation, MockSenderAllocation, TestableActor},
create_rav, store_rav_with_options, INDEXER, TAP_EIP712_DOMAIN_SEPARATOR,
create_rav, get_grpc_url, store_rav_with_options, INDEXER, TAP_EIP712_DOMAIN_SEPARATOR,
},
};

Expand Down Expand Up @@ -1192,6 +1205,8 @@ pub mod tests {
))
.expect("Failed to update escrow_accounts channel");

// Start a new mock aggregator server for this test

let prefix = format!(
"test-{}",
PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
Expand All @@ -1206,7 +1221,7 @@ pub mod tests {
escrow_subgraph,
network_subgraph,
domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(),
sender_aggregator_endpoint: Url::parse(DUMMY_URL).unwrap(),
sender_aggregator_endpoint: Url::parse(&get_grpc_url().await).unwrap(),
allocation_ids: HashSet::new(),
prefix: Some(prefix.clone()),
retry_interval: RETRY_DURATION,
Expand Down
15 changes: 8 additions & 7 deletions crates/tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ mod tests {
use reqwest::Url;
use ruint::aliases::U256;
use sqlx::{postgres::PgListener, PgPool};
use test_assets::{flush_messages, TAP_SENDER as SENDER};
use test_assets::{flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER};
use thegraph_core::alloy::hex::ToHexExt;
use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Notify};

Expand All @@ -624,8 +624,8 @@ mod tests {
},
test::{
actors::{DummyActor, MockSenderAccount, MockSenderAllocation, TestableActor},
create_rav, create_received_receipt, store_rav, store_receipt, ALLOCATION_ID_0,
ALLOCATION_ID_1, INDEXER, SENDER_2, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR,
create_rav, create_received_receipt, get_grpc_url, store_rav, store_receipt,
ALLOCATION_ID_0, ALLOCATION_ID_1, INDEXER, SENDER_2, TAP_EIP712_DOMAIN_SEPARATOR,
},
};

Expand Down Expand Up @@ -670,6 +670,7 @@ mod tests {

let (_, escrow_accounts_rx) = watch::channel(EscrowAccounts::default());

// Start a new mock aggregator server for this test
let prefix = format!(
"test-{}",
PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
Expand All @@ -683,8 +684,8 @@ mod tests {
escrow_subgraph,
network_subgraph,
sender_aggregator_endpoints: HashMap::from([
(SENDER.1, Url::parse("http://localhost:8000").unwrap()),
(SENDER_2.1, Url::parse("http://localhost:8000").unwrap()),
(SENDER.1, Url::parse(&get_grpc_url().await).unwrap()),
(SENDER_2.1, Url::parse(&get_grpc_url().await).unwrap()),
]),
prefix: Some(prefix.clone()),
};
Expand Down Expand Up @@ -727,8 +728,8 @@ mod tests {
escrow_subgraph: get_subgraph_client().await,
network_subgraph: get_subgraph_client().await,
sender_aggregator_endpoints: HashMap::from([
(SENDER.1, Url::parse("http://localhost:8000").unwrap()),
(SENDER_2.1, Url::parse("http://localhost:8000").unwrap()),
(SENDER.1, Url::parse(&get_grpc_url().await).unwrap()),
(SENDER_2.1, Url::parse(&get_grpc_url().await).unwrap()),
]),
prefix: Some(prefix),
},
Expand Down
Loading

0 comments on commit c3ede8f

Please sign in to comment.