Skip to content

Commit

Permalink
refactor: add horizon support
Browse files Browse the repository at this point in the history
WIP: needs receipt parsing
Signed-off-by: Gustavo Inacio <[email protected]>
  • Loading branch information
gusinacio committed Jan 30, 2025
1 parent 496968b commit 8969bca
Show file tree
Hide file tree
Showing 19 changed files with 213 additions and 106 deletions.
21 changes: 8 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ bigdecimal = "0.4.3"
build-info = "0.0.39"
tap_core = { version = "3.0.0", default-features = false }
tap_aggregator = { version = "0.4.0", default-features = false }
tap_graph = { version = "0.1.0", default-features = false }
tap_graph = { version = "0.2.0", features = ["v2"] }
tracing-subscriber = { version = "0.3", features = [
"json",
"env-filter",
Expand All @@ -80,3 +80,17 @@ tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] }
tonic-build = { version = "0.12.3", features = ["prost"] }
prost = "0.13.3"
prost-types = "0.13.3"


[patch.crates-io.tap_core]
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
rev = "dbae001"

[patch.crates-io.tap_aggregator]
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
rev = "dbae001"


[patch.crates-io.tap_graph]
git = "https://github.com/semiotic-ai/timeline-aggregation-protocol"
rev = "dbae001"
4 changes: 2 additions & 2 deletions crates/service/src/middleware/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ mod tests {

use crate::{
middleware::auth::{self, Bearer, OrExt},
tap::IndexerTapContext,
tap::{IndexerTapContext, TapReceipt},
};

const BEARER_TOKEN: &str = "test";
Expand Down Expand Up @@ -104,7 +104,7 @@ mod tests {

// check with receipt
let mut req = Request::new(Default::default());
req.extensions_mut().insert(receipt);
req.extensions_mut().insert(TapReceipt::V1(receipt));
let res = service.call(req).await.unwrap();
assert_eq!(res.status(), StatusCode::OK);

Expand Down
27 changes: 14 additions & 13 deletions crates/service/src/middleware/auth/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ use tap_core::{
manager::{adapters::ReceiptStore, Manager},
receipt::Context,
};
use tap_graph::{ReceiptAggregateVoucher, SignedReceipt};
use tap_graph::ReceiptAggregateVoucher;
use tower_http::auth::AsyncAuthorizeRequest;

use crate::{error::IndexerServiceError, middleware::prometheus_metrics::MetricLabels};
use crate::{
error::IndexerServiceError, middleware::prometheus_metrics::MetricLabels, tap::TapReceipt,
};

/// Middleware to verify and store TAP receipts
///
/// It also optionally updates a failed receipt metric if Labels are provided
///
/// Requires SignedReceipt, MetricLabels and Arc<Context> extensions
/// Requires TapReceipt, MetricLabels and Arc<Context> extensions
pub fn tap_receipt_authorize<T, B>(
tap_manager: Arc<Manager<T, SignedReceipt, ReceiptAggregateVoucher>>,
tap_manager: Arc<Manager<T, TapReceipt, ReceiptAggregateVoucher>>,
failed_receipt_metric: &'static prometheus::CounterVec,
) -> impl AsyncAuthorizeRequest<
B,
Expand All @@ -41,11 +43,11 @@ pub fn tap_receipt_authorize<T, B>(
> + Clone
+ Send
where
T: ReceiptStore<SignedReceipt> + Sync + Send + 'static,
T: ReceiptStore<TapReceipt> + Sync + Send + 'static,
B: Send,
{
move |request: Request<B>| {
let receipt = request.extensions().get::<SignedReceipt>().cloned();
move |mut request: Request<B>| {
let receipt = request.extensions_mut().remove::<TapReceipt>();
// load labels from previous middlewares
let labels = request.extensions().get::<MetricLabels>().cloned();
// load context from previous middlewares
Expand Down Expand Up @@ -91,7 +93,6 @@ mod tests {
manager::Manager,
receipt::checks::{Check, CheckError, CheckList, CheckResult},
};
use tap_graph::SignedReceipt;
use test_assets::{
assert_while_retry, create_signed_receipt, SignedReceiptRequest, TAP_EIP712_DOMAIN,
};
Expand All @@ -103,7 +104,7 @@ mod tests {
auth::tap_receipt_authorize,
prometheus_metrics::{MetricLabelProvider, MetricLabels},
},
tap::{CheckingReceipt, IndexerTapContext},
tap::{CheckingReceipt, IndexerTapContext, TapReceipt},
};

#[fixture]
Expand Down Expand Up @@ -131,13 +132,13 @@ mod tests {

struct MyCheck;
#[async_trait::async_trait]
impl Check<SignedReceipt> for MyCheck {
impl Check<TapReceipt> for MyCheck {
async fn check(
&self,
_: &tap_core::receipt::Context,
receipt: &CheckingReceipt,
) -> CheckResult {
if receipt.signed_receipt().message.nonce == FAILED_NONCE {
if receipt.signed_receipt().nonce() == FAILED_NONCE {
Err(CheckError::Failed(anyhow::anyhow!("Failed")))
} else {
Ok(())
Expand Down Expand Up @@ -175,7 +176,7 @@ mod tests {

// check with receipt
let mut req = Request::new(Body::default());
req.extensions_mut().insert(receipt);
req.extensions_mut().insert(TapReceipt::V1(receipt));
let res = service.call(req).await.unwrap();
assert_eq!(res.status(), StatusCode::OK);

Expand Down Expand Up @@ -214,7 +215,7 @@ mod tests {
// change the nonce to make the receipt invalid
receipt.message.nonce = FAILED_NONCE;
let mut req = Request::new(Body::default());
req.extensions_mut().insert(receipt);
req.extensions_mut().insert(TapReceipt::V1(receipt));
req.extensions_mut().insert(labels);
let response = service.call(req);

Expand Down
9 changes: 4 additions & 5 deletions crates/service/src/middleware/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use axum::{
response::Response,
};
use indexer_monitor::EscrowAccounts;
use tap_graph::SignedReceipt;
use thegraph_core::alloy::{primitives::Address, sol_types::Eip712Domain};
use tokio::sync::watch;

use crate::error::IndexerServiceError;
use crate::{error::IndexerServiceError, tap::TapReceipt};

/// Stated used by sender middleware
#[derive(Clone)]
Expand Down Expand Up @@ -44,7 +43,7 @@ pub async fn sender_middleware(
mut request: Request,
next: Next,
) -> Result<Response, IndexerServiceError> {
if let Some(receipt) = request.extensions().get::<SignedReceipt>() {
if let Some(receipt) = request.extensions().get::<TapReceipt>() {
let signer = receipt.recover_signer(&state.domain_separator)?;
let sender = state
.escrow_accounts
Expand Down Expand Up @@ -75,7 +74,7 @@ mod tests {
use tower::ServiceExt;

use super::{sender_middleware, Sender};
use crate::middleware::sender::SenderState;
use crate::{middleware::sender::SenderState, tap::TapReceipt};

#[tokio::test]
async fn test_sender_middleware() {
Expand Down Expand Up @@ -105,7 +104,7 @@ mod tests {
.oneshot(
Request::builder()
.uri("/")
.extension(receipt)
.extension(TapReceipt::V1(receipt))
.body(Body::empty())
.unwrap(),
)
Expand Down
19 changes: 12 additions & 7 deletions crates/service/src/middleware/tap_receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use axum::{extract::Request, middleware::Next, response::Response, RequestExt};
use axum_extra::TypedHeader;

use crate::service::TapReceipt;
use crate::service::TapHeader;

/// Injects tap receipts in the extensions
///
Expand All @@ -14,8 +14,8 @@ use crate::service::TapReceipt;
///
/// This is useful to not deserialize multiple times the same receipt
pub async fn receipt_middleware(mut request: Request, next: Next) -> Response {
if let Ok(TypedHeader(TapReceipt(receipt))) =
request.extract_parts::<TypedHeader<TapReceipt>>().await
if let Ok(TypedHeader(TapHeader(receipt))) =
request.extract_parts::<TypedHeader<TapHeader>>().await
{
request.extensions_mut().insert(receipt);
}
Expand All @@ -24,6 +24,8 @@ pub async fn receipt_middleware(mut request: Request, next: Next) -> Response {

#[cfg(test)]
mod tests {
use core::panic;

use axum::{
body::Body,
http::{Extensions, Request},
Expand All @@ -33,11 +35,10 @@ mod tests {
};
use axum_extra::headers::Header;
use reqwest::StatusCode;
use tap_graph::SignedReceipt;
use test_assets::{create_signed_receipt, SignedReceiptRequest};
use tower::ServiceExt;

use crate::{middleware::tap_receipt::receipt_middleware, service::TapReceipt};
use crate::{middleware::tap_receipt::receipt_middleware, service::TapHeader, tap::TapReceipt};

#[tokio::test]
async fn test_receipt_middleware() {
Expand All @@ -48,8 +49,12 @@ mod tests {

let handle = move |extensions: Extensions| async move {
let received_receipt = extensions
.get::<SignedReceipt>()
.get::<TapReceipt>()
.expect("Should decode tap receipt");
let received_receipt = match received_receipt {
TapReceipt::V1(receipt) => receipt,
_ => panic!("Not v1"),
};
assert_eq!(received_receipt.message, receipt.message);
assert_eq!(received_receipt.signature, receipt.signature);
Body::empty()
Expand All @@ -61,7 +66,7 @@ mod tests {
.oneshot(
Request::builder()
.uri("/")
.header(TapReceipt::name(), receipt_json)
.header(TapHeader::name(), receipt_json)
.body(Body::empty())
.unwrap(),
)
Expand Down
2 changes: 1 addition & 1 deletion crates/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ mod router;
mod tap_receipt_header;

pub use router::ServiceRouter;
pub use tap_receipt_header::TapReceipt;
pub use tap_receipt_header::TapHeader;

#[derive(Clone)]
pub struct GraphNodeState {
Expand Down
21 changes: 12 additions & 9 deletions crates/service/src/service/tap_receipt_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@ use lazy_static::lazy_static;
use prometheus::{register_counter, Counter};
use tap_graph::SignedReceipt;

use crate::tap::TapReceipt;

#[derive(Debug, PartialEq)]
pub struct TapReceipt(pub SignedReceipt);
pub struct TapHeader(pub TapReceipt);

lazy_static! {
static ref TAP_RECEIPT: HeaderName = HeaderName::from_static("tap-receipt");
pub static ref TAP_RECEIPT_INVALID: Counter =
register_counter!("indexer_tap_invalid_total", "Invalid tap receipt decode",).unwrap();
}

impl Header for TapReceipt {
impl Header for TapHeader {
fn name() -> &'static HeaderName {
&TAP_RECEIPT
}
Expand All @@ -30,9 +32,9 @@ impl Header for TapReceipt {
let raw_receipt = raw_receipt
.to_str()
.map_err(|_| headers::Error::invalid())?;
let parsed_receipt =
let parsed_receipt: SignedReceipt =
serde_json::from_str(raw_receipt).map_err(|_| headers::Error::invalid())?;
Ok(TapReceipt(parsed_receipt))
Ok(TapHeader(crate::tap::TapReceipt::V1(parsed_receipt)))
};
execute().inspect_err(|_| TAP_RECEIPT_INVALID.inc())
}
Expand All @@ -51,25 +53,26 @@ mod test {
use axum_extra::headers::Header;
use test_assets::{create_signed_receipt, SignedReceiptRequest};

use super::TapReceipt;
use super::TapHeader;
use crate::tap::TapReceipt;

#[tokio::test]
async fn test_decode_valid_tap_receipt_header() {
let original_receipt = create_signed_receipt(SignedReceiptRequest::builder().build()).await;
let serialized_receipt = serde_json::to_string(&original_receipt).unwrap();
let header_value = HeaderValue::from_str(&serialized_receipt).unwrap();
let header_values = vec![&header_value];
let decoded_receipt = TapReceipt::decode(&mut header_values.into_iter())
let decoded_receipt = TapHeader::decode(&mut header_values.into_iter())
.expect("tap receipt header value should be valid");

assert_eq!(decoded_receipt, TapReceipt(original_receipt));
assert_eq!(decoded_receipt, TapHeader(TapReceipt::V1(original_receipt)));
}

#[test]
fn test_decode_non_string_tap_receipt_header() {
let header_value = HeaderValue::from_static("123");
let header_values = vec![&header_value];
let result = TapReceipt::decode(&mut header_values.into_iter());
let result = TapHeader::decode(&mut header_values.into_iter());

assert!(result.is_err());
}
Expand All @@ -78,7 +81,7 @@ mod test {
fn test_decode_invalid_tap_receipt_header() {
let header_value = HeaderValue::from_bytes(b"invalid").unwrap();
let header_values = vec![&header_value];
let result = TapReceipt::decode(&mut header_values.into_iter());
let result = TapHeader::decode(&mut header_values.into_iter());

assert!(result.is_err());
}
Expand Down
Loading

0 comments on commit 8969bca

Please sign in to comment.