From 2e6771ea1dba0d891bd43e9f6cdf66cd4916a989 Mon Sep 17 00:00:00 2001 From: Kyle Espinola Date: Wed, 15 Jan 2025 17:40:33 +0100 Subject: [PATCH 1/3] Update to rust 1.79.0 --- docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 7358cb6f..6c00aa5f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -107,7 +107,7 @@ services: volumes: - ./db-data/:/var/lib/postgresql/data/:z solana: - image: ghcr.io/metaplex-foundation/plerkle-test-validator:v1.9.0-1.75.0-v1.18.11 + image: ghcr.io/metaplex-foundation/plerkle-test-validator:v1.9.0-1.79.0-v1.18.11 volumes: - ./programs:/so/:ro - ./ledger:/config:rw From d049ca254ce4ea9efb0f8f6232d2d858baab33b4 Mon Sep 17 00:00:00 2001 From: Nagaprasadvr Date: Wed, 22 Jan 2025 13:10:27 +0530 Subject: [PATCH 2/3] feat : add metadata_json backfiller --- Cargo.lock | 1 + core/src/metadata_json.rs | 4 +- grpc-ingest/config-ingester.example.yml | 2 + ops/Cargo.toml | 1 + ops/src/main.rs | 4 + ops/src/metadata/backfiller.rs | 161 ++++++++++++++++++++++++ ops/src/metadata/cmd.rs | 27 ++++ ops/src/metadata/mod.rs | 4 + 8 files changed, 202 insertions(+), 2 deletions(-) create mode 100644 ops/src/metadata/backfiller.rs create mode 100644 ops/src/metadata/cmd.rs create mode 100644 ops/src/metadata/mod.rs diff --git a/Cargo.lock b/Cargo.lock index c1213881..5b7085c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1934,6 +1934,7 @@ dependencies = [ "mpl-bubblegum", "mpl-token-metadata", "program_transformers", + "reqwest", "sea-orm", "serde_json", "solana-account-decoder", diff --git a/core/src/metadata_json.rs b/core/src/metadata_json.rs index a32eccc4..731d356b 100644 --- a/core/src/metadata_json.rs +++ b/core/src/metadata_json.rs @@ -87,10 +87,10 @@ pub async fn create_download_metadata_notifier( pub struct MetadataJsonDownloadWorkerArgs { /// The number of worker threads #[arg(long, env, default_value = "25")] - metadata_json_download_worker_count: usize, + 
pub metadata_json_download_worker_count: usize, /// The request timeout in milliseconds #[arg(long, env, default_value = "1000")] - metadata_json_download_worker_request_timeout: u64, + pub metadata_json_download_worker_request_timeout: u64, } impl MetadataJsonDownloadWorkerArgs { diff --git a/grpc-ingest/config-ingester.example.yml b/grpc-ingest/config-ingester.example.yml index e9dbda4e..c72bff16 100644 --- a/grpc-ingest/config-ingester.example.yml +++ b/grpc-ingest/config-ingester.example.yml @@ -55,5 +55,7 @@ download_metadata: retry_max_delay: 10 # retry min delay in milliseconds retry_min_delay: 1 + # request timeout in milliseconds + request_timeout: 5000 stream: name: METADATA_JSON diff --git a/ops/Cargo.toml b/ops/Cargo.toml index 14f74187..6a0964ae 100644 --- a/ops/Cargo.toml +++ b/ops/Cargo.toml @@ -39,3 +39,4 @@ tokio = { workspace = true } tracing = { workspace = true } mpl-token-metadata = { workspace = true } serde_json = { workspace = true } +reqwest = { workspace = true } \ No newline at end of file diff --git a/ops/src/main.rs b/ops/src/main.rs index 09e73be1..0b80c5d1 100644 --- a/ops/src/main.rs +++ b/ops/src/main.rs @@ -1,5 +1,6 @@ mod account; mod bubblegum; +mod metadata; use account::{subcommand as account_subcommand, AccountCommand}; use anyhow::Result; @@ -19,6 +20,8 @@ enum Command { Bubblegum(BubblegumCommand), #[clap(name = "account")] Account(AccountCommand), + #[clap(name = "metadata_json")] + MetadataJson(metadata::MetadataJsonCommand), } #[tokio::main] @@ -30,6 +33,7 @@ async fn main() -> Result<()> { match args.command { Command::Bubblegum(subcommand) => bubblegum_subcommand(subcommand).await?, Command::Account(subcommand) => account_subcommand(subcommand).await?, + Command::MetadataJson(subcommand) => metadata::subcommand(subcommand).await?, } Ok(()) diff --git a/ops/src/metadata/backfiller.rs b/ops/src/metadata/backfiller.rs new file mode 100644 index 00000000..daeb8384 --- /dev/null +++ b/ops/src/metadata/backfiller.rs @@ -0,0
+1,161 @@ +use anyhow::{Ok, Result}; +use clap::Parser; +use das_core::{ + connect_db, perform_metadata_json_task, DownloadMetadataInfo, DownloadMetadataJsonRetryConfig, + MetadataJsonDownloadWorkerArgs, PoolArgs, +}; + +use digital_asset_types::dao::asset_data; +use indicatif::HumanDuration; +use log::{debug, error}; +use reqwest::Client; +use sea_orm::{ColumnTrait, EntityTrait, JsonValue, QueryFilter, SqlxPostgresConnector}; +use std::sync::Arc; +use tokio::{task::JoinHandle, time::Instant}; + +#[derive(Debug, Parser, Clone)] +pub struct Args { + #[clap(flatten)] + pub metadata_json_download_worker: MetadataJsonDownloadWorkerArgs, + /// Database configuration + #[clap(flatten)] + pub database: PoolArgs, +} + +pub const DEFAULT_METADATA_JSON_DOWNLOAD_WORKER_COUNT: usize = 25; + +#[derive(Debug, Clone)] +pub struct MetadataJsonBackfillerContext { + pub database_pool: sqlx::PgPool, + pub metadata_json_download_worker: MetadataJsonDownloadWorkerArgs, +} + +pub async fn start_backfill(context: MetadataJsonBackfillerContext) -> Result<()> { + let MetadataJsonBackfillerContext { + database_pool, + metadata_json_download_worker: + MetadataJsonDownloadWorkerArgs { + metadata_json_download_worker_count, + metadata_json_download_worker_request_timeout, + }, + } = context; + + let mut worker_count = if metadata_json_download_worker_count > 0 { + metadata_json_download_worker_count + } else { + DEFAULT_METADATA_JSON_DOWNLOAD_WORKER_COUNT + }; + let database_pool = database_pool.clone(); + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(database_pool.clone()); + + let download_metadata_info_vec = asset_data::Entity::find() + .filter(asset_data::Column::Metadata.eq(JsonValue::String("processing".to_string()))) + .all(&conn) + .await? 
+ .iter() + .map(|d| DownloadMetadataInfo::new(d.id.clone(), d.metadata_url.clone(), d.slot_updated)) + .collect::<Vec<_>>(); + + debug!( + "Found {} assets to download", + download_metadata_info_vec.len() + ); + + if download_metadata_info_vec.is_empty() { + return Ok(()); + } + + if worker_count > download_metadata_info_vec.len() { + if download_metadata_info_vec.len() == 1 { + worker_count = 1; + } else { + // If the number of assets is less than the number of workers, we assume each worker will handle 2 assets + worker_count = download_metadata_info_vec.len() / 2; + } + } + + let excess_tasks = download_metadata_info_vec.len() % worker_count; + let mut current_tasks_per_worker = if excess_tasks > 0 { + download_metadata_info_vec.len() / worker_count + 1 + } else { + download_metadata_info_vec.len() / worker_count + }; + + let mut handlers: Vec<JoinHandle<()>> = Vec::with_capacity(metadata_json_download_worker_count); + + let mut curr_start = 0; + let client = Client::builder() + .timeout(std::time::Duration::from_millis( + metadata_json_download_worker_request_timeout, + )) + .build()?; + + debug!("worker_count: {}", worker_count); + for _ in 0..worker_count { + let start = curr_start; + + let end = start + current_tasks_per_worker; + + let handler = spawn_metadata_fetch_task( + client.clone(), + database_pool.clone(), + &download_metadata_info_vec[start..end], + ); + + handlers.push(handler); + + if current_tasks_per_worker > 0 { + current_tasks_per_worker -= 1; + } + + curr_start = end; + } + + futures::future::join_all(handlers).await; + + Ok(()) +} + +fn spawn_metadata_fetch_task( + client: reqwest::Client, + pool: sqlx::PgPool, + download_metadata_info: &[DownloadMetadataInfo], +) -> JoinHandle<()> { + let download_metadata_info = download_metadata_info.to_vec(); + tokio::spawn(async move { + for d in download_metadata_info.iter() { + let timing = Instant::now(); + let asset_data_id = bs58::encode(d.asset_data_id.clone()).into_string(); + + if let Err(e) =
perform_metadata_json_task( + client.clone(), + pool.clone(), + d, + Arc::new(DownloadMetadataJsonRetryConfig::default()), + ) + .await + { + error!("Asset {} failed: {}", asset_data_id, e); + } + + debug!( + "Asset {} finished in {}", + asset_data_id, + HumanDuration(timing.elapsed()) + ); + } + }) +} + +pub async fn run(config: Args) -> Result<()> { + let database_pool = connect_db(&config.database).await?; + + let context = MetadataJsonBackfillerContext { + database_pool, + metadata_json_download_worker: config.metadata_json_download_worker, + }; + + start_backfill(context).await?; + + Ok(()) +} diff --git a/ops/src/metadata/cmd.rs b/ops/src/metadata/cmd.rs new file mode 100644 index 00000000..e2b26be8 --- /dev/null +++ b/ops/src/metadata/cmd.rs @@ -0,0 +1,27 @@ +use super::backfiller; +use anyhow::Result; +use clap::{Args, Subcommand}; + +#[derive(Debug, Clone, Subcommand)] +pub enum Commands { + /// The 'backfill' command is used to backfill missing metadata json for indexed assets. + /// It fetches all metadata json data marked as 'processing' and downloads the metadata json files.
+ #[clap(name = "backfill")] + Backfill(backfiller::Args), +} + +#[derive(Debug, Clone, Args)] +pub struct MetadataJsonCommand { + #[clap(subcommand)] + pub action: Commands, +} + +pub async fn subcommand(subcommand: MetadataJsonCommand) -> Result<()> { + match subcommand.action { + Commands::Backfill(args) => { + backfiller::run(args).await?; + } + } + + Ok(()) +} diff --git a/ops/src/metadata/mod.rs b/ops/src/metadata/mod.rs new file mode 100644 index 00000000..d798b6e9 --- /dev/null +++ b/ops/src/metadata/mod.rs @@ -0,0 +1,4 @@ +mod backfiller; +mod cmd; + +pub use cmd::*; From 9cad70a64a1a0947d36953290c05c4dee04a54c4 Mon Sep 17 00:00:00 2001 From: Nagaprasadvr Date: Wed, 22 Jan 2025 15:21:56 +0530 Subject: [PATCH 3/3] cleanup --- ops/src/metadata/backfiller.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/ops/src/metadata/backfiller.rs b/ops/src/metadata/backfiller.rs index daeb8384..9557e950 100644 --- a/ops/src/metadata/backfiller.rs +++ b/ops/src/metadata/backfiller.rs @@ -56,29 +56,30 @@ pub async fn start_backfill(context: MetadataJsonBackfillerContext) -> Result<() .map(|d| DownloadMetadataInfo::new(d.id.clone(), d.metadata_url.clone(), d.slot_updated)) .collect::<Vec<_>>(); + let metadata_vec_len = download_metadata_info_vec.len(); debug!( "Found {} assets to download", download_metadata_info_vec.len() ); - if download_metadata_info_vec.is_empty() { + if metadata_vec_len == 0 { return Ok(()); } - if worker_count > download_metadata_info_vec.len() { - if download_metadata_info_vec.len() == 1 { + if worker_count > metadata_vec_len { + if metadata_vec_len == 1 { worker_count = 1; } else { // If the number of assets is less than the number of workers, we assume each worker will handle 2 assets - worker_count = download_metadata_info_vec.len() / 2; + worker_count = metadata_vec_len / 2; } } - let excess_tasks = download_metadata_info_vec.len() % worker_count; + let excess_tasks = metadata_vec_len % worker_count; let mut
current_tasks_per_worker = if excess_tasks > 0 { - download_metadata_info_vec.len() / worker_count + 1 + metadata_vec_len / worker_count + 1 } else { - download_metadata_info_vec.len() / worker_count + metadata_vec_len / worker_count }; let mut handlers: Vec<JoinHandle<()>> = Vec::with_capacity(metadata_json_download_worker_count); @@ -104,9 +105,7 @@ pub async fn start_backfill(context: MetadataJsonBackfillerContext) -> Result<() handlers.push(handler); - if current_tasks_per_worker > 0 { - current_tasks_per_worker -= 1; - } + current_tasks_per_worker = current_tasks_per_worker.saturating_sub(1); curr_start = end; }