diff --git a/Cargo.lock b/Cargo.lock index 5ecea895a..6d5820e11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4044,6 +4044,28 @@ dependencies = [ "yansi", ] +[[package]] +name = "program_transformers" +version = "0.7.2" +dependencies = [ + "blockbuster", + "bs58 0.4.0", + "digital_asset_types", + "futures", + "mpl-bubblegum", + "num-traits", + "plerkle_serialization", + "sea-orm", + "serde_json", + "solana-sdk", + "spl-account-compression", + "spl-token", + "sqlx", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "prometheus" version = "0.13.3" diff --git a/Cargo.toml b/Cargo.toml index 06d2b7b94..ccb18ce6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "metaplex-rpc-proxy", "migration", "nft_ingester", + "program_transformers", "tools/acc_forwarder", "tools/bgtask_creator", "tools/fetch_trees", diff --git a/nft_ingester/.dockerignore b/nft_ingester/.dockerignore deleted file mode 100644 index 1de565933..000000000 --- a/nft_ingester/.dockerignore +++ /dev/null @@ -1 +0,0 @@ -target \ No newline at end of file diff --git a/program_transformers/Cargo.toml b/program_transformers/Cargo.toml new file mode 100644 index 000000000..8596b7e51 --- /dev/null +++ b/program_transformers/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "program_transformers" +version = { workspace = true } +edition = { workspace = true } +repository = { workspace = true } +publish = { workspace = true } + +[dependencies] +blockbuster = { workspace = true } +bs58 = { workspace = true } +digital_asset_types = { workspace = true, features = ["json_types", "sql_types"] } +futures = { workspace = true } +mpl-bubblegum = { workspace = true } +num-traits = { workspace = true } +plerkle_serialization = { workspace = true } +sea-orm = { workspace = true, features = [] } +serde_json = { workspace = true } +solana-sdk = { workspace = true } +spl-account-compression = { workspace = true, features = ["no-entrypoint"] } +spl-token = { workspace = true, features = 
["no-entrypoint"] } +sqlx = { workspace = true, features = [] } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["time"] } +tracing = { workspace = true } + +[lints] +workspace = true diff --git a/program_transformers/src/bubblegum/burn.rs b/program_transformers/src/bubblegum/burn.rs new file mode 100644 index 000000000..d0d45cca4 --- /dev/null +++ b/program_transformers/src/bubblegum/burn.rs @@ -0,0 +1,77 @@ +use { + crate::{ + bubblegum::{ + db::{save_changelog_event, upsert_asset_with_seq}, + u32_to_u8_array, + }, + error::{ProgramTransformerError, ProgramTransformerResult}, + }, + blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}, + digital_asset_types::dao::asset, + sea_orm::{ + entity::{ActiveValue, EntityTrait}, + query::QueryTrait, + sea_query::query::OnConflict, + ConnectionTrait, DbBackend, TransactionTrait, + }, + solana_sdk::pubkey::Pubkey, + tracing::debug, +}; + +pub async fn burn<'c, T>( + parsing_result: &BubblegumInstruction, + bundle: &InstructionBundle<'c>, + txn: &'c T, + instruction: &str, + cl_audits: bool, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + if let Some(cl) = &parsing_result.tree_update { + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; + let leaf_index = cl.index; + let (asset_id, _) = Pubkey::find_program_address( + &[ + "asset".as_bytes(), + cl.id.as_ref(), + u32_to_u8_array(leaf_index).as_ref(), + ], + &mpl_bubblegum::ID, + ); + debug!("Indexing burn for asset id: {:?}", asset_id); + let id_bytes = asset_id.to_bytes(); + + let asset_model = asset::ActiveModel { + id: ActiveValue::Set(id_bytes.to_vec()), + burnt: ActiveValue::Set(true), + ..Default::default() + }; + + // Begin a transaction. If the transaction goes out of scope (i.e. 
one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + + // Upsert asset table `burnt` column. Note we don't check for decompression (asset.seq = 0) + // because we know if the item was burnt it could not have been decompressed later. + let query = asset::Entity::insert(asset_model) + .on_conflict( + OnConflict::columns([asset::Column::Id]) + .update_columns([asset::Column::Burnt]) + .to_owned(), + ) + .build(DbBackend::Postgres); + multi_txn.execute(query).await?; + + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + multi_txn.commit().await?; + + return Ok(()); + } + Err(ProgramTransformerError::ParsingError( + "Ix not parsed correctly".to_string(), + )) +} diff --git a/program_transformers/src/bubblegum/cancel_redeem.rs b/program_transformers/src/bubblegum/cancel_redeem.rs new file mode 100644 index 000000000..450bde198 --- /dev/null +++ b/program_transformers/src/bubblegum/cancel_redeem.rs @@ -0,0 +1,85 @@ +use { + crate::{ + bubblegum::db::{ + save_changelog_event, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, + }, + error::{ProgramTransformerError, ProgramTransformerResult}, + }, + blockbuster::{ + instruction::InstructionBundle, + programs::bubblegum::{BubblegumInstruction, LeafSchema}, + }, + sea_orm::{ConnectionTrait, TransactionTrait}, +}; + +pub async fn cancel_redeem<'c, T>( + parsing_result: &BubblegumInstruction, + bundle: &InstructionBundle<'c>, + txn: &'c T, + instruction: &str, + cl_audits: bool, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; + match le.schema { + LeafSchema::V1 { + id, + owner, + delegate, 
+ .. + } => { + let id_bytes = id.to_bytes(); + let owner_bytes = owner.to_bytes().to_vec(); + let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { + None + } else { + Some(delegate.to_bytes().to_vec()) + }; + let tree_id = cl.id.to_bytes(); + let nonce = cl.index as i64; + + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info( + &multi_txn, + id_bytes.to_vec(), + nonce, + tree_id.to_vec(), + le.leaf_hash.to_vec(), + le.schema.data_hash(), + le.schema.creator_hash(), + seq as i64, + ) + .await?; + + // Partial update of asset table with just leaf owner and delegate. + upsert_asset_with_owner_and_delegate_info( + &multi_txn, + id_bytes.to_vec(), + owner_bytes, + delegate, + seq as i64, + ) + .await?; + + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + multi_txn.commit().await?; + + return Ok(()); + } + } + } + Err(ProgramTransformerError::ParsingError( + "Ix not parsed correctly".to_string(), + )) +} diff --git a/program_transformers/src/bubblegum/collection_verification.rs b/program_transformers/src/bubblegum/collection_verification.rs new file mode 100644 index 000000000..39284ffe3 --- /dev/null +++ b/program_transformers/src/bubblegum/collection_verification.rs @@ -0,0 +1,95 @@ +use { + crate::{ + bubblegum::db::{ + save_changelog_event, upsert_asset_with_leaf_info, upsert_asset_with_seq, + upsert_collection_info, + }, + error::{ProgramTransformerError, ProgramTransformerResult}, + }, + blockbuster::{ + instruction::InstructionBundle, + programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, + }, + mpl_bubblegum::types::Collection, + sea_orm::{ConnectionTrait, TransactionTrait}, + tracing::debug, +}; + +pub async fn 
process<'c, T>( + parsing_result: &BubblegumInstruction, + bundle: &InstructionBundle<'c>, + txn: &'c T, + instruction: &str, + cl_audits: bool, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + if let (Some(le), Some(cl), Some(payload)) = ( + &parsing_result.leaf_update, + &parsing_result.tree_update, + &parsing_result.payload, + ) { + let (collection, verify) = match payload { + Payload::CollectionVerification { + collection, verify, .. + } => (collection, verify), + _ => { + return Err(ProgramTransformerError::ParsingError( + "Ix not parsed correctly".to_string(), + )); + } + }; + debug!( + "Handling collection verification event for {} (verify: {}): {}", + collection, verify, bundle.txn_id + ); + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; + let id_bytes = match le.schema { + LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(), + }; + + let tree_id = cl.id.to_bytes(); + let nonce = cl.index as i64; + + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + + // Partial update of asset table with just leaf. 
+ upsert_asset_with_leaf_info( + &multi_txn, + id_bytes.to_vec(), + nonce, + tree_id.to_vec(), + le.leaf_hash.to_vec(), + le.schema.data_hash(), + le.schema.creator_hash(), + seq as i64, + ) + .await?; + + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + upsert_collection_info( + &multi_txn, + id_bytes.to_vec(), + Some(Collection { + key: *collection, + verified: *verify, + }), + bundle.slot as i64, + seq as i64, + ) + .await?; + + multi_txn.commit().await?; + + return Ok(()); + }; + Err(ProgramTransformerError::ParsingError( + "Ix not parsed correctly".to_string(), + )) +} diff --git a/program_transformers/src/bubblegum/creator_verification.rs b/program_transformers/src/bubblegum/creator_verification.rs new file mode 100644 index 000000000..22646ec60 --- /dev/null +++ b/program_transformers/src/bubblegum/creator_verification.rs @@ -0,0 +1,133 @@ +use { + crate::{ + bubblegum::db::{ + save_changelog_event, upsert_asset_creators, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, + }, + error::{ProgramTransformerError, ProgramTransformerResult}, + }, + blockbuster::{ + instruction::InstructionBundle, + programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, + }, + mpl_bubblegum::types::Creator, + sea_orm::{ConnectionTrait, TransactionTrait}, + tracing::debug, +}; + +pub async fn process<'c, T>( + parsing_result: &BubblegumInstruction, + bundle: &InstructionBundle<'c>, + txn: &'c T, + instruction: &str, + cl_audits: bool, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + if let (Some(le), Some(cl), Some(payload)) = ( + &parsing_result.leaf_update, + &parsing_result.tree_update, + &parsing_result.payload, + ) { + let (updated_creators, creator, verify) = match payload { + Payload::CreatorVerification { + metadata, + creator, + verify, + } => { + let updated_creators: Vec = metadata + .creators + .iter() + .map(|c| { + let mut c = c.clone(); + if 
c.address == *creator { + c.verified = *verify + }; + c + }) + .collect(); + + (updated_creators, creator, verify) + } + _ => { + return Err(ProgramTransformerError::ParsingError( + "Ix not parsed correctly".to_string(), + )); + } + }; + debug!( + "Handling creator verification event for creator {} (verify: {}): {}", + creator, verify, bundle.txn_id + ); + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; + + match le.schema { + LeafSchema::V1 { + id, + owner, + delegate, + .. + } => { + let id_bytes = id.to_bytes(); + + let owner_bytes = owner.to_bytes().to_vec(); + let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { + None + } else { + Some(delegate.to_bytes().to_vec()) + }; + let tree_id = cl.id.to_bytes(); + let nonce = cl.index as i64; + + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + + // Partial update of asset table with just leaf info. + upsert_asset_with_leaf_info( + &multi_txn, + id_bytes.to_vec(), + nonce, + tree_id.to_vec(), + le.leaf_hash.to_vec(), + le.schema.data_hash(), + le.schema.creator_hash(), + seq as i64, + ) + .await?; + + // Partial update of asset table with just leaf owner and delegate. + upsert_asset_with_owner_and_delegate_info( + &multi_txn, + id_bytes.to_vec(), + owner_bytes, + delegate, + seq as i64, + ) + .await?; + + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + // Upsert creators to `asset_creators` table. 
+ upsert_asset_creators( + &multi_txn, + id_bytes.to_vec(), + &updated_creators, + bundle.slot as i64, + seq as i64, + ) + .await?; + + multi_txn.commit().await?; + } + }; + + return Ok(()); + } + Err(ProgramTransformerError::ParsingError( + "Ix not parsed correctly".to_string(), + )) +} diff --git a/program_transformers/src/bubblegum/db.rs b/program_transformers/src/bubblegum/db.rs new file mode 100644 index 000000000..3559438bd --- /dev/null +++ b/program_transformers/src/bubblegum/db.rs @@ -0,0 +1,634 @@ +use { + crate::error::{ProgramTransformerError, ProgramTransformerResult}, + digital_asset_types::dao::{ + asset, asset_authority, asset_creators, asset_data, asset_grouping, backfill_items, + cl_audits_v2, cl_items, + sea_orm_active_enums::{ + ChainMutability, Instruction, Mutability, OwnerType, RoyaltyTargetType, + SpecificationAssetClass, SpecificationVersions, + }, + }, + mpl_bubblegum::types::{Collection, Creator}, + sea_orm::{ + entity::{ActiveValue, ColumnTrait, EntityTrait}, + query::{JsonValue, QueryFilter, QuerySelect, QueryTrait}, + sea_query::query::OnConflict, + ConnectionTrait, DbBackend, TransactionTrait, + }, + spl_account_compression::events::ChangeLogEventV1, + tracing::{debug, error, info}, +}; + +pub async fn save_changelog_event<'c, T>( + change_log_event: &ChangeLogEventV1, + slot: u64, + txn_id: &str, + txn: &T, + instruction: &str, + cl_audits: bool, +) -> ProgramTransformerResult +where + T: ConnectionTrait + TransactionTrait, +{ + insert_change_log(change_log_event, slot, txn_id, txn, instruction, cl_audits).await?; + Ok(change_log_event.seq) +} + +const fn node_idx_to_leaf_idx(index: i64, tree_height: u32) -> i64 { + index - 2i64.pow(tree_height) +} + +pub async fn insert_change_log<'c, T>( + change_log_event: &ChangeLogEventV1, + slot: u64, + txn_id: &str, + txn: &T, + instruction: &str, + cl_audits: bool, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + let mut i: i64 = 0; + let depth = 
change_log_event.path.len() - 1; + let tree_id = change_log_event.id.as_ref(); + for p in change_log_event.path.iter() { + let node_idx = p.index as i64; + debug!( + "seq {}, index {} level {}, node {:?}, txn: {:?}, instruction {}", + change_log_event.seq, + p.index, + i, + bs58::encode(p.node).into_string(), + txn_id, + instruction + ); + let leaf_idx = if i == 0 { + Some(node_idx_to_leaf_idx(node_idx, depth as u32)) + } else { + None + }; + + let item = cl_items::ActiveModel { + tree: ActiveValue::Set(tree_id.to_vec()), + level: ActiveValue::Set(i), + node_idx: ActiveValue::Set(node_idx), + hash: ActiveValue::Set(p.node.as_ref().to_vec()), + seq: ActiveValue::Set(change_log_event.seq as i64), + leaf_idx: ActiveValue::Set(leaf_idx), + ..Default::default() + }; + + i += 1; + let mut query = cl_items::Entity::insert(item) + .on_conflict( + OnConflict::columns([cl_items::Column::Tree, cl_items::Column::NodeIdx]) + .update_columns([ + cl_items::Column::Hash, + cl_items::Column::Seq, + cl_items::Column::LeafIdx, + cl_items::Column::Level, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!("{} WHERE excluded.seq > cl_items.seq", query.sql); + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?; + } + + // Insert the audit item after the insert into cl_items have been completed + if cl_audits { + let tx_id_bytes = bs58::decode(txn_id) + .into_vec() + .map_err(|_e| ProgramTransformerError::ChangeLogEventMalformed)?; + let ix = Instruction::from(instruction); + if ix == Instruction::Unknown { + error!("Unknown instruction: {}", instruction); + } + let audit_item_v2 = cl_audits_v2::ActiveModel { + tree: ActiveValue::Set(tree_id.to_vec()), + leaf_idx: ActiveValue::Set(change_log_event.index as i64), + seq: ActiveValue::Set(change_log_event.seq as i64), + tx: ActiveValue::Set(tx_id_bytes), + instruction: ActiveValue::Set(ix), + ..Default::default() + }; + let query = 
cl_audits_v2::Entity::insert(audit_item_v2) + .on_conflict( + OnConflict::columns([ + cl_audits_v2::Column::Tree, + cl_audits_v2::Column::LeafIdx, + cl_audits_v2::Column::Seq, + ]) + .do_nothing() + .to_owned(), + ) + .build(DbBackend::Postgres); + match txn.execute(query).await { + Ok(_) => {} + Err(e) => { + error!("Error while inserting into cl_audits_v2: {:?}", e); + } + } + } + + // If and only if the entire path of nodes was inserted into the `cl_items` table, then insert + // a single row into the `backfill_items` table. This way if an incomplete path was inserted + // into `cl_items` due to an error, a gap will be created for the tree and the backfiller will + // fix it. + if i - 1 == depth as i64 { + // See if the tree already exists in the `backfill_items` table. + let rows = backfill_items::Entity::find() + .filter(backfill_items::Column::Tree.eq(tree_id)) + .limit(1) + .all(txn) + .await?; + + // If the tree does not exist in `backfill_items` and the sequence number is greater than 1, + // then we know we will need to backfill the tree from sequence number 1 up to the current + // sequence number. So in this case we set at flag to force checking the tree. 
+ let force_chk = rows.is_empty() && change_log_event.seq > 1; + + info!("Adding to backfill_items table at level {}", i - 1); + let item = backfill_items::ActiveModel { + tree: ActiveValue::Set(tree_id.to_vec()), + seq: ActiveValue::Set(change_log_event.seq as i64), + slot: ActiveValue::Set(slot as i64), + force_chk: ActiveValue::Set(force_chk), + backfilled: ActiveValue::Set(false), + failed: ActiveValue::Set(false), + ..Default::default() + }; + + backfill_items::Entity::insert(item).exec(txn).await?; + } + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +pub async fn upsert_asset_with_leaf_info( + txn: &T, + id: Vec, + nonce: i64, + tree_id: Vec, + leaf: Vec, + data_hash: [u8; 32], + creator_hash: [u8; 32], + seq: i64, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + let data_hash = bs58::encode(data_hash).into_string().trim().to_string(); + let creator_hash = bs58::encode(creator_hash).into_string().trim().to_string(); + let model = asset::ActiveModel { + id: ActiveValue::Set(id), + nonce: ActiveValue::Set(Some(nonce)), + tree_id: ActiveValue::Set(Some(tree_id)), + leaf: ActiveValue::Set(Some(leaf)), + data_hash: ActiveValue::Set(Some(data_hash)), + creator_hash: ActiveValue::Set(Some(creator_hash)), + leaf_seq: ActiveValue::Set(Some(seq)), + ..Default::default() + }; + + let mut query = asset::Entity::insert(model) + .on_conflict( + OnConflict::column(asset::Column::Id) + .update_columns([ + asset::Column::Nonce, + asset::Column::TreeId, + asset::Column::Leaf, + asset::Column::DataHash, + asset::Column::CreatorHash, + asset::Column::LeafSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + // Do not overwrite changes that happened after decompression (asset.seq = 0). + // Do not overwrite changes from a later Bubblegum instruction. 
+ query.sql = format!( + "{} WHERE (asset.seq != 0 OR asset.seq IS NULL) AND (excluded.leaf_seq >= asset.leaf_seq OR asset.leaf_seq IS NULL)", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +pub async fn upsert_asset_with_owner_and_delegate_info( + txn: &T, + id: Vec, + owner: Vec, + delegate: Option>, + seq: i64, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset::ActiveModel { + id: ActiveValue::Set(id), + owner: ActiveValue::Set(Some(owner)), + delegate: ActiveValue::Set(delegate), + owner_delegate_seq: ActiveValue::Set(Some(seq)), + ..Default::default() + }; + + let mut query = asset::Entity::insert(model) + .on_conflict( + OnConflict::column(asset::Column::Id) + .update_columns([ + asset::Column::Owner, + asset::Column::Delegate, + asset::Column::OwnerDelegateSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + // Do not overwrite changes that happened after decompression (asset.seq = 0). + // Do not overwrite changes from a later Bubblegum instruction. 
+ query.sql = format!( + "{} WHERE (asset.seq != 0 OR asset.seq IS NULL) AND (excluded.owner_delegate_seq >= asset.owner_delegate_seq OR asset.owner_delegate_seq IS NULL)", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +pub async fn upsert_asset_with_compression_info( + txn: &T, + id: Vec, + compressed: bool, + compressible: bool, + supply: i64, + supply_mint: Option>, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset::ActiveModel { + id: ActiveValue::Set(id), + compressed: ActiveValue::Set(compressed), + compressible: ActiveValue::Set(compressible), + supply: ActiveValue::Set(supply), + supply_mint: ActiveValue::Set(supply_mint), + ..Default::default() + }; + + let mut query = asset::Entity::insert(model) + .on_conflict( + OnConflict::columns([asset::Column::Id]) + .update_columns([ + asset::Column::Compressed, + asset::Column::Compressible, + asset::Column::Supply, + asset::Column::SupplyMint, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + // Do not overwrite changes that happened after decompression (asset.seq = 0). + query.sql = format!("{} WHERE asset.seq != 0 OR asset.seq IS NULL", query.sql); + txn.execute(query).await?; + + Ok(()) +} + +pub async fn upsert_asset_with_seq( + txn: &T, + id: Vec, + seq: i64, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset::ActiveModel { + id: ActiveValue::Set(id), + seq: ActiveValue::Set(Some(seq)), + ..Default::default() + }; + + let mut query = asset::Entity::insert(model) + .on_conflict( + OnConflict::column(asset::Column::Id) + .update_columns([asset::Column::Seq]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + // Do not overwrite changes that happened after decompression (asset.seq = 0). + // Do not overwrite changes from a later Bubblegum instruction. 
+ query.sql = format!( + "{} WHERE (asset.seq != 0 AND excluded.seq >= asset.seq) OR asset.seq IS NULL", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +pub async fn upsert_collection_info( + txn: &T, + asset_id: Vec, + collection: Option, + slot_updated: i64, + seq: i64, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + let (group_value, verified) = match collection { + Some(c) => (Some(c.key.to_string()), c.verified), + None => (None, false), + }; + + let model = asset_grouping::ActiveModel { + asset_id: ActiveValue::Set(asset_id), + group_key: ActiveValue::Set("collection".to_string()), + group_value: ActiveValue::Set(group_value), + verified: ActiveValue::Set(verified), + slot_updated: ActiveValue::Set(Some(slot_updated)), + group_info_seq: ActiveValue::Set(Some(seq)), + ..Default::default() + }; + + let mut query = asset_grouping::Entity::insert(model) + .on_conflict( + OnConflict::columns([ + asset_grouping::Column::AssetId, + asset_grouping::Column::GroupKey, + ]) + .update_columns([ + asset_grouping::Column::GroupValue, + asset_grouping::Column::Verified, + asset_grouping::Column::SlotUpdated, + asset_grouping::Column::GroupInfoSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + // Do not overwrite changes that happened after decompression (asset_grouping.group_info_seq = 0). 
+ query.sql = format!( + "{} WHERE (asset_grouping.group_info_seq != 0 AND excluded.group_info_seq >= asset_grouping.group_info_seq) OR asset_grouping.group_info_seq IS NULL", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +pub async fn upsert_asset_data( + txn: &T, + id: Vec, + chain_data_mutability: ChainMutability, + chain_data: JsonValue, + metadata_url: String, + metadata_mutability: Mutability, + metadata: JsonValue, + slot_updated: i64, + reindex: Option, + raw_name: Vec, + raw_symbol: Vec, + seq: i64, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset_data::ActiveModel { + id: ActiveValue::Set(id.clone()), + chain_data_mutability: ActiveValue::Set(chain_data_mutability), + chain_data: ActiveValue::Set(chain_data), + metadata_url: ActiveValue::Set(metadata_url), + metadata_mutability: ActiveValue::Set(metadata_mutability), + metadata: ActiveValue::Set(metadata), + slot_updated: ActiveValue::Set(slot_updated), + reindex: ActiveValue::Set(reindex), + raw_name: ActiveValue::Set(Some(raw_name)), + raw_symbol: ActiveValue::Set(Some(raw_symbol)), + base_info_seq: ActiveValue::Set(Some(seq)), + }; + + let mut query = asset_data::Entity::insert(model) + .on_conflict( + OnConflict::columns([asset_data::Column::Id]) + .update_columns([ + asset_data::Column::ChainDataMutability, + asset_data::Column::ChainData, + asset_data::Column::MetadataUrl, + asset_data::Column::MetadataMutability, + // Don't update asset_data::Column::Metadata if it already exists. Even if we + // are indexing `update_metadata`` and there's a new URI, the new background + // task will overwrite it. 
+ asset_data::Column::SlotUpdated, + asset_data::Column::Reindex, + asset_data::Column::RawName, + asset_data::Column::RawSymbol, + asset_data::Column::BaseInfoSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + // Do not overwrite changes that happened after decompression (asset_data.base_info_seq = 0). + // Do not overwrite changes from a later Bubblegum instruction. + query.sql = format!( + "{} WHERE (asset_data.base_info_seq != 0 AND excluded.base_info_seq >= asset_data.base_info_seq) OR asset_data.base_info_seq IS NULL", + query.sql + ); + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +pub async fn upsert_asset_base_info( + txn: &T, + id: Vec, + owner_type: OwnerType, + frozen: bool, + specification_version: SpecificationVersions, + specification_asset_class: SpecificationAssetClass, + royalty_target_type: RoyaltyTargetType, + royalty_target: Option>, + royalty_amount: i32, + slot_updated: i64, + seq: i64, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + // Set base info for asset. + let asset_model = asset::ActiveModel { + id: ActiveValue::Set(id.clone()), + owner_type: ActiveValue::Set(owner_type), + frozen: ActiveValue::Set(frozen), + specification_version: ActiveValue::Set(Some(specification_version)), + specification_asset_class: ActiveValue::Set(Some(specification_asset_class)), + royalty_target_type: ActiveValue::Set(royalty_target_type), + royalty_target: ActiveValue::Set(royalty_target), + royalty_amount: ActiveValue::Set(royalty_amount), + asset_data: ActiveValue::Set(Some(id.clone())), + slot_updated: ActiveValue::Set(Some(slot_updated)), + base_info_seq: ActiveValue::Set(Some(seq)), + ..Default::default() + }; + + // Upsert asset table base info. 
+ let mut query = asset::Entity::insert(asset_model) + .on_conflict( + OnConflict::columns([asset::Column::Id]) + .update_columns([ + asset::Column::OwnerType, + asset::Column::Frozen, + asset::Column::SpecificationVersion, + asset::Column::SpecificationAssetClass, + asset::Column::RoyaltyTargetType, + asset::Column::RoyaltyTarget, + asset::Column::RoyaltyAmount, + asset::Column::AssetData, + asset::Column::SlotUpdated, + asset::Column::BaseInfoSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE (asset.seq != 0 OR asset.seq IS NULL) AND (excluded.base_info_seq >= asset.base_info_seq OR asset.base_info_seq IS NULL)", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +pub async fn upsert_asset_creators( + txn: &T, + id: Vec, + creators: &Vec, + slot_updated: i64, + seq: i64, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + let db_creators = if creators.is_empty() { + // If creators are empty, insert an empty creator with the current sequence. + // This prevents accidental errors during out-of-order updates. 
+ vec![asset_creators::ActiveModel { + asset_id: ActiveValue::Set(id.clone()), + position: ActiveValue::Set(0), + creator: ActiveValue::Set(vec![]), + share: ActiveValue::Set(100), + verified: ActiveValue::Set(false), + slot_updated: ActiveValue::Set(Some(slot_updated)), + seq: ActiveValue::Set(Some(seq)), + ..Default::default() + }] + } else { + creators + .iter() + .enumerate() + .map(|(i, c)| asset_creators::ActiveModel { + asset_id: ActiveValue::Set(id.clone()), + position: ActiveValue::Set(i as i16), + creator: ActiveValue::Set(c.address.to_bytes().to_vec()), + share: ActiveValue::Set(c.share as i32), + verified: ActiveValue::Set(c.verified), + slot_updated: ActiveValue::Set(Some(slot_updated)), + seq: ActiveValue::Set(Some(seq)), + ..Default::default() + }) + .collect() + }; + + // This statement will update base information for each creator. + let mut query = asset_creators::Entity::insert_many(db_creators) + .on_conflict( + OnConflict::columns([ + asset_creators::Column::AssetId, + asset_creators::Column::Position, + ]) + .update_columns([ + asset_creators::Column::Creator, + asset_creators::Column::Share, + asset_creators::Column::Verified, + asset_creators::Column::Seq, + asset_creators::Column::SlotUpdated, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + query.sql = format!( + "{} WHERE (asset_creators.seq != 0 AND excluded.seq >= asset_creators.seq) OR asset_creators.seq IS NULL", + query.sql + ); + + txn.execute(query).await?; + + Ok(()) +} + +pub async fn upsert_asset_authority( + txn: &T, + asset_id: Vec, + authority: Vec, + slot_updated: i64, + seq: i64, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset_authority::ActiveModel { + asset_id: ActiveValue::Set(asset_id), + authority: ActiveValue::Set(authority), + seq: ActiveValue::Set(seq), + slot_updated: ActiveValue::Set(slot_updated), + ..Default::default() + }; + + // This value is only written during `mint_V1`` or after an item is 
decompressed, so do not + // attempt to modify any existing values: + // `ON CONFLICT ('asset_id') DO NOTHING`. + let query = asset_authority::Entity::insert(model) + .on_conflict( + OnConflict::columns([asset_authority::Column::AssetId]) + .do_nothing() + .to_owned(), + ) + .build(DbBackend::Postgres); + + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; + + Ok(()) +} diff --git a/program_transformers/src/bubblegum/delegate.rs b/program_transformers/src/bubblegum/delegate.rs new file mode 100644 index 000000000..b491ab5b4 --- /dev/null +++ b/program_transformers/src/bubblegum/delegate.rs @@ -0,0 +1,84 @@ +use { + crate::{ + bubblegum::db::{ + save_changelog_event, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, + }, + error::{ProgramTransformerError, ProgramTransformerResult}, + }, + blockbuster::{ + instruction::InstructionBundle, + programs::bubblegum::{BubblegumInstruction, LeafSchema}, + }, + sea_orm::{ConnectionTrait, TransactionTrait}, +}; + +pub async fn delegate<'c, T>( + parsing_result: &BubblegumInstruction, + bundle: &InstructionBundle<'c>, + txn: &'c T, + instruction: &str, + cl_audits: bool, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; + match le.schema { + LeafSchema::V1 { + id, + owner, + delegate, + .. + } => { + let id_bytes = id.to_bytes(); + let owner_bytes = owner.to_bytes().to_vec(); + let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { + None + } else { + Some(delegate.to_bytes().to_vec()) + }; + let tree_id = cl.id.to_bytes(); + + // Begin a transaction. If the transaction goes out of scope (i.e. 
one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info( + &multi_txn, + id_bytes.to_vec(), + cl.index as i64, + tree_id.to_vec(), + le.leaf_hash.to_vec(), + le.schema.data_hash(), + le.schema.creator_hash(), + seq as i64, + ) + .await?; + + // Partial update of asset table with just leaf owner and delegate. + upsert_asset_with_owner_and_delegate_info( + &multi_txn, + id_bytes.to_vec(), + owner_bytes, + delegate, + seq as i64, + ) + .await?; + + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + multi_txn.commit().await?; + + return Ok(()); + } + } + } + Err(ProgramTransformerError::ParsingError( + "Ix not parsed correctly".to_string(), + )) +} diff --git a/program_transformers/src/bubblegum/mint_v1.rs b/program_transformers/src/bubblegum/mint_v1.rs new file mode 100644 index 000000000..a293eef8c --- /dev/null +++ b/program_transformers/src/bubblegum/mint_v1.rs @@ -0,0 +1,220 @@ +use { + crate::{ + bubblegum::{ + bgum_use_method_to_token_metadata_use_method, + db::{ + save_changelog_event, upsert_asset_authority, upsert_asset_base_info, + upsert_asset_creators, upsert_asset_data, upsert_asset_with_compression_info, + upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, + upsert_asset_with_seq, upsert_collection_info, + }, + }, + error::{ProgramTransformerError, ProgramTransformerResult}, + DownloadMetadataInfo, + }, + blockbuster::{ + instruction::InstructionBundle, + programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, + token_metadata::types::{TokenStandard, Uses}, + }, + digital_asset_types::{ + dao::sea_orm_active_enums::{ + ChainMutability, Mutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass, + SpecificationVersions, + }, + json::ChainDataV1, + }, + sea_orm::{query::JsonValue, 
ConnectionTrait, TransactionTrait}, + tracing::warn, +}; + +pub async fn mint_v1<'c, T>( + parsing_result: &BubblegumInstruction, + bundle: &InstructionBundle<'c>, + txn: &'c T, + instruction: &str, + cl_audits: bool, +) -> ProgramTransformerResult> +where + T: ConnectionTrait + TransactionTrait, +{ + if let ( + Some(le), + Some(cl), + Some(Payload::MintV1 { + args, + authority, + tree_id, + }), + ) = ( + &parsing_result.leaf_update, + &parsing_result.tree_update, + &parsing_result.payload, + ) { + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; + let metadata = args; + #[allow(unreachable_patterns)] + return match le.schema { + LeafSchema::V1 { + id, + delegate, + owner, + nonce, + .. + } => { + let id_bytes = id.to_bytes(); + let slot_i = bundle.slot as i64; + let uri = metadata.uri.replace('\0', ""); + let name = metadata.name.clone().into_bytes(); + let symbol = metadata.symbol.clone().into_bytes(); + let mut chain_data = ChainDataV1 { + name: metadata.name.clone(), + symbol: metadata.symbol.clone(), + edition_nonce: metadata.edition_nonce, + primary_sale_happened: metadata.primary_sale_happened, + token_standard: Some(TokenStandard::NonFungible), + uses: metadata.uses.clone().map(|u| Uses { + use_method: bgum_use_method_to_token_metadata_use_method(u.use_method), + remaining: u.remaining, + total: u.total, + }), + }; + chain_data.sanitize(); + let chain_data_json = serde_json::to_value(chain_data) + .map_err(|e| ProgramTransformerError::DeserializationError(e.to_string()))?; + let chain_mutability = match metadata.is_mutable { + true => ChainMutability::Mutable, + false => ChainMutability::Immutable, + }; + + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. 
+ let multi_txn = txn.begin().await?; + + upsert_asset_data( + &multi_txn, + id_bytes.to_vec(), + chain_mutability, + chain_data_json, + uri.clone(), + Mutability::Mutable, + JsonValue::String("processing".to_string()), + slot_i, + Some(true), + name.to_vec(), + symbol.to_vec(), + seq as i64, + ) + .await?; + + // Upsert `asset` table base info. + let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { + None + } else { + Some(delegate.to_bytes().to_vec()) + }; + + // Upsert `asset` table base info and `asset_creators` table. + upsert_asset_base_info( + &multi_txn, + id_bytes.to_vec(), + OwnerType::Single, + false, + SpecificationVersions::V1, + SpecificationAssetClass::Nft, + RoyaltyTargetType::Creators, + None, + metadata.seller_fee_basis_points as i32, + slot_i, + seq as i64, + ) + .await?; + + // Partial update of asset table with just compression info elements. + upsert_asset_with_compression_info( + &multi_txn, + id_bytes.to_vec(), + true, + false, + 1, + None, + ) + .await?; + + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info( + &multi_txn, + id_bytes.to_vec(), + nonce as i64, + tree_id.to_vec(), + le.leaf_hash.to_vec(), + le.schema.data_hash(), + le.schema.creator_hash(), + seq as i64, + ) + .await?; + + // Partial update of asset table with just leaf owner and delegate. + upsert_asset_with_owner_and_delegate_info( + &multi_txn, + id_bytes.to_vec(), + owner.to_bytes().to_vec(), + delegate, + seq as i64, + ) + .await?; + + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + // Upsert creators to `asset_creators` table. + upsert_asset_creators( + &multi_txn, + id_bytes.to_vec(), + &metadata.creators, + slot_i, + seq as i64, + ) + .await?; + + // Insert into `asset_authority` table. 
+ //TODO - we need to remove the optional bubblegum signer logic + upsert_asset_authority( + &multi_txn, + id_bytes.to_vec(), + authority.to_vec(), + seq as i64, + slot_i, + ) + .await?; + + // Upsert into `asset_grouping` table with base collection info. + upsert_collection_info( + &multi_txn, + id_bytes.to_vec(), + metadata.collection.clone(), + slot_i, + seq as i64, + ) + .await?; + + multi_txn.commit().await?; + + if uri.is_empty() { + warn!( + "URI is empty for mint {}. Skipping background task.", + bs58::encode(id).into_string() + ); + return Ok(None); + } + + Ok(Some(DownloadMetadataInfo::new(id_bytes.to_vec(), uri))) + } + _ => Err(ProgramTransformerError::NotImplemented), + }; + } + Err(ProgramTransformerError::ParsingError( + "Ix not parsed correctly".to_string(), + )) +} diff --git a/program_transformers/src/bubblegum/mod.rs b/program_transformers/src/bubblegum/mod.rs new file mode 100644 index 000000000..07659011f --- /dev/null +++ b/program_transformers/src/bubblegum/mod.rs @@ -0,0 +1,133 @@ +use { + crate::{ + error::{ProgramTransformerError, ProgramTransformerResult}, + DownloadMetadataNotifier, + }, + blockbuster::{ + instruction::InstructionBundle, + programs::bubblegum::{ + BubblegumInstruction, InstructionName, UseMethod as BubblegumUseMethod, + }, + token_metadata::types::UseMethod as TokenMetadataUseMethod, + }, + sea_orm::{ConnectionTrait, TransactionTrait}, + tracing::{debug, info}, +}; + +mod burn; +mod cancel_redeem; +mod collection_verification; +mod creator_verification; +mod db; +mod delegate; +mod mint_v1; +mod redeem; +mod transfer; +mod update_metadata; + +pub async fn handle_bubblegum_instruction<'c, T>( + parsing_result: &'c BubblegumInstruction, + bundle: &'c InstructionBundle<'c>, + txn: &T, + download_metadata_notifier: &DownloadMetadataNotifier, + cl_audits: bool, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + let ix_type = &parsing_result.instruction; + + // @TODO this would be much 
better served by implementing Debug trait on the InstructionName
+    // or wrapping it into something that can display it more neatly.
+    let ix_str = match ix_type {
+        InstructionName::Unknown => "Unknown",
+        InstructionName::MintV1 => "MintV1",
+        InstructionName::MintToCollectionV1 => "MintToCollectionV1",
+        InstructionName::Redeem => "Redeem",
+        InstructionName::CancelRedeem => "CancelRedeem",
+        InstructionName::Transfer => "Transfer",
+        InstructionName::Delegate => "Delegate",
+        InstructionName::DecompressV1 => "DecompressV1",
+        InstructionName::Compress => "Compress",
+        InstructionName::Burn => "Burn",
+        InstructionName::CreateTree => "CreateTree",
+        InstructionName::VerifyCreator => "VerifyCreator",
+        InstructionName::UnverifyCreator => "UnverifyCreator",
+        InstructionName::VerifyCollection => "VerifyCollection",
+        InstructionName::UnverifyCollection => "UnverifyCollection",
+        InstructionName::SetAndVerifyCollection => "SetAndVerifyCollection",
+        InstructionName::SetDecompressibleState => "SetDecompressibleState",
+        InstructionName::UpdateMetadata => "UpdateMetadata",
+    };
+    info!("BGUM instruction txn={:?}: {:?}", ix_str, bundle.txn_id);
+
+    match ix_type {
+        InstructionName::Transfer => {
+            transfer::transfer(parsing_result, bundle, txn, ix_str, cl_audits).await?;
+        }
+        InstructionName::Burn => {
+            burn::burn(parsing_result, bundle, txn, ix_str, cl_audits).await?;
+        }
+        InstructionName::Delegate => {
+            delegate::delegate(parsing_result, bundle, txn, ix_str, cl_audits).await?;
+        }
+        InstructionName::MintV1 | InstructionName::MintToCollectionV1 => {
+            if let Some(info) =
+                mint_v1::mint_v1(parsing_result, bundle, txn, ix_str, cl_audits).await?
+ { + download_metadata_notifier(info) + .await + .map_err(ProgramTransformerError::DownloadMetadataNotify)?; + } + } + InstructionName::Redeem => { + redeem::redeem(parsing_result, bundle, txn, ix_str, cl_audits).await?; + } + InstructionName::CancelRedeem => { + cancel_redeem::cancel_redeem(parsing_result, bundle, txn, ix_str, cl_audits).await?; + } + InstructionName::DecompressV1 => { + debug!("No action necessary for decompression") + } + InstructionName::VerifyCreator | InstructionName::UnverifyCreator => { + creator_verification::process(parsing_result, bundle, txn, ix_str, cl_audits).await?; + } + InstructionName::VerifyCollection + | InstructionName::UnverifyCollection + | InstructionName::SetAndVerifyCollection => { + collection_verification::process(parsing_result, bundle, txn, ix_str, cl_audits) + .await?; + } + InstructionName::SetDecompressibleState => (), // Nothing to index. + InstructionName::UpdateMetadata => { + if let Some(info) = + update_metadata::update_metadata(parsing_result, bundle, txn, ix_str, cl_audits) + .await? + { + download_metadata_notifier(info) + .await + .map_err(ProgramTransformerError::DownloadMetadataNotify)?; + } + } + _ => debug!("Bubblegum: Not Implemented Instruction"), + } + Ok(()) +} + +// PDA lookup requires an 8-byte array. 
+fn u32_to_u8_array(value: u32) -> [u8; 8] { + let bytes: [u8; 4] = value.to_le_bytes(); + let mut result: [u8; 8] = [0; 8]; + result[..4].copy_from_slice(&bytes); + result +} + +const fn bgum_use_method_to_token_metadata_use_method( + bubblegum_use_method: BubblegumUseMethod, +) -> TokenMetadataUseMethod { + match bubblegum_use_method { + BubblegumUseMethod::Burn => TokenMetadataUseMethod::Burn, + BubblegumUseMethod::Multiple => TokenMetadataUseMethod::Multiple, + BubblegumUseMethod::Single => TokenMetadataUseMethod::Single, + } +} diff --git a/program_transformers/src/bubblegum/redeem.rs b/program_transformers/src/bubblegum/redeem.rs new file mode 100644 index 000000000..e6a6080a3 --- /dev/null +++ b/program_transformers/src/bubblegum/redeem.rs @@ -0,0 +1,69 @@ +use { + crate::{ + bubblegum::{ + db::{save_changelog_event, upsert_asset_with_leaf_info, upsert_asset_with_seq}, + u32_to_u8_array, + }, + error::{ProgramTransformerError, ProgramTransformerResult}, + }, + blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}, + sea_orm::{ConnectionTrait, TransactionTrait}, + solana_sdk::pubkey::Pubkey, + tracing::debug, +}; + +pub async fn redeem<'c, T>( + parsing_result: &BubblegumInstruction, + bundle: &InstructionBundle<'c>, + txn: &'c T, + instruction: &str, + cl_audits: bool, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + if let Some(cl) = &parsing_result.tree_update { + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; + let leaf_index = cl.index; + let (asset_id, _) = Pubkey::find_program_address( + &[ + "asset".as_bytes(), + cl.id.as_ref(), + u32_to_u8_array(leaf_index).as_ref(), + ], + &mpl_bubblegum::ID, + ); + debug!("Indexing redeem for asset id: {:?}", asset_id); + let id_bytes = asset_id.to_bytes(); + let tree_id = cl.id.to_bytes(); + let nonce = cl.index as i64; + + // Begin a transaction. 
If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info( + &multi_txn, + id_bytes.to_vec(), + nonce, + tree_id.to_vec(), + vec![0; 32], + [0; 32], + [0; 32], + seq as i64, + ) + .await?; + + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + multi_txn.commit().await?; + + return Ok(()); + } + Err(ProgramTransformerError::ParsingError( + "Ix not parsed correctly".to_string(), + )) +} diff --git a/program_transformers/src/bubblegum/transfer.rs b/program_transformers/src/bubblegum/transfer.rs new file mode 100644 index 000000000..9aa6a3311 --- /dev/null +++ b/program_transformers/src/bubblegum/transfer.rs @@ -0,0 +1,85 @@ +use { + crate::{ + bubblegum::db::{ + save_changelog_event, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, + }, + error::{ProgramTransformerError, ProgramTransformerResult}, + }, + blockbuster::{ + instruction::InstructionBundle, + programs::bubblegum::{BubblegumInstruction, LeafSchema}, + }, + sea_orm::{ConnectionTrait, TransactionTrait}, +}; + +pub async fn transfer<'c, T>( + parsing_result: &BubblegumInstruction, + bundle: &InstructionBundle<'c>, + txn: &'c T, + instruction: &str, + cl_audits: bool, +) -> ProgramTransformerResult<()> +where + T: ConnectionTrait + TransactionTrait, +{ + if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; + match le.schema { + LeafSchema::V1 { + id, + owner, + delegate, + .. 
+ } => { + let id_bytes = id.to_bytes(); + let owner_bytes = owner.to_bytes().to_vec(); + let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { + None + } else { + Some(delegate.to_bytes().to_vec()) + }; + let tree_id = cl.id.to_bytes(); + let nonce = cl.index as i64; + + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info( + &multi_txn, + id_bytes.to_vec(), + nonce, + tree_id.to_vec(), + le.leaf_hash.to_vec(), + le.schema.data_hash(), + le.schema.creator_hash(), + seq as i64, + ) + .await?; + + // Partial update of asset table with just leaf owner and delegate. + upsert_asset_with_owner_and_delegate_info( + &multi_txn, + id_bytes.to_vec(), + owner_bytes, + delegate, + seq as i64, + ) + .await?; + + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + multi_txn.commit().await?; + + return Ok(()); + } + } + } + Err(ProgramTransformerError::ParsingError( + "Ix not parsed correctly".to_string(), + )) +} diff --git a/program_transformers/src/bubblegum/update_metadata.rs b/program_transformers/src/bubblegum/update_metadata.rs new file mode 100644 index 000000000..19d9ca615 --- /dev/null +++ b/program_transformers/src/bubblegum/update_metadata.rs @@ -0,0 +1,201 @@ +use { + crate::{ + bubblegum::{ + bgum_use_method_to_token_metadata_use_method, + db::{ + save_changelog_event, upsert_asset_base_info, upsert_asset_creators, + upsert_asset_data, upsert_asset_with_leaf_info, upsert_asset_with_seq, + }, + }, + error::{ProgramTransformerError, ProgramTransformerResult}, + DownloadMetadataInfo, + }, + blockbuster::{ + instruction::InstructionBundle, + programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, + token_metadata::types::{TokenStandard, 
Uses}, + }, + digital_asset_types::{ + dao::sea_orm_active_enums::{ + ChainMutability, Mutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass, + SpecificationVersions, + }, + json::ChainDataV1, + }, + sea_orm::{query::*, ConnectionTrait, JsonValue}, + tracing::warn, +}; + +pub async fn update_metadata<'c, T>( + parsing_result: &BubblegumInstruction, + bundle: &InstructionBundle<'c>, + txn: &'c T, + instruction: &str, + cl_audits: bool, +) -> ProgramTransformerResult> +where + T: ConnectionTrait + TransactionTrait, +{ + if let ( + Some(le), + Some(cl), + Some(Payload::UpdateMetadata { + current_metadata, + update_args, + tree_id, + }), + ) = ( + &parsing_result.leaf_update, + &parsing_result.tree_update, + &parsing_result.payload, + ) { + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; + + #[allow(unreachable_patterns)] + return match le.schema { + LeafSchema::V1 { id, nonce, .. } => { + let id_bytes = id.to_bytes(); + let slot_i = bundle.slot as i64; + + let uri = if let Some(uri) = &update_args.uri { + uri.replace('\0', "") + } else { + current_metadata.uri.replace('\0', "") + }; + + let name = if let Some(name) = update_args.name.clone() { + name + } else { + current_metadata.name.clone() + }; + + let symbol = if let Some(symbol) = update_args.symbol.clone() { + symbol + } else { + current_metadata.symbol.clone() + }; + + let primary_sale_happened = + if let Some(primary_sale_happened) = update_args.primary_sale_happened { + primary_sale_happened + } else { + current_metadata.primary_sale_happened + }; + + let mut chain_data = ChainDataV1 { + name: name.clone(), + symbol: symbol.clone(), + edition_nonce: current_metadata.edition_nonce, + primary_sale_happened, + token_standard: Some(TokenStandard::NonFungible), + uses: current_metadata.uses.clone().map(|u| Uses { + use_method: bgum_use_method_to_token_metadata_use_method(u.use_method), + remaining: u.remaining, + total: u.total, + }), + }; + 
chain_data.sanitize(); + let chain_data_json = serde_json::to_value(chain_data) + .map_err(|e| ProgramTransformerError::DeserializationError(e.to_string()))?; + + let is_mutable = if let Some(is_mutable) = update_args.is_mutable { + is_mutable + } else { + current_metadata.is_mutable + }; + + let chain_mutability = if is_mutable { + ChainMutability::Mutable + } else { + ChainMutability::Immutable + }; + + // Begin a transaction. If the transaction goes out of scope (i.e. one of the executions has + // an error and this function returns it using the `?` operator), then the transaction is + // automatically rolled back. + let multi_txn = txn.begin().await?; + + upsert_asset_data( + &multi_txn, + id_bytes.to_vec(), + chain_mutability, + chain_data_json, + uri.clone(), + Mutability::Mutable, + JsonValue::String("processing".to_string()), + slot_i, + Some(true), + name.into_bytes().to_vec(), + symbol.into_bytes().to_vec(), + seq as i64, + ) + .await?; + + // Upsert `asset` table base info. + let seller_fee_basis_points = + if let Some(seller_fee_basis_points) = update_args.seller_fee_basis_points { + seller_fee_basis_points + } else { + current_metadata.seller_fee_basis_points + }; + + let creators = if let Some(creators) = &update_args.creators { + creators + } else { + ¤t_metadata.creators + }; + + upsert_asset_base_info( + &multi_txn, + id_bytes.to_vec(), + OwnerType::Single, + false, + SpecificationVersions::V1, + SpecificationAssetClass::Nft, + RoyaltyTargetType::Creators, + None, + seller_fee_basis_points as i32, + slot_i, + seq as i64, + ) + .await?; + + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info( + &multi_txn, + id_bytes.to_vec(), + nonce as i64, + tree_id.to_vec(), + le.leaf_hash.to_vec(), + le.schema.data_hash(), + le.schema.creator_hash(), + seq as i64, + ) + .await?; + + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + // Upsert creators to `asset_creators` table. 
+ upsert_asset_creators(&multi_txn, id_bytes.to_vec(), creators, slot_i, seq as i64) + .await?; + + multi_txn.commit().await?; + + if uri.is_empty() { + warn!( + "URI is empty for mint {}. Skipping background task.", + bs58::encode(id).into_string() + ); + return Ok(None); + } + + Ok(Some(DownloadMetadataInfo::new(id_bytes.to_vec(), uri))) + } + _ => Err(ProgramTransformerError::NotImplemented), + }; + } + Err(ProgramTransformerError::ParsingError( + "Ix not parsed correctly".to_string(), + )) +} diff --git a/program_transformers/src/error.rs b/program_transformers/src/error.rs new file mode 100644 index 000000000..d0c29f383 --- /dev/null +++ b/program_transformers/src/error.rs @@ -0,0 +1,37 @@ +use {blockbuster::error::BlockbusterError, sea_orm::DbErr}; + +pub type ProgramTransformerResult = Result; + +#[derive(Debug, thiserror::Error)] +pub enum ProgramTransformerError { + #[error("ChangeLog Event Malformed")] + ChangeLogEventMalformed, + #[error("Storage Write Error: {0}")] + StorageWriteError(String), + #[error("NotImplemented")] + NotImplemented, + #[error("Deserialization Error: {0}")] + DeserializationError(String), + #[error("Data serializaton error: {0}")] + SerializatonError(String), + #[error("Blockbuster Parsing error: {0}")] + ParsingError(String), + #[error("Database Error: {0}")] + DatabaseError(String), + #[error("AssetIndex Error {0}")] + AssetIndexError(String), + #[error("Failed to notify about download metadata: {0}")] + DownloadMetadataNotify(Box), +} + +impl From for ProgramTransformerError { + fn from(err: BlockbusterError) -> Self { + ProgramTransformerError::ParsingError(err.to_string()) + } +} + +impl From for ProgramTransformerError { + fn from(e: DbErr) -> Self { + ProgramTransformerError::StorageWriteError(e.to_string()) + } +} diff --git a/program_transformers/src/lib.rs b/program_transformers/src/lib.rs new file mode 100644 index 000000000..6fa720f6a --- /dev/null +++ b/program_transformers/src/lib.rs @@ -0,0 +1,226 @@ +use { + 
crate::{ + bubblegum::handle_bubblegum_instruction, + error::{ProgramTransformerError, ProgramTransformerResult}, + token::handle_token_program_account, + token_metadata::handle_token_metadata_account, + }, + blockbuster::{ + instruction::{order_instructions, InstructionBundle, IxPair}, + program_handler::ProgramParser, + programs::{ + bubblegum::BubblegumParser, token_account::TokenAccountParser, + token_metadata::TokenMetadataParser, ProgramParseResult, + }, + }, + futures::future::BoxFuture, + plerkle_serialization::{AccountInfo, Pubkey as FBPubkey, TransactionInfo}, + sea_orm::{DatabaseConnection, SqlxPostgresConnector}, + solana_sdk::pubkey::Pubkey, + sqlx::PgPool, + std::collections::{HashMap, HashSet, VecDeque}, + tracing::{debug, error, info}, +}; + +mod bubblegum; +mod error; +mod token; +mod token_metadata; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DownloadMetadataInfo { + asset_data_id: Vec, + uri: String, +} + +impl DownloadMetadataInfo { + pub fn new(asset_data_id: Vec, uri: String) -> Self { + Self { + asset_data_id, + uri: uri.trim().replace('\0', ""), + } + } + + pub fn into_inner(self) -> (Vec, String) { + (self.asset_data_id, self.uri) + } +} + +pub type DownloadMetadataNotifier = Box< + dyn Fn( + DownloadMetadataInfo, + ) -> BoxFuture<'static, Result<(), Box>>, +>; + +pub struct ProgramTransformer { + storage: DatabaseConnection, + download_metadata_notifier: DownloadMetadataNotifier, + parsers: HashMap>, + key_set: HashSet, + cl_audits: bool, +} + +impl ProgramTransformer { + pub fn new( + pool: PgPool, + download_metadata_notifier: DownloadMetadataNotifier, + cl_audits: bool, + ) -> Self { + let mut parsers: HashMap> = HashMap::with_capacity(3); + let bgum = BubblegumParser {}; + let token_metadata = TokenMetadataParser {}; + let token = TokenAccountParser {}; + parsers.insert(bgum.key(), Box::new(bgum)); + parsers.insert(token_metadata.key(), Box::new(token_metadata)); + parsers.insert(token.key(), Box::new(token)); + let hs = 
parsers.iter().fold(HashSet::new(), |mut acc, (k, _)| { + acc.insert(*k); + acc + }); + let pool: PgPool = pool; + ProgramTransformer { + storage: SqlxPostgresConnector::from_sqlx_postgres_pool(pool), + download_metadata_notifier, + parsers, + key_set: hs, + cl_audits, + } + } + + pub fn break_transaction<'i>( + &self, + tx: &'i TransactionInfo<'i>, + ) -> VecDeque<(IxPair<'i>, Option>>)> { + let ref_set: HashSet<&[u8]> = self.key_set.iter().map(|k| k.as_ref()).collect(); + order_instructions(ref_set, tx) + } + + #[allow(clippy::borrowed_box)] + pub fn match_program(&self, key: &FBPubkey) -> Option<&Box> { + match Pubkey::try_from(key.0.as_slice()) { + Ok(pubkey) => self.parsers.get(&pubkey), + Err(_error) => { + error!("failed to parse key: {key:?}"); + None + } + } + } + + pub async fn handle_transaction<'a>( + &self, + tx: &'a TransactionInfo<'a>, + ) -> ProgramTransformerResult<()> { + let sig: Option<&str> = tx.signature(); + info!("Handling Transaction: {:?}", sig); + let instructions = self.break_transaction(tx); + let accounts = tx.account_keys().unwrap_or_default(); + let slot = tx.slot(); + let txn_id = tx.signature().unwrap_or(""); + let mut keys: Vec = Vec::with_capacity(accounts.len()); + for k in accounts.into_iter() { + keys.push(*k); + } + let mut not_impl = 0; + let ixlen = instructions.len(); + debug!("Instructions: {}", ixlen); + let contains = instructions + .iter() + .filter(|(ib, _inner)| ib.0 .0.as_ref() == mpl_bubblegum::ID.as_ref()); + debug!("Instructions bgum: {}", contains.count()); + for (outer_ix, inner_ix) in instructions { + let (program, instruction) = outer_ix; + let ix_accounts = instruction.accounts().unwrap().iter().collect::>(); + let ix_account_len = ix_accounts.len(); + let max = ix_accounts.iter().max().copied().unwrap_or(0) as usize; + if keys.len() < max { + return Err(ProgramTransformerError::DeserializationError( + "Missing Accounts in Serialized Ixn/Txn".to_string(), + )); + } + let ix_accounts = + ix_accounts + .iter() 
+ .fold(Vec::with_capacity(ix_account_len), |mut acc, a| { + if let Some(key) = keys.get(*a as usize) { + acc.push(*key); + } + acc + }); + let ix = InstructionBundle { + txn_id, + program, + instruction: Some(instruction), + inner_ix, + keys: ix_accounts.as_slice(), + slot, + }; + + if let Some(program) = self.match_program(&ix.program) { + debug!("Found a ix for program: {:?}", program.key()); + let result = program.handle_instruction(&ix)?; + let concrete = result.result_type(); + match concrete { + ProgramParseResult::Bubblegum(parsing_result) => { + handle_bubblegum_instruction( + parsing_result, + &ix, + &self.storage, + &self.download_metadata_notifier, + self.cl_audits, + ) + .await + .map_err(|err| { + error!( + "Failed to handle bubblegum instruction for txn {:?}: {:?}", + sig, err + ); + err + })?; + } + _ => { + not_impl += 1; + } + }; + } + } + + if not_impl == ixlen { + debug!("Not imple"); + return Err(ProgramTransformerError::NotImplemented); + } + Ok(()) + } + + pub async fn handle_account_update<'b>( + &self, + acct: AccountInfo<'b>, + ) -> ProgramTransformerResult<()> { + let owner = acct.owner().unwrap(); + if let Some(program) = self.match_program(owner) { + let result = program.handle_account(&acct)?; + let concrete = result.result_type(); + match concrete { + ProgramParseResult::TokenMetadata(parsing_result) => { + handle_token_metadata_account( + &acct, + parsing_result, + &self.storage, + &self.download_metadata_notifier, + ) + .await + } + ProgramParseResult::TokenProgramAccount(parsing_result) => { + handle_token_program_account( + &acct, + parsing_result, + &self.storage, + &self.download_metadata_notifier, + ) + .await + } + _ => Err(ProgramTransformerError::NotImplemented), + }?; + } + Ok(()) + } +} diff --git a/program_transformers/src/token/mod.rs b/program_transformers/src/token/mod.rs new file mode 100644 index 000000000..6855f2de2 --- /dev/null +++ b/program_transformers/src/token/mod.rs @@ -0,0 +1,167 @@ +use { + crate::{ + 
error::{ProgramTransformerError, ProgramTransformerResult}, + DownloadMetadataNotifier, + }, + blockbuster::programs::token_account::TokenProgramAccount, + digital_asset_types::dao::{asset, sea_orm_active_enums::OwnerType, token_accounts, tokens}, + plerkle_serialization::AccountInfo, + sea_orm::{ + entity::{ActiveModelTrait, ActiveValue, ColumnTrait}, + query::{QueryFilter, QueryTrait}, + sea_query::query::OnConflict, + ConnectionTrait, DatabaseConnection, DbBackend, EntityTrait, TransactionTrait, + }, + solana_sdk::program_option::COption, + spl_token::state::AccountState, +}; + +pub async fn handle_token_program_account<'a, 'b, 'c>( + account_update: &'a AccountInfo<'a>, + parsing_result: &'b TokenProgramAccount, + db: &'c DatabaseConnection, + _download_metadata_notifier: &DownloadMetadataNotifier, +) -> ProgramTransformerResult<()> { + let key = *account_update.pubkey().unwrap(); + let key_bytes = key.0.to_vec(); + let spl_token_program = account_update.owner().unwrap().0.to_vec(); + match &parsing_result { + TokenProgramAccount::TokenAccount(ta) => { + let mint = ta.mint.to_bytes().to_vec(); + let delegate: Option> = match ta.delegate { + COption::Some(d) => Some(d.to_bytes().to_vec()), + COption::None => None, + }; + let frozen = matches!(ta.state, AccountState::Frozen); + let owner = ta.owner.to_bytes().to_vec(); + let model = token_accounts::ActiveModel { + pubkey: ActiveValue::Set(key_bytes), + mint: ActiveValue::Set(mint.clone()), + delegate: ActiveValue::Set(delegate.clone()), + owner: ActiveValue::Set(owner.clone()), + frozen: ActiveValue::Set(frozen), + delegated_amount: ActiveValue::Set(ta.delegated_amount as i64), + token_program: ActiveValue::Set(spl_token_program), + slot_updated: ActiveValue::Set(account_update.slot() as i64), + amount: ActiveValue::Set(ta.amount as i64), + close_authority: ActiveValue::Set(None), + }; + + let mut query = token_accounts::Entity::insert(model) + .on_conflict( + OnConflict::columns([token_accounts::Column::Pubkey]) 
+ .update_columns([ + token_accounts::Column::Mint, + token_accounts::Column::DelegatedAmount, + token_accounts::Column::Delegate, + token_accounts::Column::Amount, + token_accounts::Column::Frozen, + token_accounts::Column::TokenProgram, + token_accounts::Column::Owner, + token_accounts::Column::CloseAuthority, + token_accounts::Column::SlotUpdated, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated > token_accounts.slot_updated", + query.sql + ); + db.execute(query).await?; + let txn = db.begin().await?; + let asset_update: Option = asset::Entity::find_by_id(mint) + .filter(asset::Column::OwnerType.eq("single")) + .one(&txn) + .await?; + if let Some(asset) = asset_update { + // will only update owner if token account balance is non-zero + // since the asset is marked as single then the token account balance can only be 1. Greater implies a fungible token in which case no si + // TODO: this does not guarantee in case when wallet receives an amount of 1 for a token but its supply is more. 
is unlikely since mints often have a decimal + if ta.amount == 1 { + let mut active: asset::ActiveModel = asset.into(); + active.owner = ActiveValue::Set(Some(owner)); + active.delegate = ActiveValue::Set(delegate); + active.frozen = ActiveValue::Set(frozen); + active.save(&txn).await?; + } + } + txn.commit().await?; + Ok(()) + } + TokenProgramAccount::Mint(m) => { + let freeze_auth: Option> = match m.freeze_authority { + COption::Some(d) => Some(d.to_bytes().to_vec()), + COption::None => None, + }; + let mint_auth: Option> = match m.mint_authority { + COption::Some(d) => Some(d.to_bytes().to_vec()), + COption::None => None, + }; + let model = tokens::ActiveModel { + mint: ActiveValue::Set(key_bytes.clone()), + token_program: ActiveValue::Set(spl_token_program), + slot_updated: ActiveValue::Set(account_update.slot() as i64), + supply: ActiveValue::Set(m.supply as i64), + decimals: ActiveValue::Set(m.decimals as i32), + close_authority: ActiveValue::Set(None), + extension_data: ActiveValue::Set(None), + mint_authority: ActiveValue::Set(mint_auth), + freeze_authority: ActiveValue::Set(freeze_auth), + }; + + let mut query = tokens::Entity::insert(model) + .on_conflict( + OnConflict::columns([tokens::Column::Mint]) + .update_columns([ + tokens::Column::Supply, + tokens::Column::TokenProgram, + tokens::Column::MintAuthority, + tokens::Column::CloseAuthority, + tokens::Column::ExtensionData, + tokens::Column::SlotUpdated, + tokens::Column::Decimals, + tokens::Column::FreezeAuthority, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated >= tokens.slot_updated", + query.sql + ); + db.execute(query).await?; + + let asset_update: Option = asset::Entity::find_by_id(key_bytes.clone()) + .filter( + asset::Column::OwnerType + .eq(OwnerType::Single) + .or(asset::Column::OwnerType + .eq(OwnerType::Unknown) + .and(asset::Column::Supply.eq(1))), + ) + .one(db) + .await?; + if let Some(asset) = asset_update { + let mut 
active: asset::ActiveModel = asset.clone().into(); + active.supply = ActiveValue::Set(m.supply as i64); + active.supply_mint = ActiveValue::Set(Some(key_bytes)); + + // Update owner_type based on the supply. + if asset.owner_type == OwnerType::Unknown { + active.owner_type = match m.supply.cmp(&1) { + std::cmp::Ordering::Equal => ActiveValue::Set(OwnerType::Single), + std::cmp::Ordering::Greater => ActiveValue::Set(OwnerType::Token), + _ => ActiveValue::NotSet, + } + } + + active.save(db).await?; + } + + Ok(()) + } + _ => Err(ProgramTransformerError::NotImplemented), + }?; + Ok(()) +} diff --git a/program_transformers/src/token_metadata/master_edition.rs b/program_transformers/src/token_metadata/master_edition.rs new file mode 100644 index 000000000..e62d89932 --- /dev/null +++ b/program_transformers/src/token_metadata/master_edition.rs @@ -0,0 +1,106 @@ +use { + crate::error::{ProgramTransformerError, ProgramTransformerResult}, + blockbuster::token_metadata::{ + accounts::{DeprecatedMasterEditionV1, MasterEdition}, + types::Key, + }, + digital_asset_types::dao::{ + asset, asset_v1_account_attachments, extensions, + sea_orm_active_enums::{SpecificationAssetClass, V1AccountAttachments}, + }, + plerkle_serialization::Pubkey as FBPubkey, + sea_orm::{ + entity::{ActiveModelTrait, ActiveValue, EntityTrait, RelationTrait}, + query::{JoinType, QuerySelect, QueryTrait}, + sea_query::query::OnConflict, + ConnectionTrait, DatabaseTransaction, DbBackend, + }, +}; + +pub async fn save_v2_master_edition( + id: FBPubkey, + slot: u64, + me_data: &MasterEdition, + txn: &DatabaseTransaction, +) -> ProgramTransformerResult<()> { + save_master_edition( + V1AccountAttachments::MasterEditionV2, + id, + slot, + me_data, + txn, + ) + .await +} + +pub async fn save_v1_master_edition( + id: FBPubkey, + slot: u64, + me_data: &DeprecatedMasterEditionV1, + txn: &DatabaseTransaction, +) -> ProgramTransformerResult<()> { + // This discards the deprecated `MasterEditionV1` fields + // but sets 
the `Key`` as `MasterEditionV1`. + let bridge = MasterEdition { + supply: me_data.supply, + max_supply: me_data.max_supply, + key: Key::MasterEditionV1, + }; + save_master_edition( + V1AccountAttachments::MasterEditionV1, + id, + slot, + &bridge, + txn, + ) + .await +} +pub async fn save_master_edition( + version: V1AccountAttachments, + id: FBPubkey, + slot: u64, + me_data: &MasterEdition, + txn: &DatabaseTransaction, +) -> ProgramTransformerResult<()> { + let id_bytes = id.0.to_vec(); + let master_edition: Option<(asset_v1_account_attachments::Model, Option)> = + asset_v1_account_attachments::Entity::find_by_id(id.0.to_vec()) + .find_also_related(asset::Entity) + .join( + JoinType::InnerJoin, + extensions::asset::Relation::AssetData.def(), + ) + .one(txn) + .await?; + let ser = serde_json::to_value(me_data) + .map_err(|e| ProgramTransformerError::SerializatonError(e.to_string()))?; + + let model = asset_v1_account_attachments::ActiveModel { + id: ActiveValue::Set(id_bytes), + attachment_type: ActiveValue::Set(version), + data: ActiveValue::Set(Some(ser)), + slot_updated: ActiveValue::Set(slot as i64), + ..Default::default() + }; + + if let Some((_me, Some(asset))) = master_edition { + let mut updatable: asset::ActiveModel = asset.into(); + updatable.supply = ActiveValue::Set(1); + updatable.specification_asset_class = ActiveValue::Set(Some(SpecificationAssetClass::Nft)); + updatable.update(txn).await?; + } + + let query = asset_v1_account_attachments::Entity::insert(model) + .on_conflict( + OnConflict::columns([asset_v1_account_attachments::Column::Id]) + .update_columns([ + asset_v1_account_attachments::Column::AttachmentType, + asset_v1_account_attachments::Column::Data, + asset_v1_account_attachments::Column::SlotUpdated, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + txn.execute(query).await?; + Ok(()) +} diff --git a/program_transformers/src/token_metadata/mod.rs b/program_transformers/src/token_metadata/mod.rs new file mode 100644 index 
000000000..beb2f3a97 --- /dev/null +++ b/program_transformers/src/token_metadata/mod.rs @@ -0,0 +1,56 @@ +use { + crate::{ + error::{ProgramTransformerError, ProgramTransformerResult}, + token_metadata::{ + master_edition::{save_v1_master_edition, save_v2_master_edition}, + v1_asset::{burn_v1_asset, save_v1_asset}, + }, + DownloadMetadataNotifier, + }, + blockbuster::programs::token_metadata::{TokenMetadataAccountData, TokenMetadataAccountState}, + plerkle_serialization::AccountInfo, + sea_orm::{DatabaseConnection, TransactionTrait}, +}; + +mod master_edition; +mod v1_asset; + +pub async fn handle_token_metadata_account<'a, 'b, 'c>( + account_update: &'a AccountInfo<'a>, + parsing_result: &'b TokenMetadataAccountState, + db: &'c DatabaseConnection, + download_metadata_notifier: &DownloadMetadataNotifier, +) -> ProgramTransformerResult<()> { + let key = *account_update.pubkey().unwrap(); + match &parsing_result.data { + TokenMetadataAccountData::EmptyAccount => { + burn_v1_asset(db, key, account_update.slot()).await?; + Ok(()) + } + TokenMetadataAccountData::MasterEditionV1(m) => { + let txn = db.begin().await?; + save_v1_master_edition(key, account_update.slot(), m, &txn).await?; + txn.commit().await?; + Ok(()) + } + TokenMetadataAccountData::MetadataV1(m) => { + if let Some(info) = save_v1_asset(db, m, account_update.slot()).await? 
{ + download_metadata_notifier(info) + .await + .map_err(ProgramTransformerError::DownloadMetadataNotify)?; + } + Ok(()) + } + TokenMetadataAccountData::MasterEditionV2(m) => { + let txn = db.begin().await?; + save_v2_master_edition(key, account_update.slot(), m, &txn).await?; + txn.commit().await?; + Ok(()) + } + // TokenMetadataAccountData::EditionMarker(_) => {} + // TokenMetadataAccountData::UseAuthorityRecord(_) => {} + // TokenMetadataAccountData::CollectionAuthorityRecord(_) => {} + _ => Err(ProgramTransformerError::NotImplemented), + }?; + Ok(()) +} diff --git a/program_transformers/src/token_metadata/v1_asset.rs b/program_transformers/src/token_metadata/v1_asset.rs new file mode 100644 index 000000000..90f4415d2 --- /dev/null +++ b/program_transformers/src/token_metadata/v1_asset.rs @@ -0,0 +1,461 @@ +use { + crate::{ + error::{ProgramTransformerError, ProgramTransformerResult}, + DownloadMetadataInfo, + }, + blockbuster::token_metadata::{ + accounts::{MasterEdition, Metadata}, + types::TokenStandard, + }, + digital_asset_types::{ + dao::{ + asset, asset_authority, asset_creators, asset_data, asset_grouping, + asset_v1_account_attachments, + sea_orm_active_enums::{ + ChainMutability, Mutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass, + SpecificationVersions, V1AccountAttachments, + }, + token_accounts, tokens, + }, + json::ChainDataV1, + }, + plerkle_serialization::Pubkey as FBPubkey, + sea_orm::{ + entity::{ActiveValue, ColumnTrait, EntityTrait}, + query::{JsonValue, Order, QueryFilter, QueryOrder, QueryTrait, Select}, + sea_query::query::OnConflict, + ConnectionTrait, DbBackend, DbErr, TransactionTrait, + }, + solana_sdk::{pubkey, pubkey::Pubkey}, + tokio::time::{sleep, Duration}, + tracing::warn, +}; + +pub async fn burn_v1_asset( + conn: &T, + id: FBPubkey, + slot: u64, +) -> ProgramTransformerResult<()> { + let (id, slot_i) = (id.0, slot as i64); + let model = asset::ActiveModel { + id: ActiveValue::Set(id.to_vec()), + slot_updated: 
ActiveValue::Set(Some(slot_i)), + burnt: ActiveValue::Set(true), + ..Default::default() + }; + let mut query = asset::Entity::insert(model) + .on_conflict( + OnConflict::columns([asset::Column::Id]) + .update_columns([asset::Column::SlotUpdated, asset::Column::Burnt]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated > asset.slot_updated", + query.sql + ); + conn.execute(query).await?; + Ok(()) +} + +const RETRY_INTERVALS: &[u64] = &[0, 5, 10]; +static WSOL_PUBKEY: Pubkey = pubkey!("So11111111111111111111111111111111111111112"); + +pub async fn save_v1_asset( + conn: &T, + metadata: &Metadata, + slot: u64, +) -> ProgramTransformerResult> { + let metadata = metadata.clone(); + let mint_pubkey = metadata.mint; + let mint_pubkey_array = mint_pubkey.to_bytes(); + let mint_pubkey_vec = mint_pubkey_array.to_vec(); + + let (edition_attachment_address, _) = MasterEdition::find_pda(&mint_pubkey); + + let authority = metadata.update_authority.to_bytes().to_vec(); + let slot_i = slot as i64; + let uri = metadata.uri.trim().replace('\0', ""); + let _spec = SpecificationVersions::V1; + let mut class = match metadata.token_standard { + Some(TokenStandard::NonFungible) => SpecificationAssetClass::Nft, + Some(TokenStandard::FungibleAsset) => SpecificationAssetClass::FungibleAsset, + Some(TokenStandard::Fungible) => SpecificationAssetClass::FungibleToken, + Some(TokenStandard::NonFungibleEdition) => SpecificationAssetClass::Nft, + Some(TokenStandard::ProgrammableNonFungible) => SpecificationAssetClass::ProgrammableNft, + Some(TokenStandard::ProgrammableNonFungibleEdition) => { + SpecificationAssetClass::ProgrammableNft + } + _ => SpecificationAssetClass::Unknown, + }; + let mut ownership_type = match class { + SpecificationAssetClass::FungibleAsset => OwnerType::Token, + SpecificationAssetClass::FungibleToken => OwnerType::Token, + SpecificationAssetClass::Nft | SpecificationAssetClass::ProgrammableNft => { + 
OwnerType::Single + } + _ => OwnerType::Unknown, + }; + + // Wrapped Solana is a special token that has supply 0 (infinite). + // It's a fungible token with a metadata account, but without any token standard, meaning the code above will misabel it as an NFT. + if mint_pubkey == WSOL_PUBKEY { + ownership_type = OwnerType::Token; + class = SpecificationAssetClass::FungibleToken; + } + + // Gets the token and token account for the mint to populate the asset. + // This is required when the token and token account are indexed, but not the metadata account. + // If the metadata account is indexed, then the token and ta ingester will update the asset with the correct data. + let token: Option = find_model_with_retry( + conn, + "token", + &tokens::Entity::find_by_id(mint_pubkey_vec.clone()), + RETRY_INTERVALS, + ) + .await?; + + // get supply of token, default to 1 since most cases will be NFTs. Token mint ingester will properly set supply if token_result is None + let (supply, supply_mint) = match token { + Some(t) => (t.supply, Some(t.mint)), + None => { + warn!( + target: "Account not found", + "Token/Mint not found in 'tokens' table for mint {}", + bs58::encode(&mint_pubkey_vec).into_string() + ); + (1, None) + } + }; + + // Map unknown ownership types based on the supply. 
+ if ownership_type == OwnerType::Unknown { + ownership_type = match supply.cmp(&1) { + std::cmp::Ordering::Equal => OwnerType::Single, + std::cmp::Ordering::Greater => OwnerType::Token, + _ => OwnerType::Unknown, + } + } + + let token_account: Option = match ownership_type { + OwnerType::Single | OwnerType::Unknown => { + // query for token account associated with mint with positive balance with latest slot + let token_account: Option = find_model_with_retry( + conn, + "token_accounts", + &token_accounts::Entity::find() + .filter(token_accounts::Column::Mint.eq(mint_pubkey_vec.clone())) + .filter(token_accounts::Column::Amount.gt(0)) + .order_by(token_accounts::Column::SlotUpdated, Order::Desc), + RETRY_INTERVALS, + ) + .await + .map_err(|e: DbErr| ProgramTransformerError::DatabaseError(e.to_string()))?; + + token_account + } + _ => None, + }; + + // owner and delegate should be from the token account with the mint + let (owner, delegate) = match token_account { + Some(ta) => ( + ActiveValue::Set(Some(ta.owner)), + ActiveValue::Set(ta.delegate), + ), + None => { + if supply == 1 && ownership_type == OwnerType::Single { + warn!( + target: "Account not found", + "Token acc not found in 'token_accounts' table for mint {}", + bs58::encode(&mint_pubkey_vec).into_string() + ); + } + (ActiveValue::NotSet, ActiveValue::NotSet) + } + }; + + let name = metadata.name.clone().into_bytes(); + let symbol = metadata.symbol.clone().into_bytes(); + let mut chain_data = ChainDataV1 { + name: metadata.name.clone(), + symbol: metadata.symbol.clone(), + edition_nonce: metadata.edition_nonce, + primary_sale_happened: metadata.primary_sale_happened, + token_standard: metadata.token_standard, + uses: metadata.uses, + }; + chain_data.sanitize(); + let chain_data_json = serde_json::to_value(chain_data) + .map_err(|e| ProgramTransformerError::DeserializationError(e.to_string()))?; + let chain_mutability = match metadata.is_mutable { + true => ChainMutability::Mutable, + false => 
ChainMutability::Immutable, + }; + let asset_data_model = asset_data::ActiveModel { + chain_data_mutability: ActiveValue::Set(chain_mutability), + chain_data: ActiveValue::Set(chain_data_json), + metadata_url: ActiveValue::Set(uri.clone()), + metadata: ActiveValue::Set(JsonValue::String("processing".to_string())), + metadata_mutability: ActiveValue::Set(Mutability::Mutable), + slot_updated: ActiveValue::Set(slot_i), + reindex: ActiveValue::Set(Some(true)), + id: ActiveValue::Set(mint_pubkey_vec.clone()), + raw_name: ActiveValue::Set(Some(name.to_vec())), + raw_symbol: ActiveValue::Set(Some(symbol.to_vec())), + base_info_seq: ActiveValue::Set(Some(0)), + }; + let txn = conn.begin().await?; + let mut query = asset_data::Entity::insert(asset_data_model) + .on_conflict( + OnConflict::columns([asset_data::Column::Id]) + .update_columns([ + asset_data::Column::ChainDataMutability, + asset_data::Column::ChainData, + asset_data::Column::MetadataUrl, + asset_data::Column::MetadataMutability, + asset_data::Column::SlotUpdated, + asset_data::Column::Reindex, + asset_data::Column::RawName, + asset_data::Column::RawSymbol, + asset_data::Column::BaseInfoSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated > asset_data.slot_updated", + query.sql + ); + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; + + let model = asset::ActiveModel { + id: ActiveValue::Set(mint_pubkey_vec.clone()), + owner, + owner_type: ActiveValue::Set(ownership_type), + delegate, + frozen: ActiveValue::Set(false), + supply: ActiveValue::Set(supply), + supply_mint: ActiveValue::Set(supply_mint), + specification_version: ActiveValue::Set(Some(SpecificationVersions::V1)), + specification_asset_class: ActiveValue::Set(Some(class)), + tree_id: ActiveValue::Set(None), + nonce: ActiveValue::Set(Some(0)), + seq: ActiveValue::Set(Some(0)), + leaf: ActiveValue::Set(None), + data_hash: 
ActiveValue::Set(None), + creator_hash: ActiveValue::Set(None), + compressed: ActiveValue::Set(false), + compressible: ActiveValue::Set(false), + royalty_target_type: ActiveValue::Set(RoyaltyTargetType::Creators), + royalty_target: ActiveValue::Set(None), + royalty_amount: ActiveValue::Set(metadata.seller_fee_basis_points as i32), //basis points + asset_data: ActiveValue::Set(Some(mint_pubkey_vec.clone())), + slot_updated: ActiveValue::Set(Some(slot_i)), + burnt: ActiveValue::Set(false), + ..Default::default() + }; + let mut query = asset::Entity::insert(model) + .on_conflict( + OnConflict::columns([asset::Column::Id]) + .update_columns([ + asset::Column::Owner, + asset::Column::OwnerType, + asset::Column::Delegate, + asset::Column::Frozen, + asset::Column::Supply, + asset::Column::SupplyMint, + asset::Column::SpecificationVersion, + asset::Column::SpecificationAssetClass, + asset::Column::TreeId, + asset::Column::Nonce, + asset::Column::Seq, + asset::Column::Leaf, + asset::Column::DataHash, + asset::Column::CreatorHash, + asset::Column::Compressed, + asset::Column::Compressible, + asset::Column::RoyaltyTargetType, + asset::Column::RoyaltyTarget, + asset::Column::RoyaltyAmount, + asset::Column::AssetData, + asset::Column::SlotUpdated, + asset::Column::Burnt, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated >= asset.slot_updated OR asset.slot_updated IS NULL", + query.sql + ); + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; + + let attachment = asset_v1_account_attachments::ActiveModel { + id: ActiveValue::Set(edition_attachment_address.to_bytes().to_vec()), + slot_updated: ActiveValue::Set(slot_i), + attachment_type: ActiveValue::Set(V1AccountAttachments::MasterEditionV2), + ..Default::default() + }; + let query = asset_v1_account_attachments::Entity::insert(attachment) + .on_conflict( + 
OnConflict::columns([asset_v1_account_attachments::Column::Id]) + .do_nothing() + .to_owned(), + ) + .build(DbBackend::Postgres); + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; + + let model = asset_authority::ActiveModel { + asset_id: ActiveValue::Set(mint_pubkey_vec.clone()), + authority: ActiveValue::Set(authority), + seq: ActiveValue::Set(0), + slot_updated: ActiveValue::Set(slot_i), + ..Default::default() + }; + let mut query = asset_authority::Entity::insert(model) + .on_conflict( + OnConflict::columns([asset_authority::Column::AssetId]) + .update_columns([ + asset_authority::Column::Authority, + asset_authority::Column::Seq, + asset_authority::Column::SlotUpdated, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated > asset_authority.slot_updated", + query.sql + ); + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; + + if let Some(c) = &metadata.collection { + let model = asset_grouping::ActiveModel { + asset_id: ActiveValue::Set(mint_pubkey_vec.clone()), + group_key: ActiveValue::Set("collection".to_string()), + group_value: ActiveValue::Set(Some(c.key.to_string())), + verified: ActiveValue::Set(c.verified), + group_info_seq: ActiveValue::Set(Some(0)), + slot_updated: ActiveValue::Set(Some(slot_i)), + ..Default::default() + }; + let mut query = asset_grouping::Entity::insert(model) + .on_conflict( + OnConflict::columns([ + asset_grouping::Column::AssetId, + asset_grouping::Column::GroupKey, + ]) + .update_columns([ + asset_grouping::Column::GroupValue, + asset_grouping::Column::Verified, + asset_grouping::Column::SlotUpdated, + asset_grouping::Column::GroupInfoSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated > asset_grouping.slot_updated", + query.sql + ); + txn.execute(query) + .await + .map_err(|db_err| 
ProgramTransformerError::AssetIndexError(db_err.to_string()))?; + } + + let creators = metadata + .creators + .unwrap_or_default() + .iter() + .enumerate() + .map(|(i, creator)| asset_creators::ActiveModel { + asset_id: ActiveValue::Set(mint_pubkey_vec.clone()), + position: ActiveValue::Set(i as i16), + creator: ActiveValue::Set(creator.address.to_bytes().to_vec()), + share: ActiveValue::Set(creator.share as i32), + verified: ActiveValue::Set(creator.verified), + slot_updated: ActiveValue::Set(Some(slot_i)), + seq: ActiveValue::Set(Some(0)), + ..Default::default() + }) + .collect::>(); + + if !creators.is_empty() { + let mut query = asset_creators::Entity::insert_many(creators) + .on_conflict( + OnConflict::columns([ + asset_creators::Column::AssetId, + asset_creators::Column::Position, + ]) + .update_columns([ + asset_creators::Column::Creator, + asset_creators::Column::Share, + asset_creators::Column::Verified, + asset_creators::Column::Seq, + asset_creators::Column::SlotUpdated, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated >= asset_creators.slot_updated OR asset_creators.slot_updated is NULL", + query.sql + ); + txn.execute(query) + .await + .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; + } + txn.commit().await?; + + if uri.is_empty() { + warn!( + "URI is empty for mint {}. 
Skipping background task.", + bs58::encode(mint_pubkey_vec).into_string() + ); + return Ok(None); + } + + Ok(Some(DownloadMetadataInfo::new(mint_pubkey_vec, uri))) +} + +async fn find_model_with_retry( + conn: &T, + _model_name: &str, + select: &Select, + retry_intervals: &[u64], +) -> Result, DbErr> { + // let mut retries = 0; + // let metric_name = format!("{}_found", model_name); + + for interval in retry_intervals { + let interval_duration = Duration::from_millis(*interval); + sleep(interval_duration).await; + + let model = select.clone().one(conn).await?; + if let Some(m) = model { + // record_metric(&metric_name, true, retries); + return Ok(Some(m)); + } + // retries += 1; + } + + // record_metric(&metric_name, false, retries - 1); + Ok(None) +} + +// fn record_metric(metric_name: &str, success: bool, retries: u32) { +// let retry_count = &retries.to_string(); +// let success = if success { "true" } else { "false" }; +// metric! { +// statsd_count!(metric_name, 1, "success" => success, "retry_count" => retry_count); +// } +// }