Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: grpc client for tap aggregator #583

Merged
merged 24 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d601187
feat: remove jsonrpc client in favor of grpc client
pedrohba1 Jan 7, 2025
78aa4ba
refactor: adjust endpoint in test
pedrohba1 Jan 16, 2025
20052b9
chore: cargo fmt
pedrohba1 Jan 16, 2025
b6ea2a7
refactor: remove commented code and unnecessary stop of handler in test
pedrohba1 Jan 21, 2025
c86e1e0
test: handle error from failed connection
pedrohba1 Jan 22, 2025
d8154d3
feat: log out endpoint in case of failed connection attempt
pedrohba1 Jan 22, 2025
6e426f6
ci: add TapAggregator server mock to minimize changes to tests
pedrohba1 Jan 22, 2025
910f25f
ci: fix line indent
pedrohba1 Jan 22, 2025
f680606
ci: force ri rerun
pedrohba1 Jan 22, 2025
ea023cf
refactor: cargo fmt
pedrohba1 Jan 22, 2025
8c2bdb5
ci: move mock server start to test-and-coverage
pedrohba1 Jan 22, 2025
34d2947
test: test aggregator server helper function
pedrohba1 Jan 24, 2025
36ecace
test: instantiate server instead of using mock
pedrohba1 Jan 24, 2025
90e2e3f
ci: remove unnecessary step
pedrohba1 Jan 24, 2025
990340a
refactor: correct typo
pedrohba1 Jan 24, 2025
eb13c2b
refactor: correct typo
pedrohba1 Jan 24, 2025
54fcc58
refactor: replace space
pedrohba1 Jan 24, 2025
bcec5b5
refactor: remove unnecessary message
pedrohba1 Jan 24, 2025
d99ca12
test: fix hanging test
gusinacio Jan 24, 2025
30be4a0
refactor: change expect with context
gusinacio Jan 24, 2025
ce3206f
test: use get_grpc_url() to spawn a grpc
gusinacio Jan 24, 2025
26ac63b
test: use server_url directly
gusinacio Jan 24, 2025
87dab30
test: update all tests to use get_grpc_url
gusinacio Jan 24, 2025
5decbd1
chore: add comment about compression
gusinacio Jan 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/tap-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,6 @@ futures = { version = "0.3.30", default-features = false }
[dev-dependencies]
tempfile = "3.8.0"
wiremock.workspace = true
wiremock-grpc = "0.0.3-alpha3"
test-assets = { path = "../test-assets" }
test-log = { version = "0.2.12", default-features = false }
19 changes: 8 additions & 11 deletions crates/tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,7 @@ impl Actor for SenderAccount {
.set(config.trigger_value as f64);

let endpoint = Endpoint::new(sender_aggregator_endpoint.to_string())
.expect("Failed to create an endpoint for the sender aggregator")
.connect_timeout(config.rav_request_timeout);
.context("Failed to create an endpoint for the sender aggregator")?;

let sender_aggregator = TapAggregatorClient::connect(endpoint.clone())
.await
Expand All @@ -616,8 +615,11 @@ impl Actor for SenderAccount {
"Failed to connect to the TapAggregator endpoint '{}'",
endpoint.uri()
)
})?
.send_compressed(tonic::codec::CompressionEncoding::Zstd);
})?;
// wiremock_grpc used for tests doesn't support Zstd compression
#[cfg(not(test))]
let sender_aggregator =
sender_aggregator.send_compressed(tonic::codec::CompressionEncoding::Zstd);
suchapalaver marked this conversation as resolved.
Show resolved Hide resolved
let state = State {
prefix,
sender_fee_tracker: SenderFeeTracker::new(config.rav_request_buffer),
Expand Down Expand Up @@ -1081,8 +1083,7 @@ pub mod tests {
assert_not_triggered, assert_triggered,
test::{
actors::{create_mock_sender_allocation, MockSenderAllocation, TestableActor},
create_rav, start_test_aggregator_server, store_rav_with_options, INDEXER,
TAP_EIP712_DOMAIN_SEPARATOR,
create_rav, get_grpc_url, store_rav_with_options, INDEXER, TAP_EIP712_DOMAIN_SEPARATOR,
},
};

Expand Down Expand Up @@ -1205,10 +1206,6 @@ pub mod tests {
.expect("Failed to update escrow_accounts channel");

// Start a new mock aggregator server for this test
let (_server_handle, server_addr) = start_test_aggregator_server()
.await
.expect("Failed to start mock aggregator server");
let server_url = format!("http://{}", server_addr);

let prefix = format!(
"test-{}",
Expand All @@ -1224,7 +1221,7 @@ pub mod tests {
escrow_subgraph,
network_subgraph,
domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(),
sender_aggregator_endpoint: Url::parse(&server_url).unwrap(),
sender_aggregator_endpoint: Url::parse(&get_grpc_url().await).unwrap(),
allocation_ids: HashSet::new(),
prefix: Some(prefix.clone()),
retry_interval: RETRY_DURATION,
Expand Down
43 changes: 7 additions & 36 deletions crates/tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ mod tests {
use reqwest::Url;
use ruint::aliases::U256;
use sqlx::{postgres::PgListener, PgPool};
use test_assets::{flush_messages, TAP_SENDER as SENDER};
use test_assets::{flush_messages, TAP_SENDER as SENDER, TAP_SIGNER as SIGNER};
use thegraph_core::alloy::hex::ToHexExt;
use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Notify};

Expand All @@ -624,9 +624,8 @@ mod tests {
},
test::{
actors::{DummyActor, MockSenderAccount, MockSenderAllocation, TestableActor},
create_rav, create_received_receipt, start_test_aggregator_server, store_rav,
store_receipt, ALLOCATION_ID_0, ALLOCATION_ID_1, INDEXER, SENDER_2, SIGNER,
TAP_EIP712_DOMAIN_SEPARATOR,
create_rav, create_received_receipt, get_grpc_url, store_rav, store_receipt,
ALLOCATION_ID_0, ALLOCATION_ID_1, INDEXER, SENDER_2, TAP_EIP712_DOMAIN_SEPARATOR,
},
};

Expand Down Expand Up @@ -672,19 +671,6 @@ mod tests {
let (_, escrow_accounts_rx) = watch::channel(EscrowAccounts::default());

// Start a new mock aggregator server for this test
let (server1_url, server2_url) = {
let (_server1_handle, server1_addr) = start_test_aggregator_server()
.await
.expect("Failed to start the first mock aggregator server");
let server1_url = format!("http://{}", server1_addr);

let (_server2_handle, server2_addr) = start_test_aggregator_server()
.await
.expect("Failed to start the second mock aggregator server");
let server2_url = format!("http://{}", server2_addr);

(server1_url, server2_url)
};
let prefix = format!(
"test-{}",
PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
Expand All @@ -698,8 +684,8 @@ mod tests {
escrow_subgraph,
network_subgraph,
sender_aggregator_endpoints: HashMap::from([
(SENDER.1, Url::parse(&server1_url).unwrap()),
(SENDER_2.1, Url::parse(&server2_url).unwrap()),
(SENDER.1, Url::parse(&get_grpc_url().await).unwrap()),
(SENDER_2.1, Url::parse(&get_grpc_url().await).unwrap()),
]),
prefix: Some(prefix.clone()),
};
Expand All @@ -724,21 +710,6 @@ mod tests {
let senders_to_signers = vec![(SENDER.1, vec![SIGNER.1])].into_iter().collect();
let escrow_accounts = EscrowAccounts::new(HashMap::new(), senders_to_signers);

// Start a new mock aggregator server for this test
let (server1_url, server2_url) = {
let (_server1_handle, server1_addr) = start_test_aggregator_server()
.await
.expect("Failed to start the first mock aggregator server");
let server1_url = format!("http://{}", server1_addr);

let (_server2_handle, server2_addr) = start_test_aggregator_server()
.await
.expect("Failed to start the second mock aggregator server");
let server2_url = format!("http://{}", server2_addr);

(server1_url, server2_url)
};

let prefix = format!(
"test-{}",
PREFIX_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
Expand All @@ -757,8 +728,8 @@ mod tests {
escrow_subgraph: get_subgraph_client().await,
network_subgraph: get_subgraph_client().await,
sender_aggregator_endpoints: HashMap::from([
(SENDER.1, Url::parse(&server1_url).unwrap()),
(SENDER_2.1, Url::parse(&server2_url).unwrap()),
(SENDER.1, Url::parse(&get_grpc_url().await).unwrap()),
(SENDER_2.1, Url::parse(&get_grpc_url().await).unwrap()),
]),
prefix: Some(prefix),
},
Expand Down
Loading
Loading