diff --git a/helm/ingest/templates/secret.yaml b/helm/ingest/templates/secret.yaml index f6714a6d2..981826669 100644 --- a/helm/ingest/templates/secret.yaml +++ b/helm/ingest/templates/secret.yaml @@ -5,5 +5,5 @@ metadata: data: DATABASE_URL: {{ .Values.ingest.db_url | b64enc | quote }} INGESTER_DATABASE_CONFIG: {{ ( printf "{listener_channel=\"backfill_item_added\", url=\"%s\"}" .Values.ingest.db_url ) | b64enc | quote }} - INGESTER_MESSENGER_CONFIG: {{ ( printf "{connection_config={batch_size=100,idle_timeout=5000,message_wait_timeout=10,redis_connection_str=%s}, messenger_type=\"Redis\"}" .Values.ingest.redis_url ) | b64enc | quote }} + INGESTER_MESSENGER_CONFIG: {{ ( printf "{connection_config={batch_size=%d,idle_timeout=%d,message_wait_timeout=%d,redis_connection_str=%s}, messenger_type=\"Redis\"}".Values.ingest.batch_size, .Values.ingest.message_wait_timeout, .Values.ingest.idle_timeout,.Values.ingest.redis_url ) | b64enc | quote }} INGESTER_RPC_CONFIG: {{ ( printf "{url=%s, commitment=\"finalized\"}" .Values.ingest.rpc_url ) | b64enc | quote }} diff --git a/helm/ingest/values.yaml b/helm/ingest/values.yaml index 72f1e6940..62804d85e 100644 --- a/helm/ingest/values.yaml +++ b/helm/ingest/values.yaml @@ -17,6 +17,9 @@ ingest: db_url: redis_url: rpc_url: + message_wait_timeout: 10 + batch_size: 100 + idle_timeout: 5000 serviceAccount: # Specifies whether a service account should be created diff --git a/nft_ingester/Cargo.lock b/nft_ingester/Cargo.lock index d615e5e47..cb2ea8625 100644 --- a/nft_ingester/Cargo.lock +++ b/nft_ingester/Cargo.lock @@ -269,6 +269,12 @@ version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" +[[package]] +name = "arc-swap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" + [[package]] name = "arrayref" version = "0.3.6" @@ -3010,9 +3016,8 @@ checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" [[package]] name = "plerkle_messenger" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdc9d196c6f10b9a4123de62dffd2082554a8253d0bafdabdd0b429bef94aef1" +version = "1.2.0" +source = "git+https://github.com/metaplex-foundation/digital-asset-validator-plugin?branch=speed-exp#24e9ec10a6232d806890e5144c7a5fe6fad33eb1" dependencies = [ "async-mutex", "async-trait", @@ -3440,9 +3445,11 @@ version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aa8455fa3621f6b41c514946de66ea0531f57ca017b2e6c7cc368035ea5b46df" dependencies = [ + "arc-swap", "async-trait", "bytes", "combine", + "futures", "futures-util", "itoa", "native-tls", diff --git a/nft_ingester/Cargo.toml b/nft_ingester/Cargo.toml index 65a9c7528..74b2ebec3 100644 --- a/nft_ingester/Cargo.toml +++ b/nft_ingester/Cargo.toml @@ -64,3 +64,4 @@ mpl-token-metadata = { git="https://github.com/metaplex-foundation/metaplex-prog mpl-candy-machine-core = { git="https://github.com/metaplex-foundation/metaplex-program-library", branch="update-deps"} mpl-bubblegum = { git="https://github.com/metaplex-foundation/metaplex-program-library", branch="update-deps"} mpl-candy-guard = { git="https://github.com/metaplex-foundation/mpl-candy-guard", branch="update-deps"} +plerkle_messenger = { git="https://github.com/metaplex-foundation/digital-asset-validator-plugin", branch="speed-exp"} \ No newline at end of file diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs index 3c8a502ef..fecf8b6a2 100644 --- a/nft_ingester/src/main.rs +++ b/nft_ingester/src/main.rs @@ -29,7 +29,7 @@ use std::sync::Arc; use sqlx::{self, postgres::PgPoolOptions, Pool, Postgres}; use std::fmt::{Display, Formatter}; use std::net::UdpSocket; -use tokio::{sync::mpsc::UnboundedSender, task::JoinSet}; +use tokio::{sync::mpsc::UnboundedSender, task::JoinSet, time}; // Types and constants used for Figment configuration items. pub type DatabaseConfig = figment::value::Dict; @@ -156,6 +156,33 @@ async fn main() { let role = config.role.unwrap_or(IngesterRole::All); + let stream_size_timer = async move { + let mut interval = time::interval(tokio::time::Duration::from_secs(10)); + let mut messenger = RedisMessenger::new(config.messenger_config.clone()).await.unwrap(); + loop { + interval.tick().await; + + let tx_size = messenger.stream_size(TRANSACTION_STREAM).await; + let acc_size = messenger.stream_size(ACCOUNT_STREAM).await; + if tx_size.is_err() { + safe_metric(|| { + statsd_count!("ingester.transaction_stream_size_error", 1); + }); + } + if acc_size.is_err() { + safe_metric(|| { + statsd_count!("ingester.account_stream_size_error", 1); + }); + } + let tx_size = tx_size.unwrap_or(0); + let acc_size = acc_size.unwrap_or(0); + safe_metric(move || { + statsd_gauge!("ingester.transaction_stream_size", tx_size); + statsd_gauge!("ingester.account_stream_size", acc_size); + }) + } + }; + match role { IngesterRole::All => { tasks.spawn(backfiller.await); @@ -163,6 +190,7 @@ async fn main() { tasks.spawn(account_stream.await); tasks.spawn(background_task_manager_handle); tasks.spawn(background_task_manager.start_runner()); + tasks.spawn(stream_size_timer); } IngesterRole::Backfiller => { tasks.spawn(backfiller.await); @@ -309,7 +337,8 @@ async fn handle_account(manager: &Arc, data: Vec) safe_metric(|| { statsd_time!( "ingester.account_bus_ingest_time", - (seen_at.timestamp_millis() - account_update.seen_at()) as u64 + (seen_at.timestamp_millis() - account_update.seen_at()) as u64, + "owner" => &str_program_id ); }); let begin_processing = Utc::now(); @@ -322,7 +351,7 @@ async fn handle_account(manager: &Arc, data: Vec) let proc_time = (finish_processing.timestamp_millis() - begin_processing.timestamp_millis()) as u64; - statsd_time!("ingester.account_proc_time", proc_time); + statsd_time!("ingester.account_proc_time", proc_time, "owner" => &str_program_id); }); safe_metric(|| { statsd_count!("ingester.account_update_success", 1, "owner" => &str_program_id);