diff --git a/Cargo.lock b/Cargo.lock index 3f884075b..a8f8fa573 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3537,6 +3537,8 @@ dependencies = [ "log", "proxy-wasm", "regex", + "wasi 0.7.0", + "wasm-bindgen", ] [[package]] @@ -3730,7 +3732,9 @@ dependencies = [ "async-stream", "atty", "cargo-lock", + "chrono", "clap 4.5.4", + "digital_asset_types", "futures", "git-version", "hyper", @@ -3742,7 +3746,10 @@ dependencies = [ "program_transformers", "prometheus", "redis 0.25.3", + "rust-crypto", + "sea-orm", "serde", + "serde_json", "serde_yaml", "solana-sdk", "sqlx", @@ -4633,6 +4640,7 @@ dependencies = [ "mpl-bubblegum", "num-traits", "sea-orm", + "serde", "serde_json", "solana-sdk", "solana-transaction-status", @@ -8166,6 +8174,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b89c3ce4ce14bdc6fb6beaf9ec7928ca331de5df7e5ea278375642a2f478570d" + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" diff --git a/metaplex-rpc-proxy/Cargo.toml b/metaplex-rpc-proxy/Cargo.toml index 06b801543..2e69785dd 100644 --- a/metaplex-rpc-proxy/Cargo.toml +++ b/metaplex-rpc-proxy/Cargo.toml @@ -13,6 +13,8 @@ lazy_static = { workspace = true } log = { workspace = true } proxy-wasm = { workspace = true } regex = { workspace = true } +wasi = { workspace = true } +wasm-bindgen = { workspace = true } [lints] workspace = true diff --git a/nft_ingester/src/transaction_notifications.rs b/nft_ingester/src/transaction_notifications.rs index 5335de6dc..df005bc56 100644 --- a/nft_ingester/src/transaction_notifications.rs +++ b/nft_ingester/src/transaction_notifications.rs @@ -10,7 +10,7 @@ use { log::{debug, error}, plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData}, plerkle_serialization::root_as_transaction_info, - program_transformers::{error::ProgramTransformerResult, ProgramTransformer, TransactionInfo}, + program_transformers::ProgramTransformer, sqlx::{Pool, Postgres}, std::sync::Arc, tokio::{ diff --git a/nft_ingester2/Cargo.toml b/nft_ingester2/Cargo.toml index 4ab4f7f80..bf3f590ec 100644 --- a/nft_ingester2/Cargo.toml +++ b/nft_ingester2/Cargo.toml @@ -17,7 +17,9 @@ sqlx = { workspace = true, features = [ "offline", "json", ] } +chrono = { workspace = true } clap = { workspace = true, features = ["cargo", "derive"] } +digital_asset_types = { workspace = true } futures = { workspace = true } hyper = { workspace = true, features = ["server"] } json5 = { workspace = true } @@ -28,7 +30,10 @@ opentelemetry_sdk = { workspace = true, features = ["trace"] } program_transformers = { workspace = true } prometheus = { workspace = true } redis = { workspace = true, features = ["tokio-comp", "tokio-native-tls-comp"] } +rust-crypto = { workspace = true } +sea-orm = { workspace = true, features = ["sqlx-postgres"] } serde = { workspace = true } +serde_json = { workspace = true } serde_yaml = { workspace = true } solana-sdk = { workspace = true } # only prom rn tokio = { workspace = true, features = [ diff --git a/nft_ingester2/config-run.yml b/nft_ingester2/config-run.yml index 19a1b3f84..3af3ad2e7 100644 --- a/nft_ingester2/config-run.yml +++ b/nft_ingester2/config-run.yml @@ -20,7 +20,9 @@ redis: postgres: url: postgres://solana:solana@localhost/solana min_connections: 10 - max_connections: 25 + max_connections: 50 # `max_connection` should be bigger than `program_transformer.max_tasks_in_process` otherwise unresolved lock is possible program_transformer: transactions_cl_audits: false - max_tasks_in_process: 100 + max_tasks_in_process: 40 +download_metadata_handler: + max_attempts: 3 diff --git a/nft_ingester2/src/config.rs b/nft_ingester2/src/config.rs index 003985f27..d01bfeeaa 100644 --- a/nft_ingester2/src/config.rs +++ b/nft_ingester2/src/config.rs @@ -3,6 +3,7 @@ use { serde::{de, Deserialize}, std::{net::SocketAddr, path::Path, time::Duration}, tokio::fs, + tracing::warn, yellowstone_grpc_tools::config::{ deserialize_usize_str, ConfigGrpcRequestAccounts, ConfigGrpcRequestCommitment, ConfigGrpcRequestTransactions, @@ -154,6 +155,15 @@ pub struct ConfigIngester { pub redis: ConfigIngesterRedis, pub postgres: ConfigIngesterPostgres, pub program_transformer: ConfigIngesterProgramTransformer, + pub download_metadata_handler: ConfigDownloadMetadataHandler, +} + +impl ConfigIngester { + pub fn check(&self) { + if self.postgres.max_connections < self.program_transformer.max_tasks_in_process { + warn!("`postgres.max_connections` should be bigger than `program_transformer.max_tasks_in_process` otherwise unresolved lock is possible"); + } + } } #[derive(Debug, Deserialize)] @@ -305,7 +315,7 @@ impl ConfigIngesterPostgres { } pub const fn default_max_connections() -> usize { - 25 + 50 } } @@ -326,6 +336,21 @@ impl ConfigIngesterProgramTransformer { } pub const fn default_max_tasks_in_process() -> usize { - 100 + 40 + } +} + +#[derive(Debug, Clone, Copy, Deserialize)] +pub struct ConfigDownloadMetadataHandler { + #[serde( + default = "ConfigDownloadMetadataHandler::default_max_attempts", + deserialize_with = "deserialize_usize_str" + )] + pub max_attempts: usize, +} + +impl ConfigDownloadMetadataHandler { + pub const fn default_max_attempts() -> usize { + 3 } } diff --git a/nft_ingester2/src/ingester.rs b/nft_ingester2/src/ingester.rs index 479e5bddb..22f4311c2 100644 --- a/nft_ingester2/src/ingester.rs +++ b/nft_ingester2/src/ingester.rs @@ -1,14 +1,17 @@ use { crate::{ - config::ConfigIngester, + config::{ConfigDownloadMetadataHandler, ConfigIngester}, postgres::{create_pool as pg_create_pool, metrics_pgpool}, prom::{ - program_transformer_task_status_inc, program_transformer_tasks_total_set, - ProgramTransformerTaskStatusKind, + download_metadata_inserted_total_inc, program_transformer_task_status_inc, + program_transformer_tasks_total_set, ProgramTransformerTaskStatusKind, }, redis::{metrics_xlen, ProgramTransformerInfo, RedisStream}, util::create_shutdown, }, + chrono::Utc, + crypto::{digest::Digest, sha2::Sha256}, + digital_asset_types::dao::{sea_orm_active_enums::TaskStatus, tasks}, futures::{ future::{pending, BoxFuture, FusedFuture, FutureExt}, stream::StreamExt, @@ -17,9 +20,18 @@ use { error::ProgramTransformerError, DownloadMetadataInfo, DownloadMetadataNotifier, ProgramTransformer, }, - std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, + sea_orm::{ + entity::{ActiveModelTrait, ActiveValue}, + error::{DbErr, RuntimeErr}, + SqlxPostgresConnector, + }, + sqlx::{Error as SqlxError, PgPool}, + std::{ + borrow::Cow, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }, tokio::{ task::JoinSet, @@ -29,8 +41,6 @@ use { }; pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { - println!("{:#?}", config); - // connect to Redis let client = redis::Client::open(config.redis.url.clone())?; let connection = client.get_multiplexed_tokio_connection().await?; @@ -62,12 +72,12 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { // program transforms related let pt_accounts = Arc::new(ProgramTransformer::new( pgpool.clone(), - create_notifier(), + create_download_metadata_notifier(pgpool.clone(), config.download_metadata_handler)?, false, )); let pt_transactions = Arc::new(ProgramTransformer::new( pgpool.clone(), - create_notifier(), + create_download_metadata_notifier(pgpool.clone(), config.download_metadata_handler)?, config.program_transformer.transactions_cl_audits, )); let pt_max_tasks_in_process = config.program_transformer.max_tasks_in_process; @@ -214,14 +224,49 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { result } -fn create_notifier() -> DownloadMetadataNotifier { - Box::new( - move |_info: DownloadMetadataInfo| -> BoxFuture< - 'static, - Result<(), Box>, - > { - // TODO - Box::pin(async move { Ok(()) }) - }, - ) +fn create_download_metadata_notifier( + pgpool: PgPool, + config: ConfigDownloadMetadataHandler, +) -> anyhow::Result { + let max_attempts = config.max_attempts.try_into()?; + Ok(Box::new(move |info: DownloadMetadataInfo| -> BoxFuture< + 'static, + Result<(), Box>, + > { + let pgpool = pgpool.clone(); + Box::pin(async move { + const NAME: &str = "DownloadMetadata"; + + let data = serde_json::to_value(info)?; + + let mut hasher = Sha256::new(); + hasher.input(NAME.as_bytes()); + hasher.input(serde_json::to_vec(&data)?.as_slice()); + let hash = hasher.result_str(); + + let model = tasks::ActiveModel { + id: ActiveValue::Set(hash), + task_type: ActiveValue::Set(NAME.to_owned()), + data: ActiveValue::Set(data), + status: ActiveValue::Set(TaskStatus::Pending), + created_at: ActiveValue::Set(Utc::now().naive_utc()), + locked_until: ActiveValue::Set(None), + locked_by: ActiveValue::Set(None), + max_attempts: ActiveValue::Set(max_attempts), + attempts: ActiveValue::Set(0), + duration: ActiveValue::Set(None), + errors: ActiveValue::Set(None), + }; + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pgpool); + + match model.insert(&conn).await.map(|_mode| ()) { + // skip unique_violation error + Err(DbErr::Query(RuntimeErr::SqlxError(SqlxError::Database(dberr)))) if dberr.code() == Some(Cow::Borrowed("23505")) => {}, + value => value?, + }; + download_metadata_inserted_total_inc(); + + Ok(()) + }) + })) } diff --git a/nft_ingester2/src/main.rs b/nft_ingester2/src/main.rs index 0bd1f9b8d..7622b8aec 100644 --- a/nft_ingester2/src/main.rs +++ b/nft_ingester2/src/main.rs @@ -70,6 +70,7 @@ async fn main() -> anyhow::Result<()> { let config = config_load::(&args.config) .await .with_context(|| format!("failed to parse config from: {}", args.config))?; + config.check(); ingester::run(config).await } } diff --git a/nft_ingester2/src/prom.rs b/nft_ingester2/src/prom.rs index 884407760..e81de4255 100644 --- a/nft_ingester2/src/prom.rs +++ b/nft_ingester2/src/prom.rs @@ -5,7 +5,7 @@ use { service::{make_service_fn, service_fn}, Body, Request, Response, Server, StatusCode, }, - prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder}, + prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder}, std::{net::SocketAddr, sync::Once}, tracing::{error, info}, }; @@ -46,6 +46,10 @@ lazy_static::lazy_static! { Opts::new("program_transformer_task_status", "Status of processed messages"), &["status"], ).unwrap(); + + static ref DOWNLOAD_METADATA_INSERTED_TOTAL: IntCounter = IntCounter::new( + "download_metadata_inserted_total", "Total number of inserted tasks for download metadata" + ).unwrap(); } pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { @@ -65,6 +69,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { register!(PGPOOL_CONNECTIONS_TOTAL); register!(PROGRAM_TRANSFORMER_TASKS_TOTAL); register!(PROGRAM_TRANSFORMER_TASK_STATUS); + register!(DOWNLOAD_METADATA_INSERTED_TOTAL); VERSION .with_label_values(&[ @@ -171,3 +176,7 @@ pub fn program_transformer_task_status_inc(kind: ProgramTransformerTaskStatusKin }]) .inc() } + +pub fn download_metadata_inserted_total_inc() { + DOWNLOAD_METADATA_INSERTED_TOTAL.inc() +} diff --git a/program_transformers/Cargo.toml b/program_transformers/Cargo.toml index 08f784632..aa96212e7 100644 --- a/program_transformers/Cargo.toml +++ b/program_transformers/Cargo.toml @@ -19,6 +19,7 @@ heck = { workspace = true } mpl-bubblegum = { workspace = true } num-traits = { workspace = true } sea-orm = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } solana-sdk = { workspace = true } solana-transaction-status = { workspace = true } diff --git a/program_transformers/src/lib.rs b/program_transformers/src/lib.rs index edde20ca3..db52e55d4 100644 --- a/program_transformers/src/lib.rs +++ b/program_transformers/src/lib.rs @@ -20,6 +20,7 @@ use { entity::EntityTrait, query::Select, ConnectionTrait, DatabaseConnection, DbErr, SqlxPostgresConnector, TransactionTrait, }, + serde::Serialize, solana_sdk::{instruction::CompiledInstruction, pubkey::Pubkey, signature::Signature}, solana_transaction_status::InnerInstructions, sqlx::PgPool, @@ -52,7 +53,7 @@ pub struct TransactionInfo { pub meta_inner_instructions: Vec, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct DownloadMetadataInfo { asset_data_id: Vec, uri: String,