diff --git a/Cargo.lock b/Cargo.lock index 0f6b76380..856a5c9a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1945,8 +1945,10 @@ dependencies = [ "indicatif", "log", "mpl-bubblegum", + "mpl-token-metadata", "program_transformers", "sea-orm", + "serde_json", "solana-account-decoder", "solana-client", "solana-program", diff --git a/core/src/solana_rpc.rs b/core/src/solana_rpc.rs index b517f1404..26e9e1f05 100644 --- a/core/src/solana_rpc.rs +++ b/core/src/solana_rpc.rs @@ -3,13 +3,16 @@ use backon::ExponentialBuilder; use backon::Retryable; use clap::Parser; use solana_account_decoder::UiAccountEncoding; -use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature; +use solana_client::rpc_response::RpcTokenAccountBalance; use solana_client::{ client_error::ClientError, nonblocking::rpc_client::RpcClient, rpc_client::GetConfirmedSignaturesForAddress2Config, rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionConfig}, rpc_filter::RpcFilterType, + rpc_request::RpcRequest, + rpc_response::Response as RpcResponse, + rpc_response::RpcConfirmedTransactionStatusWithSignature, }; use solana_sdk::{ account::Account, @@ -157,4 +160,26 @@ impl Rpc { .await? .value) } + + pub async fn get_token_largest_account(&self, mint: Pubkey) -> anyhow::Result { + Ok((|| async { + self.0 + .send::>>( + RpcRequest::Custom { + method: "getTokenLargestAccounts", + }, + serde_json::json!([mint.to_string(),]), + ) + .await + }) + .retry(&ExponentialBuilder::default()) + .await? + .value + .first() + .ok_or(anyhow::anyhow!(format!( + "no token accounts for mint {mint}: burned nft?" + )))? + .address + .parse::()?) + } } diff --git a/ops/Cargo.toml b/ops/Cargo.toml index 63b91de67..1f37301b5 100644 --- a/ops/Cargo.toml +++ b/ops/Cargo.toml @@ -36,3 +36,5 @@ spl-account-compression = { workspace = true } sqlx = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } +mpl-token-metadata = { workspace = true } +serde_json = { workspace = true } \ No newline at end of file diff --git a/ops/src/account/cmd.rs b/ops/src/account/cmd.rs index 659758d1c..9861cd8fe 100644 --- a/ops/src/account/cmd.rs +++ b/ops/src/account/cmd.rs @@ -1,4 +1,4 @@ -use super::{program, single}; +use super::{nft, program, single}; use anyhow::Result; use clap::{Args, Subcommand}; @@ -10,6 +10,9 @@ pub enum Commands { /// The 'single' command is used to backfill the index against a single account. #[clap(name = "single")] Single(single::Args), + + #[clap(name = "nft")] + Nft(nft::Args), } #[derive(Debug, Clone, Args)] @@ -26,6 +29,9 @@ pub async fn subcommand(subcommand: AccountCommand) -> Result<()> { Commands::Single(args) => { single::run(args).await?; } + Commands::Nft(args) => { + nft::run(args).await?; + } } Ok(()) diff --git a/ops/src/account/mod.rs b/ops/src/account/mod.rs index 59563b02e..e770cd362 100644 --- a/ops/src/account/mod.rs +++ b/ops/src/account/mod.rs @@ -1,5 +1,6 @@ mod account_info; mod cmd; +mod nft; mod program; mod single; diff --git a/ops/src/account/nft.rs b/ops/src/account/nft.rs new file mode 100644 index 000000000..f92e005d0 --- /dev/null +++ b/ops/src/account/nft.rs @@ -0,0 +1,95 @@ +use std::sync::Arc; + +use anyhow::Result; +use tokio::task::JoinHandle; + +use super::account_info; +use log::error; + +use clap::Parser; +use das_core::{ + connect_db, create_download_metadata_notifier, MetadataJsonDownloadWorkerArgs, PoolArgs, Rpc, + SolanaRpcArgs, +}; +use mpl_token_metadata::accounts::Metadata; +use program_transformers::ProgramTransformer; +use solana_sdk::pubkey::Pubkey; + +#[derive(Debug, Parser, Clone)] +pub struct Args { + /// Database configuration + #[clap(flatten)] + pub database: PoolArgs, + + #[clap(flatten)] + pub metadata_json_download_worker: MetadataJsonDownloadWorkerArgs, + + /// Solana configuration + #[clap(flatten)] + pub solana: SolanaRpcArgs, + + /// NFT Mint address + #[clap(value_parser = parse_pubkey)] + pub mint: Pubkey, +} + +fn parse_pubkey(s: &str) -> Result { + Pubkey::try_from(s).map_err(|_| "Failed to parse public key") +} + +pub async fn run(config: Args) -> Result<()> { + let rpc = Rpc::from_config(&config.solana); + let pool = connect_db(&config.database).await?; + let metadata_json_download_db_pool = pool.clone(); + + let (metadata_json_download_worker, metadata_json_download_sender) = config + .metadata_json_download_worker + .start(metadata_json_download_db_pool)?; + + let download_metadata_notifier = + create_download_metadata_notifier(metadata_json_download_sender.clone()).await; + + let mint = config.mint; + + let metadata = Metadata::find_pda(&mint).0; + + let mut accounts_to_fetch = vec![mint, metadata]; + + let token_account = rpc.get_token_largest_account(mint).await; + + if let Ok(token_account) = token_account { + accounts_to_fetch.push(token_account); + } + + let program_transformer = Arc::new(ProgramTransformer::new(pool, download_metadata_notifier)); + let mut tasks = Vec::new(); + + for account in accounts_to_fetch { + let program_transformer = Arc::clone(&program_transformer); + let rpc = rpc.clone(); + + let task: JoinHandle> = tokio::spawn(async move { + let account_info = account_info::fetch(&rpc, account).await?; + if let Err(e) = program_transformer + .handle_account_update(&account_info) + .await + { + error!("Failed to handle account update: {:?}", e); + } + + Ok(()) + }); + + tasks.push(task); + } + + futures::future::try_join_all(tasks).await?; + + drop(metadata_json_download_sender); + + drop(program_transformer); + + metadata_json_download_worker.await?; + + Ok(()) +}