Skip to content

Commit

Permalink
adds stream size metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
austbot committed Jan 27, 2023
1 parent 7146610 commit 8273207
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 7 deletions.
2 changes: 1 addition & 1 deletion helm/ingest/templates/secret.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ metadata:
data:
DATABASE_URL: {{ .Values.ingest.db_url | b64enc | quote }}
INGESTER_DATABASE_CONFIG: {{ ( printf "{listener_channel=\"backfill_item_added\", url=\"%s\"}" .Values.ingest.db_url ) | b64enc | quote }}
INGESTER_MESSENGER_CONFIG: {{ ( printf "{connection_config={batch_size=100,idle_timeout=5000,message_wait_timeout=10,redis_connection_str=%s}, messenger_type=\"Redis\"}" .Values.ingest.redis_url ) | b64enc | quote }}
INGESTER_MESSENGER_CONFIG: {{ ( printf "{connection_config={batch_size=%d,idle_timeout=%d,message_wait_timeout=%d,redis_connection_str=%s}, messenger_type=\"Redis\"}".Values.ingest.batch_size, .Values.ingest.message_wait_timeout, .Values.ingest.idle_timeout,.Values.ingest.redis_url ) | b64enc | quote }}
INGESTER_RPC_CONFIG: {{ ( printf "{url=%s, commitment=\"finalized\"}" .Values.ingest.rpc_url ) | b64enc | quote }}
3 changes: 3 additions & 0 deletions helm/ingest/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ ingest:
db_url:
redis_url:
rpc_url:
message_wait_timeout: 10
batch_size: 100
idle_timeout: 5000

serviceAccount:
# Specifies whether a service account should be created
Expand Down
13 changes: 10 additions & 3 deletions nft_ingester/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nft_ingester/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ mpl-token-metadata = { git="https://github.com/metaplex-foundation/metaplex-prog
mpl-candy-machine-core = { git="https://github.com/metaplex-foundation/metaplex-program-library", branch="update-deps"}
mpl-bubblegum = { git="https://github.com/metaplex-foundation/metaplex-program-library", branch="update-deps"}
mpl-candy-guard = { git="https://github.com/metaplex-foundation/mpl-candy-guard", branch="update-deps"}
plerkle_messenger = { git="https://github.com/metaplex-foundation/digital-asset-validator-plugin", branch="speed-exp"}
35 changes: 32 additions & 3 deletions nft_ingester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::sync::Arc;
use sqlx::{self, postgres::PgPoolOptions, Pool, Postgres};
use std::fmt::{Display, Formatter};
use std::net::UdpSocket;
use tokio::{sync::mpsc::UnboundedSender, task::JoinSet};
use tokio::{sync::mpsc::UnboundedSender, task::JoinSet, time};

// Types and constants used for Figment configuration items.
pub type DatabaseConfig = figment::value::Dict;
Expand Down Expand Up @@ -156,13 +156,41 @@ async fn main() {

let role = config.role.unwrap_or(IngesterRole::All);

let stream_size_timer = async move {
let mut interval = time::interval(tokio::time::Duration::from_secs(10));
let mut messenger = RedisMessenger::new(config.messenger_config.clone()).await.unwrap();
loop {
interval.tick().await;

let tx_size = messenger.stream_size(TRANSACTION_STREAM).await;
let acc_size = messenger.stream_size(ACCOUNT_STREAM).await;
if tx_size.is_err() {
safe_metric(|| {
statsd_count!("ingester.transaction_stream_size_error", 1);
});
}
if acc_size.is_err() {
safe_metric(|| {
statsd_count!("ingester.account_stream_size_error", 1);
});
}
let tx_size = tx_size.unwrap_or(0);
let acc_size = acc_size.unwrap_or(0);
safe_metric(move || {
statsd_gauge!("ingester.transaction_stream_size", tx_size);
statsd_gauge!("ingester.account_stream_size", acc_size);
})
}
};

match role {
IngesterRole::All => {
tasks.spawn(backfiller.await);
tasks.spawn(txn_stream.await);
tasks.spawn(account_stream.await);
tasks.spawn(background_task_manager_handle);
tasks.spawn(background_task_manager.start_runner());
tasks.spawn(stream_size_timer);
}
IngesterRole::Backfiller => {
tasks.spawn(backfiller.await);
Expand Down Expand Up @@ -309,7 +337,8 @@ async fn handle_account(manager: &Arc<ProgramTransformer>, data: Vec<RecvData>)
safe_metric(|| {
statsd_time!(
"ingester.account_bus_ingest_time",
(seen_at.timestamp_millis() - account_update.seen_at()) as u64
(seen_at.timestamp_millis() - account_update.seen_at()) as u64,
"owner" => &str_program_id
);
});
let begin_processing = Utc::now();
Expand All @@ -322,7 +351,7 @@ async fn handle_account(manager: &Arc<ProgramTransformer>, data: Vec<RecvData>)
let proc_time = (finish_processing.timestamp_millis()
- begin_processing.timestamp_millis())
as u64;
statsd_time!("ingester.account_proc_time", proc_time);
statsd_time!("ingester.account_proc_time", proc_time, "owner" => &str_program_id);
});
safe_metric(|| {
statsd_count!("ingester.account_update_success", 1, "owner" => &str_program_id);
Expand Down

0 comments on commit 8273207

Please sign in to comment.