Add metadata json backfiller #209

Closed · wants to merge 3 commits
1 change: 1 addition & 0 deletions Cargo.lock

Generated lockfile; diff not rendered.

4 changes: 2 additions & 2 deletions core/src/metadata_json.rs
@@ -87,10 +87,10 @@ pub async fn create_download_metadata_notifier(
 pub struct MetadataJsonDownloadWorkerArgs {
     /// The number of worker threads
     #[arg(long, env, default_value = "25")]
-    metadata_json_download_worker_count: usize,
+    pub metadata_json_download_worker_count: usize,
     /// The request timeout in milliseconds
     #[arg(long, env, default_value = "1000")]
-    metadata_json_download_worker_request_timeout: u64,
+    pub metadata_json_download_worker_request_timeout: u64,
 }

 impl MetadataJsonDownloadWorkerArgs {
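These fields are made pub so that code outside das-core can read them. A minimal sketch of the destructuring this enables, mirroring what ops/src/metadata/backfiller.rs does later in this PR (`args` stands for an already-parsed MetadataJsonDownloadWorkerArgs):

    let MetadataJsonDownloadWorkerArgs {
        metadata_json_download_worker_count,
        metadata_json_download_worker_request_timeout,
    } = args;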
2 changes: 1 addition & 1 deletion docker-compose.yaml
@@ -107,7 +107,7 @@ services:
   volumes:
     - ./db-data/:/var/lib/postgresql/data/:z
   solana:
-    image: ghcr.io/metaplex-foundation/plerkle-test-validator:v1.9.0-1.75.0-v1.18.11
+    image: ghcr.io/metaplex-foundation/plerkle-test-validator:v1.9.0-1.79.0-v1.18.11
     volumes:
       - ./programs:/so/:ro
       - ./ledger:/config:rw
2 changes: 2 additions & 0 deletions grpc-ingest/config-ingester.example.yml
@@ -55,5 +55,7 @@ download_metadata:
   retry_max_delay: 10
   # retry min delay in milliseconds
   retry_min_delay: 1
+  # request timeout in milliseconds
+  request_timeout: 5_000
   stream:
     name: METADATA_JSON
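The new request_timeout is a per-request HTTP timeout in milliseconds, presumably consumed the same way the worker timeout is in backfiller.rs below. A minimal sketch of that mapping (the grpc-ingest config-loading plumbing is assumed, not shown in this diff):

    use std::time::Duration;

    // `request_timeout` is read from the YAML above, in milliseconds.
    let request_timeout: u64 = 5_000;
    let client = reqwest::Client::builder()
        .timeout(Duration::from_millis(request_timeout))
        .build()?;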
1 change: 1 addition & 0 deletions ops/Cargo.toml
@@ -39,3 +39,4 @@ tokio = { workspace = true }
 tracing = { workspace = true }
 mpl-token-metadata = { workspace = true }
 serde_json = { workspace = true }
+reqwest = { workspace = true }
4 changes: 4 additions & 0 deletions ops/src/main.rs
@@ -1,5 +1,6 @@
 mod account;
 mod bubblegum;
+mod metadata;

 use account::{subcommand as account_subcommand, AccountCommand};
 use anyhow::Result;
@@ -19,6 +20,8 @@ enum Command {
     Bubblegum(BubblegumCommand),
     #[clap(name = "account")]
     Account(AccountCommand),
+    #[clap(name = "metadata_json")]
+    MetadataJson(metadata::MetadataJsonCommand),
 }

 #[tokio::main]
@@ -30,6 +33,7 @@ async fn main() -> Result<()> {
     match args.command {
         Command::Bubblegum(subcommand) => bubblegum_subcommand(subcommand).await?,
         Command::Account(subcommand) => account_subcommand(subcommand).await?,
+        Command::MetadataJson(subcommand) => metadata::subcommand(subcommand).await?,
     }

     Ok(())
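With the subcommand registered, the backfiller becomes reachable from the ops binary. A hypothetical invocation (the binary name is an assumption, and the flag names follow clap's default kebab-case derivation of the field names; the flattened PoolArgs contributes additional database flags not shown in this diff):

    $ ops metadata_json backfill \
        --metadata-json-download-worker-count 50 \
        --metadata-json-download-worker-request-timeout 3000

Because both fields carry #[arg(long, env)], they can also be supplied via the METADATA_JSON_DOWNLOAD_WORKER_COUNT and METADATA_JSON_DOWNLOAD_WORKER_REQUEST_TIMEOUT environment variables.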
160 changes: 160 additions & 0 deletions ops/src/metadata/backfiller.rs
@@ -0,0 +1,160 @@
use anyhow::{Ok, Result};
use clap::Parser;
use das_core::{
    connect_db, perform_metadata_json_task, DownloadMetadataInfo, DownloadMetadataJsonRetryConfig,
    MetadataJsonDownloadWorkerArgs, PoolArgs,
};

use digital_asset_types::dao::asset_data;
use indicatif::HumanDuration;
use log::{debug, error};
use reqwest::Client;
use sea_orm::{ColumnTrait, EntityTrait, JsonValue, QueryFilter, SqlxPostgresConnector};
use std::sync::Arc;
use tokio::{task::JoinHandle, time::Instant};

#[derive(Debug, Parser, Clone)]
pub struct Args {
    #[clap(flatten)]
    pub metadata_json_download_worker: MetadataJsonDownloadWorkerArgs,
    /// Database configuration
    #[clap(flatten)]
    pub database: PoolArgs,
}

pub const DEFAULT_METADATA_JSON_DOWNLOAD_WORKER_COUNT: usize = 25;

#[derive(Debug, Clone)]
pub struct MetadataJsonBackfillerContext {
    pub database_pool: sqlx::PgPool,
    pub metadata_json_download_worker: MetadataJsonDownloadWorkerArgs,
}

pub async fn start_backfill(context: MetadataJsonBackfillerContext) -> Result<()> {
    let MetadataJsonBackfillerContext {
        database_pool,
        metadata_json_download_worker:
            MetadataJsonDownloadWorkerArgs {
                metadata_json_download_worker_count,
                metadata_json_download_worker_request_timeout,
            },
    } = context;

    let mut worker_count = if metadata_json_download_worker_count > 0 {
        metadata_json_download_worker_count
    } else {
        DEFAULT_METADATA_JSON_DOWNLOAD_WORKER_COUNT
    };
    let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(database_pool.clone());

    // Collect every asset whose metadata json is still marked as "processing".
    let download_metadata_info_vec = asset_data::Entity::find()
        .filter(asset_data::Column::Metadata.eq(JsonValue::String("processing".to_string())))
        .all(&conn)
        .await?
        .iter()
        .map(|d| DownloadMetadataInfo::new(d.id.clone(), d.metadata_url.clone(), d.slot_updated))
        .collect::<Vec<DownloadMetadataInfo>>();

    let metadata_vec_len = download_metadata_info_vec.len();
    debug!("Found {} assets to download", metadata_vec_len);

    if metadata_vec_len == 0 {
        return Ok(());
    }

    if worker_count > metadata_vec_len {
        if metadata_vec_len == 1 {
            worker_count = 1;
        } else {
            // If there are fewer assets than workers, assume each worker
            // handles roughly two assets.
            worker_count = metadata_vec_len / 2;
        }
    }

    // Split the work as evenly as possible: the first `excess_tasks` workers
    // each take one extra asset.
    let excess_tasks = metadata_vec_len % worker_count;
    let mut current_tasks_per_worker = if excess_tasks > 0 {
        metadata_vec_len / worker_count + 1
    } else {
        metadata_vec_len / worker_count
    };

    let mut handlers: Vec<JoinHandle<()>> = Vec::with_capacity(worker_count);

    let mut curr_start = 0;
    let client = Client::builder()
        .timeout(std::time::Duration::from_millis(
            metadata_json_download_worker_request_timeout,
        ))
        .build()?;

    debug!("worker_count: {}", worker_count);
    for worker_index in 0..worker_count {
        let start = curr_start;

        let end = start + current_tasks_per_worker;

        let handler = spawn_metadata_fetch_task(
            client.clone(),
            database_pool.clone(),
            &download_metadata_info_vec[start..end],
        );

        handlers.push(handler);

        // Drop back to the base chunk size once the remainder has been spread
        // over the first `excess_tasks` workers. (Decrementing on every
        // iteration would leave trailing assets unassigned.)
        if worker_index + 1 == excess_tasks {
            current_tasks_per_worker -= 1;
        }

        curr_start = end;
    }

    futures::future::join_all(handlers).await;

    Ok(())
}
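The split above gives the first excess_tasks workers one extra asset each. As a standalone sanity check (not part of the PR), the same chunking rule can be written as a small pure function:

    /// Sketch: sizes of the per-worker chunks for `len` assets and `workers` workers.
    fn chunk_sizes(len: usize, workers: usize) -> Vec<usize> {
        let base = len / workers;
        let excess = len % workers;
        (0..workers)
            .map(|i| if i < excess { base + 1 } else { base })
            .collect()
    }

    // chunk_sizes(10, 4) == [3, 3, 2, 2]; the sizes always sum to `len`,
    // so every asset in the slice is assigned to exactly one worker.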

fn spawn_metadata_fetch_task(
    client: reqwest::Client,
    pool: sqlx::PgPool,
    download_metadata_info: &[DownloadMetadataInfo],
) -> JoinHandle<()> {
    let download_metadata_info = download_metadata_info.to_vec();
    tokio::spawn(async move {
        for d in download_metadata_info.iter() {
            let timing = Instant::now();
            let asset_data_id = bs58::encode(d.asset_data_id.clone()).into_string();

            if let Err(e) = perform_metadata_json_task(
                client.clone(),
                pool.clone(),
                d,
                Arc::new(DownloadMetadataJsonRetryConfig::default()),
            )
            .await
            {
                error!("Asset {} failed: {}", asset_data_id, e);
            }

            debug!(
                "Asset {} finished in {}",
                asset_data_id,
                HumanDuration(timing.elapsed())
            );
        }
    })
}

pub async fn run(config: Args) -> Result<()> {
    let database_pool = connect_db(&config.database).await?;

    let context = MetadataJsonBackfillerContext {
        database_pool,
        metadata_json_download_worker: config.metadata_json_download_worker,
    };

    start_backfill(context).await?;

    Ok(())
}
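Because the context struct and start_backfill are public, the backfill can also be driven without the CLI. A minimal sketch, assuming a hypothetical local Postgres URL and the default worker settings (the das_core imports above are assumed in scope):

    use sqlx::postgres::PgPoolOptions;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Hypothetical connection string; substitute your DAS database.
        let database_pool = PgPoolOptions::new()
            .max_connections(10)
            .connect("postgres://user:pass@localhost:5432/das")
            .await?;

        let context = MetadataJsonBackfillerContext {
            database_pool,
            metadata_json_download_worker: MetadataJsonDownloadWorkerArgs {
                metadata_json_download_worker_count: 25,
                metadata_json_download_worker_request_timeout: 1_000,
            },
        };

        start_backfill(context).await
    }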
27 changes: 27 additions & 0 deletions ops/src/metadata/cmd.rs
@@ -0,0 +1,27 @@
use super::backfiller;
use anyhow::Result;
use clap::{Args, Subcommand};

#[derive(Debug, Clone, Subcommand)]
pub enum Commands {
    /// The 'backfill' command fetches all asset metadata json rows marked as
    /// 'processing' and downloads the corresponding metadata json files.
    #[clap(name = "backfill")]
    Backfill(backfiller::Args),
}

#[derive(Debug, Clone, Args)]
pub struct MetadataJsonCommand {
    #[clap(subcommand)]
    pub action: Commands,
}

pub async fn subcommand(subcommand: MetadataJsonCommand) -> Result<()> {
    match subcommand.action {
        Commands::Backfill(args) => {
            backfiller::run(args).await?;
        }
    }

    Ok(())
}
4 changes: 4 additions & 0 deletions ops/src/metadata/mod.rs
@@ -0,0 +1,4 @@
mod backfiller;
mod cmd;

pub use cmd::*;