From 294fa1a0c351e0bed0e826656842857ef68228c5 Mon Sep 17 00:00:00 2001 From: Kyle Espinola Date: Mon, 15 Apr 2024 13:24:27 +0200 Subject: [PATCH] fix: applying account and transction filters to grpc subscription request --- .github/workflows/test.yml | 2 +- grpc-ingest/README.md | 2 +- grpc-ingest/config-grpc2redis.yml | 18 ++++++++----- grpc-ingest/src/config.rs | 4 +-- grpc-ingest/src/grpc.rs | 25 +++++++++++++++--- nft_ingester/src/tasks/mod.rs | 4 +-- .../src/bubblegum/decompress.rs | 26 ------------------- program_transformers/src/lib.rs | 2 +- 8 files changed, 40 insertions(+), 43 deletions(-) delete mode 100644 program_transformers/src/bubblegum/decompress.rs diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4f4b4111d..60f1b69ab 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -38,7 +38,7 @@ jobs: run: | cargo tree git checkout Cargo.lock - cargo tree --frozen + cargo tree # fmt - name: Check fmt diff --git a/grpc-ingest/README.md b/grpc-ingest/README.md index a2afa2226..d721d863b 100644 --- a/grpc-ingest/README.md +++ b/grpc-ingest/README.md @@ -30,4 +30,4 @@ psql: ``` PGPASSWORD=solana psql -h localhost -U solana -d solana -``` +``` \ No newline at end of file diff --git a/grpc-ingest/config-grpc2redis.yml b/grpc-ingest/config-grpc2redis.yml index 73ed908f4..a7e79d1dc 100644 --- a/grpc-ingest/config-grpc2redis.yml +++ b/grpc-ingest/config-grpc2redis.yml @@ -1,21 +1,25 @@ prometheus: 127.0.0.1:8873 endpoint: http://127.0.0.1:10000 x_token: null -commitment: processed +commitment: finalized accounts: stream: ACCOUNTS stream_maxlen: 100_000_000 stream_data_key: data - filters: - - owner: - - TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA - - metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s + filter: + owner: + - "metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s" + - "TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb" + - "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" + - "ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL" + - "BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY" + - "CoREENxT6tW1HoK8ypY1SxRMZTcVPm7R94rH4PZNhX7d" transactions: stream: TRANSACTIONS stream_maxlen: 10_000_000 stream_data_key: data - filters: - - account_include: + filter: + account_include: - BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY redis: url: redis://localhost:6379 diff --git a/grpc-ingest/src/config.rs b/grpc-ingest/src/config.rs index c3c45531e..90b7c005c 100644 --- a/grpc-ingest/src/config.rs +++ b/grpc-ingest/src/config.rs @@ -61,7 +61,7 @@ pub struct ConfigGrpcAccounts { #[serde(default = "ConfigGrpcAccounts::default_stream_data_key")] pub stream_data_key: String, - pub filters: Vec, + pub filter: ConfigGrpcRequestAccounts, } impl ConfigGrpcAccounts { @@ -90,7 +90,7 @@ pub struct ConfigGrpcTransactions { #[serde(default = "ConfigGrpcTransactions::default_stream_data_key")] pub stream_data_key: String, - pub filters: Vec, + pub filter: ConfigGrpcRequestTransactions, } impl ConfigGrpcTransactions { diff --git a/grpc-ingest/src/grpc.rs b/grpc-ingest/src/grpc.rs index 01387c025..47cf9c718 100644 --- a/grpc-ingest/src/grpc.rs +++ b/grpc-ingest/src/grpc.rs @@ -5,6 +5,7 @@ use { anyhow::Context, futures::stream::StreamExt, redis::{streams::StreamMaxlen, RedisResult, Value as RedisValue}, + std::collections::HashMap, std::{sync::Arc, time::Duration}, tokio::{ task::JoinSet, @@ -12,7 +13,10 @@ use { }, tracing::warn, yellowstone_grpc_client::GeyserGrpcClient, - yellowstone_grpc_proto::{prelude::subscribe_update::UpdateOneof, prost::Message}, + yellowstone_grpc_proto::{ + geyser::SubscribeRequest, prelude::subscribe_update::UpdateOneof, prost::Message, + }, + yellowstone_grpc_tools::config::GrpcRequestToProto, }; pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { @@ -41,7 +45,22 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { .connect() .await .context("failed to connect go gRPC")?; - let (mut _subscribe_tx, mut stream) = client.subscribe().await?; + + let mut accounts = HashMap::with_capacity(1); + let mut transactions = HashMap::with_capacity(1); + + accounts.insert("das".to_string(), config.accounts.filter.clone().to_proto()); + transactions.insert( + "das".to_string(), + config.transactions.filter.clone().to_proto(), + ); + + let request = SubscribeRequest { + accounts, + transactions, + ..Default::default() + }; + let (mut _subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?; // recv-send loop let mut shutdown = create_shutdown()?; @@ -117,7 +136,7 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { let result: RedisResult = pipe.atomic().query_async(&mut connection).await; - let status = if result.is_ok() { Ok(()) } else { Err(()) }; + let status = result.map(|_| ()).map_err(|_| ()); redis_xadd_status_inc(&config.accounts.stream, status, pipe_accounts); redis_xadd_status_inc(&config.transactions.stream, status, pipe_transactions); diff --git a/nft_ingester/src/tasks/mod.rs b/nft_ingester/src/tasks/mod.rs index ec7a813fe..6c6f854b6 100644 --- a/nft_ingester/src/tasks/mod.rs +++ b/nft_ingester/src/tasks/mod.rs @@ -324,8 +324,8 @@ impl TaskManager { tokio::task::spawn(async move { while let Some(task) = receiver.recv().await { if let Some(task_created_time) = task.created_at { - let bus_time = - Utc::now().timestamp_millis() - task_created_time.timestamp_millis(); + let bus_time = Utc::now().timestamp_millis() + - task_created_time.and_utc().timestamp_millis(); metric! { statsd_histogram!("ingester.bgtask.bus_time", bus_time as u64, "type" => task.name); } diff --git a/program_transformers/src/bubblegum/decompress.rs b/program_transformers/src/bubblegum/decompress.rs deleted file mode 100644 index 208e9aabc..000000000 --- a/program_transformers/src/bubblegum/decompress.rs +++ /dev/null @@ -1,26 +0,0 @@ -use { - crate::{ - bubblegum::db::{ - upsert_asset_with_compression_info, upsert_asset_with_leaf_info_for_decompression, - }, - error::ProgramTransformerResult, - }, - blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}, - sea_orm::{ConnectionTrait, TransactionTrait}, -}; - -pub async fn decompress<'c, T>( - _parsing_result: &BubblegumInstruction, - bundle: &InstructionBundle<'c>, - txn: &'c T, -) -> ProgramTransformerResult<()> -where - T: ConnectionTrait + TransactionTrait, -{ - let id_bytes = bundle.keys.get(3).unwrap().to_bytes().to_vec(); - - // Partial update of asset table with just leaf. - upsert_asset_with_leaf_info_for_decompression(txn, id_bytes.clone()).await?; - upsert_asset_with_compression_info(txn, id_bytes.clone(), false, false, 1, Some(id_bytes), true) - .await -} diff --git a/program_transformers/src/lib.rs b/program_transformers/src/lib.rs index 6b6259d01..79c407bd4 100644 --- a/program_transformers/src/lib.rs +++ b/program_transformers/src/lib.rs @@ -53,7 +53,7 @@ pub struct TransactionInfo { pub meta_inner_instructions: Vec, } -#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DownloadMetadataInfo { asset_data_id: Vec, uri: String,