From 1343e33d4b297b0816cf06de0d8f686cc7956532 Mon Sep 17 00:00:00 2001
From: Nagaprasad V R <81755170+Nagaprasadvr@users.noreply.github.com>
Date: Fri, 24 Jan 2025 16:13:14 +0530
Subject: [PATCH 1/2] Das 106 add metadata json backfiller to das (#210)

* Update to rust 1.79.0

* feat: add metadata_json backfiller

* cleanup

* revert metaplex/plerkle-validator image from ..79 to ..75

---------

Co-authored-by: Kyle Espinola
---
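
Note: the new subcommand is invoked as `metadata_json backfill` (wired up in
ops/src/main.rs and ops/src/metadata/cmd.rs below). Inside `start_backfill`,
the pending rows are split into contiguous slices, one per spawned worker,
using remainder bookkeeping (`excess_tasks` plus a `current_tasks_per_worker`
count that shrinks as workers are spawned). The sketch below illustrates the
same fan-out idea with plain ceiling division; `partition` is a hypothetical
helper for illustration, not code from this patch, and it assumes
`worker_count > 0`:

    fn partition(len: usize, worker_count: usize) -> Vec<std::ops::Range<usize>> {
        if len == 0 {
            return Vec::new();
        }
        // Ceiling division so every pending asset lands in exactly one slice
        // (hypothetical helper; assumes worker_count > 0).
        let per_worker = len.div_ceil(worker_count);
        (0..len)
            .step_by(per_worker)
            .map(|start| start..(start + per_worker).min(len))
            .collect()
    }

    fn main() {
        // 10 pending assets across 4 workers -> [0..3, 3..6, 6..9, 9..10]
        println!("{:?}", partition(10, 4));
    }
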
 Cargo.lock                              |   1 +
 core/src/metadata_json.rs               |   4 +-
 grpc-ingest/config-ingester.example.yml |   2 +
 ops/Cargo.toml                          |   1 +
 ops/src/main.rs                         |   4 +
 ops/src/metadata/backfiller.rs          | 160 ++++++++++++++++++++++++
 ops/src/metadata/cmd.rs                 |  27 ++++
 ops/src/metadata/mod.rs                 |   4 +
 8 files changed, 201 insertions(+), 2 deletions(-)
 create mode 100644 ops/src/metadata/backfiller.rs
 create mode 100644 ops/src/metadata/cmd.rs
 create mode 100644 ops/src/metadata/mod.rs

diff --git a/Cargo.lock b/Cargo.lock
index c1213881a..5b7085c0e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1934,6 +1934,7 @@ dependencies = [
  "mpl-bubblegum",
  "mpl-token-metadata",
  "program_transformers",
+ "reqwest",
  "sea-orm",
  "serde_json",
  "solana-account-decoder",
diff --git a/core/src/metadata_json.rs b/core/src/metadata_json.rs
index a32eccc4e..731d356b5 100644
--- a/core/src/metadata_json.rs
+++ b/core/src/metadata_json.rs
@@ -87,10 +87,10 @@ pub async fn create_download_metadata_notifier(
 pub struct MetadataJsonDownloadWorkerArgs {
     /// The number of worker threads
     #[arg(long, env, default_value = "25")]
-    metadata_json_download_worker_count: usize,
+    pub metadata_json_download_worker_count: usize,
     /// The request timeout in milliseconds
     #[arg(long, env, default_value = "1000")]
-    metadata_json_download_worker_request_timeout: u64,
+    pub metadata_json_download_worker_request_timeout: u64,
 }
 
 impl MetadataJsonDownloadWorkerArgs {
diff --git a/grpc-ingest/config-ingester.example.yml b/grpc-ingest/config-ingester.example.yml
index e9dbda4eb..c72bff163 100644
--- a/grpc-ingest/config-ingester.example.yml
+++ b/grpc-ingest/config-ingester.example.yml
@@ -55,5 +55,7 @@ download_metadata:
   retry_max_delay: 10
   # retry min delay in milliseconds
   retry_min_delay: 1
+  # request timeout in milliseconds
+  request_timeout: 5_000
   stream:
     name: METADATA_JSON
diff --git a/ops/Cargo.toml b/ops/Cargo.toml
index 14f741870..6a0964ae9 100644
--- a/ops/Cargo.toml
+++ b/ops/Cargo.toml
@@ -39,3 +39,4 @@ tokio = { workspace = true }
 tracing = { workspace = true }
 mpl-token-metadata = { workspace = true }
 serde_json = { workspace = true }
+reqwest = { workspace = true }
\ No newline at end of file
diff --git a/ops/src/main.rs b/ops/src/main.rs
index 09e73be12..0b80c5d1b 100644
--- a/ops/src/main.rs
+++ b/ops/src/main.rs
@@ -1,5 +1,6 @@
 mod account;
 mod bubblegum;
+mod metadata;
 
 use account::{subcommand as account_subcommand, AccountCommand};
 use anyhow::Result;
@@ -19,6 +20,8 @@ enum Command {
     Bubblegum(BubblegumCommand),
     #[clap(name = "account")]
     Account(AccountCommand),
+    #[clap(name = "metadata_json")]
+    MetadataJson(metadata::MetadataJsonCommand),
 }
 
 #[tokio::main]
@@ -30,6 +33,7 @@ async fn main() -> Result<()> {
     match args.command {
         Command::Bubblegum(subcommand) => bubblegum_subcommand(subcommand).await?,
         Command::Account(subcommand) => account_subcommand(subcommand).await?,
+        Command::MetadataJson(subcommand) => metadata::subcommand(subcommand).await?,
     }
 
     Ok(())
diff --git a/ops/src/metadata/backfiller.rs b/ops/src/metadata/backfiller.rs
new file mode 100644
index 000000000..9557e950e
--- /dev/null
+++ b/ops/src/metadata/backfiller.rs
@@ -0,0 +1,160 @@
+use anyhow::{Ok, Result};
+use clap::Parser;
+use das_core::{
+    connect_db, perform_metadata_json_task, DownloadMetadataInfo, DownloadMetadataJsonRetryConfig,
+    MetadataJsonDownloadWorkerArgs, PoolArgs,
+};
+
+use digital_asset_types::dao::asset_data;
+use indicatif::HumanDuration;
+use log::{debug, error};
+use reqwest::Client;
+use sea_orm::{ColumnTrait, EntityTrait, JsonValue, QueryFilter, SqlxPostgresConnector};
+use std::sync::Arc;
+use tokio::{task::JoinHandle, time::Instant};
+
+#[derive(Debug, Parser, Clone)]
+pub struct Args {
+    #[clap(flatten)]
+    pub metadata_json_download_worker: MetadataJsonDownloadWorkerArgs,
+    /// Database configuration
+    #[clap(flatten)]
+    pub database: PoolArgs,
+}
+
+pub const DEFAULT_METADATA_JSON_DOWNLOAD_WORKER_COUNT: usize = 25;
+
+#[derive(Debug, Clone)]
+pub struct MetadataJsonBackfillerContext {
+    pub database_pool: sqlx::PgPool,
+    pub metadata_json_download_worker: MetadataJsonDownloadWorkerArgs,
+}
+
+pub async fn start_backfill(context: MetadataJsonBackfillerContext) -> Result<()> {
+    let MetadataJsonBackfillerContext {
+        database_pool,
+        metadata_json_download_worker:
+            MetadataJsonDownloadWorkerArgs {
+                metadata_json_download_worker_count,
+                metadata_json_download_worker_request_timeout,
+            },
+    } = context;
+
+    let mut worker_count = if metadata_json_download_worker_count > 0 {
+        metadata_json_download_worker_count
+    } else {
+        DEFAULT_METADATA_JSON_DOWNLOAD_WORKER_COUNT
+    };
+    let database_pool = database_pool.clone();
+    let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(database_pool.clone());
+
+    let download_metadata_info_vec = asset_data::Entity::find()
+        .filter(asset_data::Column::Metadata.eq(JsonValue::String("processing".to_string())))
+        .all(&conn)
+        .await?
+        .iter()
+        .map(|d| DownloadMetadataInfo::new(d.id.clone(), d.metadata_url.clone(), d.slot_updated))
+        .collect::<Vec<DownloadMetadataInfo>>();
+
+    let metadata_vec_len = download_metadata_info_vec.len();
+    debug!(
+        "Found {} assets to download",
+        download_metadata_info_vec.len()
+    );
+
+    if metadata_vec_len == 0 {
+        return Ok(());
+    }
+
+    if worker_count > metadata_vec_len {
+        if metadata_vec_len == 1 {
+            worker_count = 1;
+        } else {
+            // If the number of assets is less than the number of workers, we assume each worker will handle 2 assets
+            worker_count = metadata_vec_len / 2;
+        }
+    }
+
+    let excess_tasks = metadata_vec_len % worker_count;
+    let mut current_tasks_per_worker = if excess_tasks > 0 {
+        metadata_vec_len / worker_count + 1
+    } else {
+        metadata_vec_len / worker_count
+    };
+
+    let mut handlers: Vec<JoinHandle<()>> = Vec::with_capacity(metadata_json_download_worker_count);
+
+    let mut curr_start = 0;
+    let client = Client::builder()
+        .timeout(std::time::Duration::from_millis(
+            metadata_json_download_worker_request_timeout,
+        ))
+        .build()?;
+
+    debug!("worker_count: {}", worker_count);
+    for _ in 0..worker_count {
+        let start = curr_start;
+
+        let end = start + current_tasks_per_worker;
+
+        let handler = spawn_metadata_fetch_task(
+            client.clone(),
+            database_pool.clone(),
+            &download_metadata_info_vec[start..end],
+        );
+
+        handlers.push(handler);
+
+        current_tasks_per_worker = current_tasks_per_worker.saturating_sub(1);
+
+        curr_start = end;
+    }
+
+    futures::future::join_all(handlers).await;
+
+    Ok(())
+}
+
+fn spawn_metadata_fetch_task(
+    client: reqwest::Client,
+    pool: sqlx::PgPool,
+    download_metadata_info: &[DownloadMetadataInfo],
+) -> JoinHandle<()> {
+    let download_metadata_info = download_metadata_info.to_vec();
+    tokio::spawn(async move {
+        for d in download_metadata_info.iter() {
+            let timing = Instant::now();
+            let asset_data_id = bs58::encode(d.asset_data_id.clone()).into_string();
+
+            if let Err(e) = perform_metadata_json_task(
+                client.clone(),
+                pool.clone(),
+                d,
+                Arc::new(DownloadMetadataJsonRetryConfig::default()),
+            )
+            .await
+            {
+                error!("Asset {} failed: {}", asset_data_id, e);
+            }
+
+            debug!(
+                "Asset {} finished in {}",
+                asset_data_id,
+                HumanDuration(timing.elapsed())
+            );
+        }
+    })
+}
+
+pub async fn run(config: Args) -> Result<()> {
+    let database_pool = connect_db(&config.database).await?;
+
+    let context = MetadataJsonBackfillerContext {
+        database_pool,
+        metadata_json_download_worker: config.metadata_json_download_worker,
+    };
+
+    start_backfill(context).await?;
+
+    Ok(())
+}
diff --git a/ops/src/metadata/cmd.rs b/ops/src/metadata/cmd.rs
new file mode 100644
index 000000000..e2b26be8b
--- /dev/null
+++ b/ops/src/metadata/cmd.rs
@@ -0,0 +1,27 @@
+use super::backfiller;
+use anyhow::Result;
+use clap::{Args, Subcommand};
+
+#[derive(Debug, Clone, Subcommand)]
+pub enum Commands {
+    /// The 'backfill' command fetches all asset data whose metadata is marked 'processing'
+    /// and downloads the corresponding metadata JSON files.
+    #[clap(name = "backfill")]
+    Backfill(backfiller::Args),
+}
+
+#[derive(Debug, Clone, Args)]
+pub struct MetadataJsonCommand {
+    #[clap(subcommand)]
+    pub action: Commands,
+}
+
+pub async fn subcommand(subcommand: MetadataJsonCommand) -> Result<()> {
+    match subcommand.action {
+        Commands::Backfill(args) => {
+            backfiller::run(args).await?;
+        }
+    }
+
+    Ok(())
+}
diff --git a/ops/src/metadata/mod.rs b/ops/src/metadata/mod.rs
new file mode 100644
index 000000000..d798b6e9e
--- /dev/null
+++ b/ops/src/metadata/mod.rs
@@ -0,0 +1,4 @@
+mod backfiller;
+mod cmd;
+
+pub use cmd::*;

From e678814ad90af6bc45f06c4e06619d9d73eacb9f Mon Sep 17 00:00:00 2001
From: Kyle Espinola
Date: Fri, 24 Jan 2025 05:44:50 -0500
Subject: [PATCH 2/2] Shutdown program on stream error or close (#213)

---
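
Note: the wiring below follows a take-once oneshot pattern. The sender is
stored in an Arc<Mutex<Option<oneshot::Sender<()>>>> shared by every
subscription task; whichever task first observes a stream error or
end-of-stream takes the sender out of the Option and fires it, and run()
selects on the receiver alongside the OS signal stream. A minimal
self-contained sketch of the same pattern, assuming tokio with the "full"
feature set; names and timings are illustrative, not taken from this patch:

    use std::sync::Arc;
    use tokio::sync::{oneshot, Mutex};

    // requires tokio = { version = "1", features = ["full"] } (assumption)
    #[tokio::main]
    async fn main() {
        let (global_tx, global_rx) = oneshot::channel::<()>();
        // Wrap the sender so many tasks can share it but only one can fire it.
        let global_tx = Arc::new(Mutex::new(Some(global_tx)));

        for id in 0u64..3 {
            let global_tx = Arc::clone(&global_tx);
            tokio::spawn(async move {
                // Stand-in for a gRPC stream that errors out or closes.
                tokio::time::sleep(std::time::Duration::from_millis(50 * (id + 1))).await;
                // take() guarantees the signal is sent exactly once.
                if let Some(tx) = global_tx.lock().await.take() {
                    let _ = tx.send(()); // receiver may be gone already; ignore
                }
            });
        }

        tokio::select! {
            _ = global_rx => println!("a task requested shutdown"),
            _ = tokio::signal::ctrl_c() => println!("received ctrl-c"),
        }
    }
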
 grpc-ingest/src/grpc.rs | 186 +++++++++++++++++++++++-----------------
 1 file changed, 106 insertions(+), 80 deletions(-)

diff --git a/grpc-ingest/src/grpc.rs b/grpc-ingest/src/grpc.rs
index 4b67bc2e0..c47997960 100644
--- a/grpc-ingest/src/grpc.rs
+++ b/grpc-ingest/src/grpc.rs
@@ -38,6 +38,9 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
 
     let subscriptions = config.subscriptions.clone();
 
+    let (global_shutdown_tx, mut global_shutdown_rx) = oneshot::channel();
+    let global_shutdown_tx = Arc::new(Mutex::new(Some(global_shutdown_tx)));
+
     let mut subscription_tasks = Vec::new();
     for (label, subscription_config) in subscriptions {
         let subscription = Subscription {
@@ -48,19 +51,27 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
             .config(Arc::clone(&config))
             .connection(connection.clone())
             .subscription(subscription)
-            .start()
+            .start(Arc::clone(&global_shutdown_tx))
             .await?;
         subscription_tasks.push(task);
     }
 
-    if let Some(signal) = shutdown.next().await {
-        warn!(
-            target: "grpc2redis",
-            action = "shutdown_signal_received",
-            message = "Shutdown signal received, waiting for spawned tasks to complete",
-            signal = ?signal
-        );
+    tokio::select! {
+        _ = &mut global_shutdown_rx => {
+            warn!(
+                target: "grpc2redis",
+                action = "global_shutdown_signal_received",
+                message = "Global shutdown signal received, stopping all tasks"
+            );
+        }
+        _ = shutdown.next() => {
+            warn!(
+                target: "grpc2redis",
+                action = "shutdown_signal_received",
+                message = "Shutdown signal received, waiting for spawned tasks to complete"
+            );
+        }
     }
 
     futures::future::join_all(
@@ -108,7 +119,10 @@ impl SubscriptionTask {
         self
     }
 
-    pub async fn start(mut self) -> anyhow::Result<JoinHandle<()>> {
+    pub async fn start(
+        mut self,
+        global_shutdown_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
+    ) -> anyhow::Result<JoinHandle<()>> {
         let config = Arc::clone(&self.config);
         let connection = self
             .connection
@@ -164,6 +178,7 @@ impl SubscriptionTask {
         let (mut subscribe_tx, stream) = dragon_mouth_client
             .subscribe_with_request(Some(request))
             .await?;
+        let global_shutdown_tx = Arc::clone(&global_shutdown_tx);
 
         let control = tokio::spawn({
             async move {
@@ -215,89 +230,100 @@ impl SubscriptionTask {
             loop {
                 tokio::select! {
-                    Some(Ok(msg)) = stream.next() => {
-                        match msg.update_oneof {
-                            Some(UpdateOneof::Account(_)) | Some(UpdateOneof::Transaction(_)) => {
-                                if tasks.len() >= stream_config.max_concurrency {
-                                    tasks.next().await;
-                                }
-                                grpc_tasks_total_inc(&label, &stream_config.name);
-
-                                tasks.push(tokio::spawn({
-                                    let pipe = Arc::clone(&pipes[current_pipe_index]);
-                                    let label = label.clone();
-                                    let stream_config = Arc::clone(&stream_config);
-
-                                    async move {
-                                        let stream = stream_config.name.clone();
-                                        let stream_maxlen = stream_config.max_len;
-
-                                        let SubscribeUpdate { update_oneof, .. } = msg;
-
-                                        let mut pipe = pipe.lock().await;
-
-                                        if let Some(update) = update_oneof {
-                                            match update {
-                                                UpdateOneof::Account(account) => {
-                                                    pipe.xadd_maxlen(
-                                                        &stream.to_string(),
-                                                        StreamMaxlen::Approx(stream_maxlen),
-                                                        "*",
-                                                        account.encode_to_vec(),
-                                                    );
-                                                    debug!(target: "grpc2redis", action = "process_account_update",label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
-                                                }
-
-                                                UpdateOneof::Transaction(transaction) => {
-                                                    pipe.xadd_maxlen(
-                                                        &stream.to_string(),
-                                                        StreamMaxlen::Approx(stream_maxlen),
-                                                        "*",
-                                                        transaction.encode_to_vec(),
-                                                    );
-                                                    debug!(target: "grpc2redis", action = "process_transaction_update",label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
-                                                }
-                                                _ => {
-                                                    warn!(target: "grpc2redis", action = "unknown_update_variant",label = ?label, message = "Unknown update variant")
-                                                }
-                                            }
-                                        }
-
-                                        grpc_tasks_total_dec(&label, &stream_config.name);
-                                    }
-                                }));
-
-                                current_pipe_index = (current_pipe_index + 1) % pipes.len();
-                            }
-                            Some(UpdateOneof::Ping(_)) => {
-                                let ping = subscribe_tx
-                                    .send(SubscribeRequest {
-                                        ping: Some(SubscribeRequestPing { id: PING_ID }),
-                                        ..Default::default()
-                                    })
-                                    .await;
-
-                                match ping {
-                                    Ok(_) => {
-                                        debug!(target: "grpc2redis", action = "send_ping", message = "Ping sent successfully", id = PING_ID)
-                                    }
-                                    Err(err) => {
-                                        warn!(target: "grpc2redis", action = "send_ping_failed", message = "Failed to send ping", ?err, id = PING_ID)
-                                    }
-                                }
-                            }
-                            Some(UpdateOneof::Pong(pong)) => {
-                                if pong.id == PING_ID {
-                                    debug!(target: "grpc2redis", action = "receive_pong", message = "Pong received", id = PING_ID);
-                                } else {
-                                    warn!(target: "grpc2redis", action = "receive_unknown_pong", message = "Unknown pong id received", id = pong.id);
-                                }
-                            }
-                            _ => {
-                                warn!(target: "grpc2redis", action = "unknown_update_variant", message = "Unknown update variant", ?msg.update_oneof)
-                            }
-                        }
-                    }
+                    event = stream.next() => {
+                        match event {
+                            Some(Ok(msg)) => {
+                                match msg.update_oneof {
+                                    Some(UpdateOneof::Account(_)) | Some(UpdateOneof::Transaction(_)) => {
+                                        if tasks.len() >= stream_config.max_concurrency {
+                                            tasks.next().await;
+                                        }
+                                        grpc_tasks_total_inc(&label, &stream_config.name);
+
+                                        tasks.push(tokio::spawn({
+                                            let pipe = Arc::clone(&pipes[current_pipe_index]);
+                                            let label = label.clone();
+                                            let stream_config = Arc::clone(&stream_config);
+
+                                            async move {
+                                                let stream = stream_config.name.clone();
+                                                let stream_maxlen = stream_config.max_len;
+
+                                                let SubscribeUpdate { update_oneof, .. } = msg;
+
+                                                let mut pipe = pipe.lock().await;
+
+                                                if let Some(update) = update_oneof {
+                                                    match update {
+                                                        UpdateOneof::Account(account) => {
+                                                            pipe.xadd_maxlen(
+                                                                &stream.to_string(),
+                                                                StreamMaxlen::Approx(stream_maxlen),
+                                                                "*",
+                                                                account.encode_to_vec(),
+                                                            );
+                                                            debug!(target: "grpc2redis", action = "process_account_update", label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
+                                                        }
+
+                                                        UpdateOneof::Transaction(transaction) => {
+                                                            pipe.xadd_maxlen(
+                                                                &stream.to_string(),
+                                                                StreamMaxlen::Approx(stream_maxlen),
+                                                                "*",
+                                                                transaction.encode_to_vec(),
+                                                            );
+                                                            debug!(target: "grpc2redis", action = "process_transaction_update", label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
+                                                        }
+                                                        _ => {
+                                                            warn!(target: "grpc2redis", action = "unknown_update_variant", label = ?label, message = "Unknown update variant");
+                                                        }
+                                                    }
+                                                }
+
+                                                grpc_tasks_total_dec(&label, &stream_config.name);
+                                            }
+                                        }));
+
+                                        current_pipe_index = (current_pipe_index + 1) % pipes.len();
+                                    }
+                                    Some(UpdateOneof::Ping(_)) => {
+                                        let ping = subscribe_tx
+                                            .send(SubscribeRequest {
+                                                ping: Some(SubscribeRequestPing { id: PING_ID }),
+                                                ..Default::default()
+                                            })
+                                            .await;
+
+                                        match ping {
+                                            Ok(_) => {
+                                                debug!(target: "grpc2redis", action = "send_ping", message = "Ping sent successfully", id = PING_ID);
+                                            }
+                                            Err(err) => {
+                                                warn!(target: "grpc2redis", action = "send_ping_failed", message = "Failed to send ping", ?err, id = PING_ID);
+                                            }
+                                        }
+                                    }
+                                    Some(UpdateOneof::Pong(pong)) => {
+                                        if pong.id == PING_ID {
+                                            debug!(target: "grpc2redis", action = "receive_pong", message = "Pong received", id = PING_ID);
+                                        } else {
+                                            warn!(target: "grpc2redis", action = "receive_unknown_pong", message = "Unknown pong id received", id = pong.id);
+                                        }
+                                    }
+                                    _ => {
+                                        warn!(target: "grpc2redis", action = "unknown_update_variant", message = "Unknown update variant");
+                                    }
+                                }
+                            }
+                            _ => {
+                                break;
+                            }
+                        }
+
+                        let mut global_shutdown_tx = global_shutdown_tx.lock().await;
+                        if let Some(global_shutdown_tx) = global_shutdown_tx.take() {
+                            let _ = global_shutdown_tx.send(());
+                        }
+                    }
                     _ = &mut shutdown_rx => {
                         debug!(target: "grpc2redis", action = "shutdown_signal_received", message = "Shutdown signal received, stopping subscription task", ?label);