diff --git a/Cargo.lock b/Cargo.lock index 7e33acdb9..2b2c42ddc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -881,9 +881,9 @@ checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" [[package]] name = "blockbuster" -version = "0.9.0-beta.1" +version = "0.9.0-beta.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56e0240c1218958c0d51284d783fa055f551d769bb8b7a4abf635b17fa9620dc" +checksum = "40ab97783defb671f7214f158a517844cb8fa5da781e4d8d46a17e15bc79f213" dependencies = [ "anchor-lang", "async-trait", diff --git a/README.md b/README.md index 2784bcad9..e0b4d6d2b 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ Because this is a multi component system the easiest way to develop or locally t #### Regenerating DB Types Edit the init.sql, then run `docker compose up db` Then with a local `DATABASE_URL` var exported like this `export DATABASE_URL=postgres://solana:solana@localhost/solana` you can run -` sea-orm-cli generate entity -o ./digital_asset_types/src/dao/generated/ --database-url $DATABASE_URL --with-serde both --expanded-format` +`sea-orm-cli generate entity -o ./digital_asset_types/src/dao/generated/ --database-url $DATABASE_URL --with-serde both --expanded-format` If you need to install `sea-orm-cli` run `cargo install sea-orm-cli`. diff --git a/das_api/Cargo.toml b/das_api/Cargo.toml index c99d8b21a..bd2bbff51 100644 --- a/das_api/Cargo.toml +++ b/das_api/Cargo.toml @@ -33,7 +33,7 @@ schemars = "0.8.6" schemars_derive = "0.8.6" open-rpc-derive = { version = "0.0.4"} open-rpc-schema = { version = "0.0.4"} -blockbuster = "0.9.0-beta.1" +blockbuster = "=0.9.0-beta.3" anchor-lang = "0.28.0" mpl-token-metadata = { version = "=2.0.0-beta.1", features = ["serde-feature"] } mpl-candy-machine-core = { version = "2.0.1", features = ["no-entrypoint"] } diff --git a/digital_asset_types/Cargo.toml b/digital_asset_types/Cargo.toml index 8e4b18bb9..8d79be1f3 100644 --- a/digital_asset_types/Cargo.toml +++ b/digital_asset_types/Cargo.toml @@ -18,7 +18,7 @@ solana-sdk = "~1.16.16" num-traits = "0.2.15" num-derive = "0.3.3" thiserror = "1.0.31" -blockbuster = "0.9.0-beta.1" +blockbuster = "=0.9.0-beta.3" jsonpath_lib = "0.3.0" mime_guess = "2.0.4" url = "2.3.1" diff --git a/digital_asset_types/src/dao/generated/asset.rs b/digital_asset_types/src/dao/generated/asset.rs index 0ced69299..b8e116e93 100644 --- a/digital_asset_types/src/dao/generated/asset.rs +++ b/digital_asset_types/src/dao/generated/asset.rs @@ -46,6 +46,8 @@ pub struct Model { pub owner_delegate_seq: Option, pub was_decompressed: bool, pub leaf_seq: Option, + pub royalty_amount_seq: Option, + pub creators_added_seq: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -78,6 +80,8 @@ pub enum Column { OwnerDelegateSeq, WasDecompressed, LeafSeq, + RoyaltyAmountSeq, + CreatorsAddedSeq, } #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] @@ -133,6 +137,8 @@ impl ColumnTrait for Column { Self::OwnerDelegateSeq => ColumnType::BigInteger.def().null(), Self::WasDecompressed => ColumnType::Boolean.def(), Self::LeafSeq => ColumnType::BigInteger.def().null(), + Self::RoyaltyAmountSeq => ColumnType::BigInteger.def().null(), + Self::CreatorsAddedSeq => ColumnType::BigInteger.def().null(), } } } diff --git a/digital_asset_types/src/dao/generated/asset_creators.rs b/digital_asset_types/src/dao/generated/asset_creators.rs index 21f34dcf7..346ed3b2e 100644 --- a/digital_asset_types/src/dao/generated/asset_creators.rs +++ b/digital_asset_types/src/dao/generated/asset_creators.rs @@ -19,7 +19,7 @@ pub struct Model { pub creator: Vec, pub share: i32, pub verified: bool, - pub seq: Option, + pub verified_seq: Option, pub slot_updated: Option, pub position: i16, } @@ -31,7 +31,7 @@ pub enum Column { Creator, Share, Verified, - Seq, + VerifiedSeq, SlotUpdated, Position, } @@ -62,7 +62,7 @@ impl ColumnTrait for Column { Self::Creator => ColumnType::Binary.def(), Self::Share => ColumnType::Integer.def(), Self::Verified => ColumnType::Boolean.def(), - Self::Seq => ColumnType::BigInteger.def().null(), + Self::VerifiedSeq => ColumnType::BigInteger.def().null(), Self::SlotUpdated => ColumnType::BigInteger.def().null(), Self::Position => ColumnType::SmallInteger.def(), } diff --git a/digital_asset_types/src/dao/generated/asset_data.rs b/digital_asset_types/src/dao/generated/asset_data.rs index 374ed854a..17bbc2e43 100644 --- a/digital_asset_types/src/dao/generated/asset_data.rs +++ b/digital_asset_types/src/dao/generated/asset_data.rs @@ -24,8 +24,10 @@ pub struct Model { pub metadata: Json, pub slot_updated: i64, pub reindex: Option, - pub raw_name: Vec, - pub raw_symbol: Vec, + pub raw_name: Option>, + pub raw_symbol: Option>, + pub base_info_seq: Option, + pub download_metadata_seq: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -40,6 +42,8 @@ pub enum Column { Reindex, RawName, RawSymbol, + BaseInfoSeq, + DownloadMetadataSeq, } #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] @@ -70,9 +74,11 @@ impl ColumnTrait for Column { Self::MetadataMutability => Mutability::db_type(), Self::Metadata => ColumnType::JsonBinary.def(), Self::SlotUpdated => ColumnType::BigInteger.def(), - Self::Reindex => ColumnType::Boolean.def(), - Self::RawName => ColumnType::Binary.def(), - Self::RawSymbol => ColumnType::Binary.def(), + Self::Reindex => ColumnType::Boolean.def().null(), + Self::RawName => ColumnType::Binary.def().null(), + Self::RawSymbol => ColumnType::Binary.def().null(), + Self::BaseInfoSeq => ColumnType::BigInteger.def().null(), + Self::DownloadMetadataSeq => ColumnType::BigInteger.def().null(), } } } diff --git a/digital_asset_types/src/dao/generated/asset_grouping.rs b/digital_asset_types/src/dao/generated/asset_grouping.rs index aae51d6d8..5d5c0e749 100644 --- a/digital_asset_types/src/dao/generated/asset_grouping.rs +++ b/digital_asset_types/src/dao/generated/asset_grouping.rs @@ -1,4 +1,4 @@ -//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 +//! SeaORM Entity. Generated by sea-orm-codegen 0.9.3 use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -12,7 +12,7 @@ impl EntityName for Entity { } } -#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Serialize, Deserialize)] pub struct Model { pub id: i64, pub asset_id: Vec, @@ -20,7 +20,7 @@ pub struct Model { pub group_value: Option, pub seq: Option, pub slot_updated: Option, - pub verified: Option, + pub verified: bool, pub group_info_seq: Option, } diff --git a/digital_asset_types/src/dao/generated/cl_audits.rs b/digital_asset_types/src/dao/generated/cl_audits.rs index a07714202..0d02b7769 100644 --- a/digital_asset_types/src/dao/generated/cl_audits.rs +++ b/digital_asset_types/src/dao/generated/cl_audits.rs @@ -22,7 +22,7 @@ pub struct Model { pub seq: i64, pub level: i64, pub hash: Vec, - pub created_at: Option, + pub created_at: DateTime, pub tx: String, } diff --git a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs index 2be0283e7..bf72d7957 100644 --- a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs +++ b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs @@ -4,58 +4,12 @@ use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "mutability")] -pub enum Mutability { - #[sea_orm(string_value = "immutable")] - Immutable, - #[sea_orm(string_value = "mutable")] - Mutable, - #[sea_orm(string_value = "unknown")] - Unknown, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm( - rs_type = "String", - db_type = "Enum", - enum_name = "v1_account_attachments" -)] -pub enum V1AccountAttachments { - #[sea_orm(string_value = "edition")] - Edition, - #[sea_orm(string_value = "edition_marker")] - EditionMarker, - #[sea_orm(string_value = "master_edition_v1")] - MasterEditionV1, - #[sea_orm(string_value = "master_edition_v2")] - MasterEditionV2, - #[sea_orm(string_value = "unknown")] - Unknown, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")] -pub enum TaskStatus { - #[sea_orm(string_value = "failed")] - Failed, - #[sea_orm(string_value = "pending")] - Pending, - #[sea_orm(string_value = "running")] - Running, - #[sea_orm(string_value = "success")] - Success, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm( - rs_type = "String", - db_type = "Enum", - enum_name = "royalty_target_type" -)] -pub enum RoyaltyTargetType { - #[sea_orm(string_value = "creators")] - Creators, - #[sea_orm(string_value = "fanout")] - Fanout, +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "owner_type")] +pub enum OwnerType { #[sea_orm(string_value = "single")] Single, + #[sea_orm(string_value = "token")] + Token, #[sea_orm(string_value = "unknown")] Unknown, } @@ -88,12 +42,20 @@ pub enum SpecificationAssetClass { Unknown, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "chain_mutability")] -pub enum ChainMutability { - #[sea_orm(string_value = "immutable")] - Immutable, - #[sea_orm(string_value = "mutable")] - Mutable, +#[sea_orm( + rs_type = "String", + db_type = "Enum", + enum_name = "v1_account_attachments" +)] +pub enum V1AccountAttachments { + #[sea_orm(string_value = "edition")] + Edition, + #[sea_orm(string_value = "edition_marker")] + EditionMarker, + #[sea_orm(string_value = "master_edition_v1")] + MasterEditionV1, + #[sea_orm(string_value = "master_edition_v2")] + MasterEditionV2, #[sea_orm(string_value = "unknown")] Unknown, } @@ -114,12 +76,50 @@ pub enum SpecificationVersions { V2, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "owner_type")] -pub enum OwnerType { +#[sea_orm( + rs_type = "String", + db_type = "Enum", + enum_name = "royalty_target_type" +)] +pub enum RoyaltyTargetType { + #[sea_orm(string_value = "creators")] + Creators, + #[sea_orm(string_value = "fanout")] + Fanout, #[sea_orm(string_value = "single")] Single, - #[sea_orm(string_value = "token")] - Token, + #[sea_orm(string_value = "unknown")] + Unknown, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")] +pub enum TaskStatus { + #[sea_orm(string_value = "failed")] + Failed, + #[sea_orm(string_value = "pending")] + Pending, + #[sea_orm(string_value = "running")] + Running, + #[sea_orm(string_value = "success")] + Success, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "chain_mutability")] +pub enum ChainMutability { + #[sea_orm(string_value = "immutable")] + Immutable, + #[sea_orm(string_value = "mutable")] + Mutable, + #[sea_orm(string_value = "unknown")] + Unknown, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "mutability")] +pub enum Mutability { + #[sea_orm(string_value = "immutable")] + Immutable, + #[sea_orm(string_value = "mutable")] + Mutable, #[sea_orm(string_value = "unknown")] Unknown, } diff --git a/digital_asset_types/tests/common.rs b/digital_asset_types/tests/common.rs index bbe3cf509..50bfc194a 100644 --- a/digital_asset_types/tests/common.rs +++ b/digital_asset_types/tests/common.rs @@ -83,8 +83,10 @@ pub fn create_asset_data( metadata: JsonValue::String("processing".to_string()), slot_updated: 0, reindex: None, - raw_name: metadata.name.into_bytes().to_vec().clone(), - raw_symbol: metadata.symbol.into_bytes().to_vec().clone(), + raw_name: Some(metadata.name.into_bytes().to_vec().clone()), + raw_symbol: Some(metadata.symbol.into_bytes().to_vec().clone()), + base_info_seq: Some(0), + download_metadata_seq: Some(0), }, ) } @@ -157,6 +159,8 @@ pub fn create_asset( owner_delegate_seq: Some(0), was_decompressed: false, leaf_seq: Some(0), + royalty_amount_seq: Some(0), + creators_added_seq: Some(0), }, ) } @@ -182,7 +186,7 @@ pub fn create_asset_creator( creator, share, verified, - seq: Some(0), + verified_seq: Some(0), slot_updated: Some(0), position: 0, }, @@ -231,7 +235,7 @@ pub fn create_asset_grouping( id: row_num, group_key: "collection".to_string(), slot_updated: Some(0), - verified: Some(false), + verified: false, group_info_seq: Some(0), }, ) diff --git a/digital_asset_types/tests/json_parsing.rs b/digital_asset_types/tests/json_parsing.rs index 765f14bc6..c10ca12e3 100644 --- a/digital_asset_types/tests/json_parsing.rs +++ b/digital_asset_types/tests/json_parsing.rs @@ -34,8 +34,10 @@ pub async fn parse_onchain_json(json: serde_json::Value) -> Content { metadata: json, slot_updated: 0, reindex: None, - raw_name: String::from("Handalf").into_bytes().to_vec(), - raw_symbol: String::from("").into_bytes().to_vec(), + raw_name: Some(String::from("Handalf").into_bytes().to_vec()), + raw_symbol: Some(String::from("").into_bytes().to_vec()), + base_info_seq: Some(0), + download_metadata_seq: Some(0), }; v1_content_from_json(&asset_data).unwrap() diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 7e38ac93d..7b4be4523 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -30,6 +30,7 @@ mod m20230724_120101_add_group_info_seq; mod m20230726_013107_remove_not_null_constraint_from_group_value; mod m20230918_182123_add_raw_name_symbol; mod m20230919_072154_cl_audits; +mod m20231019_120101_add_seq_numbers_bgum_update_metadata; pub struct Migrator; @@ -67,6 +68,7 @@ impl MigratorTrait for Migrator { Box::new(m20230726_013107_remove_not_null_constraint_from_group_value::Migration), Box::new(m20230918_182123_add_raw_name_symbol::Migration), Box::new(m20230919_072154_cl_audits::Migration), + Box::new(m20231019_120101_add_seq_numbers_bgum_update_metadata::Migration), ] } } diff --git a/migration/src/m20231019_120101_add_seq_numbers_bgum_update_metadata.rs b/migration/src/m20231019_120101_add_seq_numbers_bgum_update_metadata.rs new file mode 100644 index 000000000..6881c3c8b --- /dev/null +++ b/migration/src/m20231019_120101_add_seq_numbers_bgum_update_metadata.rs @@ -0,0 +1,83 @@ +use digital_asset_types::dao::{asset, asset_creators, asset_data}; +use sea_orm_migration::{ + prelude::*, + sea_orm::{ConnectionTrait, DatabaseBackend, Statement}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + " + ALTER TABLE asset_creators + RENAME COLUMN seq to verified_seq; + " + .to_string(), + )) + .await?; + + manager + .alter_table( + Table::alter() + .table(asset_data::Entity) + .add_column(ColumnDef::new(Alias::new("base_info_seq")).big_integer()) + .add_column(ColumnDef::new(Alias::new("download_metadata_seq")).big_integer()) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(asset::Entity) + .add_column(ColumnDef::new(Alias::new("royalty_amount_seq")).big_integer()) + .add_column(ColumnDef::new(Alias::new("creators_added_seq")).big_integer()) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + " + ALTER TABLE asset_creators + RENAME COLUMN verified_seq to seq; + " + .to_string(), + )) + .await?; + + manager + .alter_table( + Table::alter() + .table(asset_data::Entity) + .drop_column(Alias::new("base_info_seq")) + .drop_column(Alias::new("download_metadata_seq")) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(asset::Entity) + .drop_column(Alias::new("royalty_amount_seq")) + .drop_column(Alias::new("creators_added_seq")) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/nft_ingester/Cargo.toml b/nft_ingester/Cargo.toml index 6fb9f998c..8a44d32b4 100644 --- a/nft_ingester/Cargo.toml +++ b/nft_ingester/Cargo.toml @@ -35,7 +35,7 @@ spl-concurrent-merkle-tree = "0.2.0" uuid = "1.0.0" async-trait = "0.1.53" num-traits = "0.2.15" -blockbuster = "0.9.0-beta.1" +blockbuster = "=0.9.0-beta.3" figment = { version = "0.10.6", features = ["env", "toml", "yaml"] } cadence = "0.29.0" cadence-macros = "0.29.0" diff --git a/nft_ingester/src/program_transformers/bubblegum/burn.rs b/nft_ingester/src/program_transformers/bubblegum/burn.rs index 70ddcfcea..adca1324f 100644 --- a/nft_ingester/src/program_transformers/bubblegum/burn.rs +++ b/nft_ingester/src/program_transformers/bubblegum/burn.rs @@ -23,6 +23,9 @@ where T: ConnectionTrait + TransactionTrait, { if let Some(cl) = &parsing_result.tree_update { + // Note: We do not check whether the asset has been decompressed here because we know if it + // was burned then it could not have been decompressed later. + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; let leaf_index = cl.index; let (asset_id, _) = Pubkey::find_program_address( @@ -46,10 +49,7 @@ where let query = asset::Entity::insert(asset_model) .on_conflict( OnConflict::columns([asset::Column::Id]) - .update_columns([ - asset::Column::Burnt, - //TODO maybe handle slot updated. - ]) + .update_columns([asset::Column::Burnt]) .to_owned(), ) .build(DbBackend::Postgres); diff --git a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs index 1b8f5842a..ba6dd3073 100644 --- a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs @@ -1,7 +1,7 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_with_leaf_info, + asset_was_decompressed, save_changelog_event, upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, }, }; @@ -31,6 +31,12 @@ where .. } => { let id_bytes = id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + let owner_bytes = owner.to_bytes().to_vec(); let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { None @@ -50,7 +56,6 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs index 7517f1544..fd88d6c35 100644 --- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs @@ -1,4 +1,6 @@ -use crate::program_transformers::bubblegum::{upsert_asset_with_seq, upsert_collection_info}; +use crate::program_transformers::bubblegum::{ + asset_was_decompressed, upsert_asset_with_seq, upsert_collection_info, +}; use blockbuster::{ instruction::InstructionBundle, programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, @@ -26,7 +28,7 @@ where let (collection, verify) = match payload { Payload::CollectionVerification { collection, verify, .. - } => (collection.clone(), verify.clone()), + } => (collection, verify), _ => { return Err(IngesterError::ParsingError( "Ix not parsed correctly".to_string(), @@ -41,6 +43,12 @@ where let id_bytes = match le.schema { LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(), }; + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; @@ -54,7 +62,6 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; @@ -64,8 +71,8 @@ where txn, id_bytes.to_vec(), Some(Collection { - key: collection.clone(), - verified: verify, + key: *collection, + verified: *verify, }), bundle.slot as i64, seq as i64, diff --git a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs index 134fe89ca..85c6e9857 100644 --- a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs @@ -1,7 +1,7 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_with_leaf_info, + asset_was_decompressed, save_changelog_event, upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_creator_verified, }, }; @@ -51,6 +51,12 @@ where .. } => { let id_bytes = id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + let owner_bytes = owner.to_bytes().to_vec(); let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { None @@ -70,7 +76,6 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs index 7e930abdc..e788526db 100644 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ b/nft_ingester/src/program_transformers/bubblegum/db.rs @@ -1,15 +1,15 @@ use crate::error::IngesterError; use digital_asset_types::dao::{ - asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items, + asset, asset_creators, asset_data, asset_grouping, backfill_items, cl_audits, cl_items, + sea_orm_active_enums::{ChainMutability, Mutability}, }; use log::{debug, info}; -use mpl_bubblegum::types::Collection; +use mpl_bubblegum::types::{Collection, Creator}; use sea_orm::{ query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait, }; use spl_account_compression::events::ChangeLogEventV1; - -use std::convert::From; +use std::collections::HashSet; pub async fn save_changelog_event<'c, T>( change_log_event: &ChangeLogEventV1, @@ -68,7 +68,7 @@ where ..Default::default() }; - let mut audit_item: Option = if (cl_audits) { + let audit_item: Option = if cl_audits { let mut ai: cl_audits::ActiveModel = item.clone().into(); ai.tx = Set(txn_id.to_string()); Some(ai) @@ -135,6 +135,7 @@ where //TODO -> set maximum size of path and break into multiple statements } +#[allow(clippy::too_many_arguments)] pub async fn upsert_asset_with_leaf_info( txn: &T, id: Vec, @@ -144,7 +145,6 @@ pub async fn upsert_asset_with_leaf_info( data_hash: [u8; 32], creator_hash: [u8; 32], seq: i64, - was_decompressed: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, @@ -169,22 +169,19 @@ where asset::Column::Nonce, asset::Column::TreeId, asset::Column::Leaf, - asset::Column::LeafSeq, asset::Column::DataHash, asset::Column::CreatorHash, + asset::Column::LeafSeq, ]) .to_owned(), ) .build(DbBackend::Postgres); - // If we are indexing decompression we will update the leaf regardless of if we have previously - // indexed decompression and regardless of seq. - if !was_decompressed { - query.sql = format!( - "{} WHERE (NOT asset.was_decompressed) AND (excluded.leaf_seq >= asset.leaf_seq OR asset.leaf_seq IS NULL)", - query.sql - ); - } + // If the asset was decompressed, don't update the leaf info since we cleared it during decompression. + query.sql = format!( + "{} WHERE (NOT asset.was_decompressed) AND (excluded.leaf_seq >= asset.leaf_seq OR asset.leaf_seq IS NULL)", + query.sql + ); txn.execute(query) .await @@ -202,26 +199,25 @@ where { let model = asset::ActiveModel { id: Set(id), - leaf: Set(None), nonce: Set(Some(0)), - leaf_seq: Set(None), + tree_id: Set(None), + leaf: Set(None), data_hash: Set(None), creator_hash: Set(None), - tree_id: Set(None), - seq: Set(Some(0)), + leaf_seq: Set(None), ..Default::default() }; + let query = asset::Entity::insert(model) .on_conflict( OnConflict::column(asset::Column::Id) .update_columns([ - asset::Column::Leaf, - asset::Column::LeafSeq, asset::Column::Nonce, + asset::Column::TreeId, + asset::Column::Leaf, asset::Column::DataHash, asset::Column::CreatorHash, - asset::Column::TreeId, - asset::Column::Seq, + asset::Column::LeafSeq, ]) .to_owned(), ) @@ -334,7 +330,7 @@ where .build(DbBackend::Postgres); query.sql = format!( - "{} WHERE (NOT asset.was_decompressed) AND (excluded.seq >= asset.seq OR asset.seq IS NULL)", + "{} WHERE excluded.seq >= asset.seq OR asset.seq IS NULL", query.sql ); @@ -356,35 +352,42 @@ where T: ConnectionTrait + TransactionTrait, { let model = asset_creators::ActiveModel { - asset_id: Set(asset_id), + asset_id: Set(asset_id.clone()), creator: Set(creator), verified: Set(verified), - seq: Set(Some(seq)), + verified_seq: Set(Some(seq)), ..Default::default() }; - let mut query = asset_creators::Entity::insert(model) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Creator, - ]) - .update_columns([ - asset_creators::Column::Verified, - asset_creators::Column::Seq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); + // Only upsert a creator if the asset table's creator array seq is at a lower value. That seq + // gets updated when we set up the creator array in `mintV1` or `update_metadata`. We don't + // want to insert a creator that was removed from a later `update_metadata`. And we don't need + // to worry about creator verification in that case because the `update_metadata` updates + // creator verification state as well. + if creators_should_be_updated(txn, asset_id, seq).await? { + let mut query = asset_creators::Entity::insert(model) + .on_conflict( + OnConflict::columns([ + asset_creators::Column::AssetId, + asset_creators::Column::Creator, + ]) + .update_columns([ + asset_creators::Column::Verified, + asset_creators::Column::VerifiedSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.seq >= asset_creators.seq OR asset_creators.seq is NULL", - query.sql - ); + query.sql = format!( + "{} WHERE excluded.verified_seq >= asset_creators.verified_seq OR asset_creators.verified_seq is NULL", + query.sql, +); - txn.execute(query) - .await - .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + txn.execute(query) + .await + .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + } Ok(()) } @@ -408,7 +411,7 @@ where asset_id: Set(asset_id), group_key: Set("collection".to_string()), group_value: Set(group_value), - verified: Set(Some(verified)), + verified: Set(verified), slot_updated: Set(Some(slot_updated)), group_info_seq: Set(Some(seq)), ..Default::default() @@ -441,3 +444,261 @@ where Ok(()) } + +#[allow(clippy::too_many_arguments)] +pub async fn upsert_asset_data( + txn: &T, + id: Vec, + chain_data_mutability: ChainMutability, + chain_data: JsonValue, + metadata_url: String, + metadata_mutability: Mutability, + metadata: JsonValue, + slot_updated: i64, + reindex: Option, + raw_name: Vec, + raw_symbol: Vec, + seq: i64, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset_data::ActiveModel { + id: Set(id.clone()), + chain_data_mutability: Set(chain_data_mutability), + chain_data: Set(chain_data), + metadata_url: Set(metadata_url), + metadata_mutability: Set(metadata_mutability), + metadata: Set(metadata), + slot_updated: Set(slot_updated), + reindex: Set(reindex), + raw_name: Set(Some(raw_name)), + raw_symbol: Set(Some(raw_symbol)), + base_info_seq: Set(Some(seq)), + ..Default::default() + }; + + let mut query = asset_data::Entity::insert(model) + .on_conflict( + OnConflict::columns([asset_data::Column::Id]) + .update_columns([ + asset_data::Column::ChainDataMutability, + asset_data::Column::ChainData, + asset_data::Column::MetadataUrl, + asset_data::Column::MetadataMutability, + // Don't update asset_data::Column::Metadata if it already exists. Even if we + // are indexing `update_metadata`` and there's a new URI, the new background + // task will overwrite it. + asset_data::Column::SlotUpdated, + asset_data::Column::Reindex, + asset_data::Column::RawName, + asset_data::Column::RawSymbol, + asset_data::Column::BaseInfoSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.base_info_seq >= asset_data.base_info_seq OR asset_data.base_info_seq IS NULL)", + query.sql + ); + txn.execute(query) + .await + .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +pub async fn upsert_asset_with_royalty_amount( + txn: &T, + id: Vec, + royalty_amount: i32, + seq: i64, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset::ActiveModel { + id: Set(id.clone()), + royalty_amount: Set(royalty_amount), + royalty_amount_seq: Set(Some(seq)), + ..Default::default() + }; + + let mut query = asset::Entity::insert(model) + .on_conflict( + OnConflict::column(asset::Column::Id) + .update_columns([ + asset::Column::RoyaltyAmount, + asset::Column::RoyaltyAmountSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + query.sql = format!( + "{} WHERE excluded.royalty_amount_seq >= asset.royalty_amount_seq OR royalty_amount_seq.seq IS NULL)", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +pub async fn asset_was_decompressed(txn: &T, id: Vec) -> Result +where + T: ConnectionTrait + TransactionTrait, +{ + if let Some(asset) = asset::Entity::find_by_id(id).one(txn).await? { + if let Some(0) = asset.seq { + return Ok(true); + } + }; + Ok(false) +} + +pub async fn creators_should_be_updated( + txn: &T, + id: Vec, + seq: i64, +) -> Result +where + T: ConnectionTrait + TransactionTrait, +{ + if let Some(asset) = asset::Entity::find_by_id(id).one(txn).await? { + if let Some(creators_added_seq) = asset.creators_added_seq { + if seq < creators_added_seq { + return Ok(false); + } + } + } + Ok(true) +} + +pub async fn upsert_asset_with_creators_added_seq( + txn: &T, + id: Vec, + seq: i64, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset::ActiveModel { + id: Set(id), + creators_added_seq: Set(Some(seq)), + ..Default::default() + }; + + let mut query = asset::Entity::insert(model) + .on_conflict( + OnConflict::column(asset::Column::Id) + .update_columns([asset::Column::CreatorsAddedSeq]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + query.sql = format!( + "{} WHERE excluded.creators_added_seq >= asset.creators_added_seq OR asset.creators_added_seq IS NULL", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +pub async fn upsert_creators( + txn: &T, + id: Vec, + creators: &Vec, + slot_updated: i64, + seq: i64, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + if creators_should_be_updated(txn, id.clone(), seq).await? { + if !creators.is_empty() { + // Vec to hold base creator information. + let mut db_creator_infos = Vec::with_capacity(creators.len()); + + // Vec to hold info on whether a creator is verified. This info is protected by `seq` number. + let mut db_creator_verified_infos = Vec::with_capacity(creators.len()); + + // Set to prevent duplicates. + let mut creators_set = HashSet::new(); + + for (i, c) in creators.iter().enumerate() { + if creators_set.contains(&c.address) { + continue; + } + + db_creator_infos.push(asset_creators::ActiveModel { + asset_id: Set(id.clone()), + creator: Set(c.address.to_bytes().to_vec()), + position: Set(i as i16), + share: Set(c.share as i32), + slot_updated: Set(Some(slot_updated)), + ..Default::default() + }); + + db_creator_verified_infos.push(asset_creators::ActiveModel { + asset_id: Set(id.clone()), + creator: Set(c.address.to_bytes().to_vec()), + verified: Set(c.verified), + verified_seq: Set(Some(seq)), + ..Default::default() + }); + + creators_set.insert(c.address); + } + + // This statement will update base information for each creator. + let query = asset_creators::Entity::insert_many(db_creator_infos) + .on_conflict( + OnConflict::columns([ + asset_creators::Column::AssetId, + asset_creators::Column::Creator, + ]) + .update_columns([ + asset_creators::Column::Position, + asset_creators::Column::Share, + asset_creators::Column::SlotUpdated, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + txn.execute(query).await?; + + // This statement will update whether the creator is verified and the + // `verified_seq` number. + let mut query = asset_creators::Entity::insert_many(db_creator_verified_infos) + .on_conflict( + OnConflict::columns([ + asset_creators::Column::AssetId, + asset_creators::Column::Creator, + ]) + .update_columns([ + asset_creators::Column::Verified, + asset_creators::Column::VerifiedSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.verified_seq >= asset_creators.verified_seq OR asset_creators.verified_seq IS NULL", + query.sql + ); + txn.execute(query).await?; + } + + upsert_asset_with_creators_added_seq(txn, id, seq).await?; + } + + Ok(()) +} diff --git a/nft_ingester/src/program_transformers/bubblegum/decompress.rs b/nft_ingester/src/program_transformers/bubblegum/decompress.rs index a024d5ebe..6e9e0341a 100644 --- a/nft_ingester/src/program_transformers/bubblegum/decompress.rs +++ b/nft_ingester/src/program_transformers/bubblegum/decompress.rs @@ -1,12 +1,13 @@ use crate::{ error::IngesterError, - program_transformers::bubblegum::upsert_asset_with_leaf_info_for_decompression, + program_transformers::bubblegum::{ + asset_was_decompressed, upsert_asset_with_compression_info, + upsert_asset_with_leaf_info_for_decompression, + }, }; use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}; use sea_orm::{query::*, ConnectionTrait}; -use super::upsert_asset_with_compression_info; - pub async fn decompress<'c, T>( _parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, @@ -17,8 +18,14 @@ where { let id_bytes = bundle.keys.get(3).unwrap().0.as_slice(); + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + // Partial update of asset table with just leaf. upsert_asset_with_leaf_info_for_decompression(txn, id_bytes.to_vec()).await?; + upsert_asset_with_compression_info( txn, id_bytes.to_vec(), diff --git a/nft_ingester/src/program_transformers/bubblegum/delegate.rs b/nft_ingester/src/program_transformers/bubblegum/delegate.rs index 88896de64..5f646b269 100644 --- a/nft_ingester/src/program_transformers/bubblegum/delegate.rs +++ b/nft_ingester/src/program_transformers/bubblegum/delegate.rs @@ -1,7 +1,7 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_with_leaf_info, + asset_was_decompressed, save_changelog_event, upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, }, }; @@ -30,6 +30,12 @@ where .. } => { let id_bytes = id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + let owner_bytes = owner.to_bytes().to_vec(); let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { None @@ -48,7 +54,6 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs index 752ed6a3c..93ffbaca5 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs @@ -1,8 +1,10 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ - save_changelog_event, upsert_asset_with_compression_info, upsert_asset_with_leaf_info, - upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_collection_info, + asset_was_decompressed, save_changelog_event, upsert_asset_data, + upsert_asset_with_compression_info, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_royalty_amount, + upsert_asset_with_seq, upsert_collection_info, upsert_creators, }, tasks::{DownloadMetadata, IntoTaskData, TaskData}, }; @@ -17,8 +19,11 @@ use blockbuster::{ use chrono::Utc; use digital_asset_types::{ dao::{ - asset, asset_authority, asset_creators, asset_data, asset_v1_account_attachments, - sea_orm_active_enums::{ChainMutability, Mutability, OwnerType, RoyaltyTargetType}, + asset, asset_authority, asset_v1_account_attachments, + sea_orm_active_enums::{ + ChainMutability, Mutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass, + SpecificationVersions, V1AccountAttachments, + }, }, json::ChainDataV1, }; @@ -27,11 +32,6 @@ use num_traits::FromPrimitive; use sea_orm::{ entity::*, query::*, sea_query::OnConflict, ConnectionTrait, DbBackend, EntityTrait, JsonValue, }; -use std::collections::HashSet; - -use digital_asset_types::dao::sea_orm_active_enums::{ - SpecificationAssetClass, SpecificationVersions, V1AccountAttachments, -}; // TODO -> consider moving structs into these functions to avoid clone @@ -62,6 +62,12 @@ where } => { let (edition_attachment_address, _) = find_master_edition_account(&id); let id_bytes = id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(None); + } + let slot_i = bundle.slot as i64; let uri = metadata.uri.replace('\0', ""); let name = metadata.name.clone().into_bytes(); @@ -86,43 +92,22 @@ where false => ChainMutability::Immutable, }; - let data = asset_data::ActiveModel { - id: Set(id_bytes.to_vec()), - chain_data_mutability: Set(chain_mutability), - chain_data: Set(chain_data_json), - metadata_url: Set(uri.clone()), - metadata: Set(JsonValue::String("processing".to_string())), - metadata_mutability: Set(Mutability::Mutable), - slot_updated: Set(slot_i), - reindex: Set(Some(true)), - raw_name: Set(name.to_vec()), - raw_symbol: Set(symbol.to_vec()), - ..Default::default() - }; + upsert_asset_data( + txn, + id_bytes.to_vec(), + chain_mutability, + chain_data_json, + uri.clone(), + Mutability::Mutable, + JsonValue::String("processing".to_string()), + slot_i, + Some(true), + name.to_vec(), + symbol.to_vec(), + seq as i64, + ) + .await?; - let mut query = asset_data::Entity::insert(data) - .on_conflict( - OnConflict::columns([asset_data::Column::Id]) - .update_columns([ - asset_data::Column::ChainDataMutability, - asset_data::Column::ChainData, - asset_data::Column::MetadataUrl, - asset_data::Column::MetadataMutability, - asset_data::Column::SlotUpdated, - asset_data::Column::Reindex, - asset_data::Column::RawName, - asset_data::Column::RawSymbol, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_data.slot_updated", - query.sql - ); - txn.execute(query) - .await - .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; // Insert into `asset` table. let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { None @@ -136,20 +121,17 @@ where id: Set(id_bytes.to_vec()), owner_type: Set(OwnerType::Single), frozen: Set(false), - tree_id: Set(Some(tree_id.clone())), specification_version: Set(Some(SpecificationVersions::V1)), specification_asset_class: Set(Some(SpecificationAssetClass::Nft)), - nonce: Set(Some(nonce as i64)), royalty_target_type: Set(RoyaltyTargetType::Creators), royalty_target: Set(None), - royalty_amount: Set(metadata.seller_fee_basis_points as i32), //basis points asset_data: Set(Some(id_bytes.to_vec())), slot_updated: Set(Some(slot_i)), ..Default::default() }; // Upsert asset table base info. - let mut query = asset::Entity::insert(asset_model) + let query = asset::Entity::insert(asset_model) .on_conflict( OnConflict::columns([asset::Column::Id]) .update_columns([ @@ -159,22 +141,25 @@ where asset::Column::SpecificationAssetClass, asset::Column::RoyaltyTargetType, asset::Column::RoyaltyTarget, - asset::Column::RoyaltyAmount, asset::Column::AssetData, + asset::Column::SlotUpdated, ]) .to_owned(), ) .build(DbBackend::Postgres); - // Do not overwrite changes that happened after the asset was decompressed. - query.sql = format!( - "{} WHERE excluded.slot_updated > asset.slot_updated OR asset.slot_updated IS NULL", - query.sql - ); txn.execute(query) .await .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; + upsert_asset_with_royalty_amount( + txn, + id_bytes.to_vec(), + metadata.seller_fee_basis_points as i32, + seq as i64, + ) + .await?; + // Partial update of asset table with just compression info elements. upsert_asset_with_compression_info( txn, @@ -197,7 +182,6 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; @@ -231,81 +215,15 @@ where .await .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; - // Insert into `asset_creators` table. - let creators = &metadata.creators; - if !creators.is_empty() { - // Vec to hold base creator information. - let mut db_creator_infos = Vec::with_capacity(creators.len()); - - // Vec to hold info on whether a creator is verified. This info is protected by `seq` number. - let mut db_creator_verified_infos = Vec::with_capacity(creators.len()); - - // Set to prevent duplicates. - let mut creators_set = HashSet::new(); - - for (i, c) in creators.iter().enumerate() { - if creators_set.contains(&c.address) { - continue; - } - db_creator_infos.push(asset_creators::ActiveModel { - asset_id: Set(id_bytes.to_vec()), - creator: Set(c.address.to_bytes().to_vec()), - position: Set(i as i16), - share: Set(c.share as i32), - slot_updated: Set(Some(slot_i)), - ..Default::default() - }); - - db_creator_verified_infos.push(asset_creators::ActiveModel { - asset_id: Set(id_bytes.to_vec()), - creator: Set(c.address.to_bytes().to_vec()), - verified: Set(c.verified), - seq: Set(Some(seq as i64)), - ..Default::default() - }); - - creators_set.insert(c.address); - } - - // This statement will update base information for each creator. - let query = asset_creators::Entity::insert_many(db_creator_infos) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Creator, - ]) - .update_columns([ - asset_creators::Column::Position, - asset_creators::Column::Share, - asset_creators::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - txn.execute(query).await?; - - // This statement will update whether the creator is verified and the `seq` - // number. `seq` is used to protect the `verified` field, allowing for `mint` - // and `verifyCreator` to be processed out of order. - let mut query = asset_creators::Entity::insert_many(db_creator_verified_infos) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Creator, - ]) - .update_columns([ - asset_creators::Column::Verified, - asset_creators::Column::Seq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.seq > asset_creators.seq OR asset_creators.seq IS NULL", - query.sql - ); - txn.execute(query).await?; - } + // Upsert into `asset_creators` table. + upsert_creators( + txn, + id_bytes.to_vec(), + &metadata.creators, + slot_i, + seq as i64, + ) + .await?; // Insert into `asset_authority` table. let model = asset_authority::ActiveModel { @@ -350,6 +268,7 @@ where let mut task = DownloadMetadata { asset_data_id: id_bytes.to_vec(), uri: metadata.uri.clone(), + seq: seq as i64, created_at: Some(Utc::now().naive_utc()), }; task.sanitize(); diff --git a/nft_ingester/src/program_transformers/bubblegum/mod.rs b/nft_ingester/src/program_transformers/bubblegum/mod.rs index bcc102c0b..5ed21dbf7 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mod.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mod.rs @@ -17,6 +17,7 @@ mod delegate; mod mint_v1; mod redeem; mod transfer; +mod update_metadata; pub use db::*; @@ -53,7 +54,8 @@ where InstructionName::VerifyCollection => "VerifyCollection", InstructionName::UnverifyCollection => "UnverifyCollection", InstructionName::SetAndVerifyCollection => "SetAndVerifyCollection", - InstructionName::SetDecompressibleState | InstructionName::UpdateMetadata => todo!(), + InstructionName::SetDecompressibleState => "SetDecompressibleState", + InstructionName::UpdateMetadata => "UpdateMetadata", }; info!("BGUM instruction txn={:?}: {:?}", ix_str, bundle.txn_id); @@ -94,6 +96,15 @@ where | InstructionName::SetAndVerifyCollection => { collection_verification::process(parsing_result, bundle, txn, cl_audits).await?; } + InstructionName::SetDecompressibleState => (), // Nothing to index. + InstructionName::UpdateMetadata => { + let task = + update_metadata::update_metadata(parsing_result, bundle, txn, cl_audits).await?; + + if let Some(t) = task { + task_manager.send(t)?; + } + } _ => debug!("Bubblegum: Not Implemented Instruction"), } Ok(()) diff --git a/nft_ingester/src/program_transformers/bubblegum/redeem.rs b/nft_ingester/src/program_transformers/bubblegum/redeem.rs index b9b7f2c27..3dc0bc999 100644 --- a/nft_ingester/src/program_transformers/bubblegum/redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/redeem.rs @@ -4,7 +4,8 @@ use log::debug; use crate::{ error::IngesterError, program_transformers::bubblegum::{ - save_changelog_event, u32_to_u8_array, upsert_asset_with_leaf_info, upsert_asset_with_seq, + asset_was_decompressed, save_changelog_event, u32_to_u8_array, upsert_asset_with_leaf_info, + upsert_asset_with_seq, }, }; use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}; @@ -32,6 +33,12 @@ where ); debug!("Indexing redeem for asset id: {:?}", asset_id); let id_bytes = asset_id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; @@ -45,7 +52,6 @@ where [0; 32], [0; 32], seq as i64, - false, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/transfer.rs b/nft_ingester/src/program_transformers/bubblegum/transfer.rs index 573f33a8f..07abbe523 100644 --- a/nft_ingester/src/program_transformers/bubblegum/transfer.rs +++ b/nft_ingester/src/program_transformers/bubblegum/transfer.rs @@ -2,8 +2,8 @@ use super::save_changelog_event; use crate::{ error::IngesterError, program_transformers::bubblegum::{ - upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, - upsert_asset_with_seq, + asset_was_decompressed, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, }, }; use blockbuster::{ @@ -32,6 +32,12 @@ where .. } => { let id_bytes = id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(()); + } + let owner_bytes = owner.to_bytes().to_vec(); let delegate = if owner == delegate || delegate.to_bytes() == [0; 32] { None @@ -51,7 +57,6 @@ where le.schema.data_hash(), le.schema.creator_hash(), seq as i64, - false, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs new file mode 100644 index 000000000..835f9cd0c --- /dev/null +++ b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs @@ -0,0 +1,210 @@ +use crate::{ + error::IngesterError, + program_transformers::bubblegum::{ + asset_was_decompressed, save_changelog_event, upsert_asset_data, + upsert_asset_with_leaf_info, upsert_asset_with_royalty_amount, upsert_asset_with_seq, + upsert_creators, + }, + tasks::{DownloadMetadata, IntoTaskData, TaskData}, +}; +use blockbuster::{ + instruction::InstructionBundle, + programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, + token_metadata::state::{TokenStandard, UseMethod, Uses}, +}; +use chrono::Utc; +use digital_asset_types::{ + dao::{ + asset_creators, + sea_orm_active_enums::{ChainMutability, Mutability}, + }, + json::ChainDataV1, +}; +use log::warn; +use num_traits::FromPrimitive; +use sea_orm::{entity::*, query::*, ConnectionTrait, EntityTrait, JsonValue}; + +pub async fn update_metadata<'c, T>( + parsing_result: &BubblegumInstruction, + bundle: &InstructionBundle<'c>, + txn: &'c T, + cl_audits: bool, +) -> Result, IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + if let ( + Some(le), + Some(cl), + Some(Payload::UpdateMetadata { + current_metadata, + update_args, + }), + ) = ( + &parsing_result.leaf_update, + &parsing_result.tree_update, + &parsing_result.payload, + ) { + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; + + #[allow(unreachable_patterns)] + return match le.schema { + LeafSchema::V1 { id, nonce, .. } => { + let id_bytes = id.to_bytes(); + + // First check to see if this asset has been decompressed and if so do not update. + if asset_was_decompressed(txn, id_bytes.to_vec()).await? { + return Ok(None); + } + + let slot_i = bundle.slot as i64; + + let uri = if let Some(uri) = &update_args.uri { + uri.replace('\0', "") + } else { + current_metadata.uri.replace('\0', "") + }; + if uri.is_empty() { + return Err(IngesterError::DeserializationError( + "URI is empty".to_string(), + )); + } + + let name = if let Some(name) = update_args.name.clone() { + name + } else { + current_metadata.name.clone() + }; + + let symbol = if let Some(symbol) = update_args.symbol.clone() { + symbol + } else { + current_metadata.symbol.clone() + }; + + let primary_sale_happened = + if let Some(primary_sale_happened) = update_args.primary_sale_happened { + primary_sale_happened + } else { + current_metadata.primary_sale_happened + }; + + let mut chain_data = ChainDataV1 { + name: name.clone(), + symbol: symbol.clone(), + edition_nonce: current_metadata.edition_nonce, + primary_sale_happened, + token_standard: Some(TokenStandard::NonFungible), + uses: current_metadata.uses.clone().map(|u| Uses { + use_method: UseMethod::from_u8(u.use_method as u8).unwrap(), + remaining: u.remaining, + total: u.total, + }), + }; + chain_data.sanitize(); + let chain_data_json = serde_json::to_value(chain_data) + .map_err(|e| IngesterError::DeserializationError(e.to_string()))?; + + let is_mutable = if let Some(is_mutable) = update_args.is_mutable { + is_mutable + } else { + current_metadata.is_mutable + }; + + let chain_mutability = if is_mutable { + ChainMutability::Mutable + } else { + ChainMutability::Immutable + }; + + upsert_asset_data( + txn, + id_bytes.to_vec(), + chain_mutability, + chain_data_json, + uri.clone(), + Mutability::Mutable, + JsonValue::String("processing".to_string()), + slot_i, + Some(true), + name.into_bytes().to_vec(), + symbol.into_bytes().to_vec(), + seq as i64, + ) + .await?; + + // Partial update of asset table with just royalty amount (seller fee basis points). + let seller_fee_basis_points = + if let Some(seller_fee_basis_points) = update_args.seller_fee_basis_points { + seller_fee_basis_points + } else { + current_metadata.seller_fee_basis_points + }; + + upsert_asset_with_royalty_amount( + txn, + id_bytes.to_vec(), + seller_fee_basis_points as i32, + seq as i64, + ) + .await?; + + // Partial update of asset table with just leaf. + let tree_id = bundle.keys.get(5).unwrap().0.to_vec(); + upsert_asset_with_leaf_info( + txn, + id_bytes.to_vec(), + nonce as i64, + tree_id, + le.leaf_hash.to_vec(), + le.schema.data_hash(), + le.schema.creator_hash(), + seq as i64, + ) + .await?; + + upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + + // Update `asset_creators` table. + + // Delete any existing creators. + asset_creators::Entity::delete_many() + .filter( + Condition::all().add(asset_creators::Column::AssetId.eq(id_bytes.to_vec())), + ) + .exec(txn) + .await?; + + // Upsert into `asset_creators` table. + let creators = if let Some(creators) = &update_args.creators { + creators + } else { + ¤t_metadata.creators + }; + upsert_creators(txn, id_bytes.to_vec(), creators, slot_i, seq as i64).await?; + + if uri.is_empty() { + warn!( + "URI is empty for mint {}. Skipping background task.", + bs58::encode(id).into_string() + ); + return Ok(None); + } + + let mut task = DownloadMetadata { + asset_data_id: id_bytes.to_vec(), + uri, + seq: seq as i64, + created_at: Some(Utc::now().naive_utc()), + }; + task.sanitize(); + let t = task.into_task_data()?; + Ok(Some(t)) + } + _ => Err(IngesterError::NotImplemented), + }; + } + Err(IngesterError::ParsingError( + "Ix not parsed correctly".to_string(), + )) +} diff --git a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs index 061a2ac9b..70fdaee63 100644 --- a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs +++ b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs @@ -23,7 +23,7 @@ use num_traits::FromPrimitive; use plerkle_serialization::Pubkey as FBPubkey; use sea_orm::{ entity::*, query::*, sea_query::OnConflict, ActiveValue::Set, ConnectionTrait, DbBackend, - DbErr, EntityTrait, FromQueryResult, JoinType, JsonValue, + DbErr, EntityTrait, JsonValue, }; use std::collections::HashSet; @@ -147,8 +147,10 @@ pub async fn save_v1_asset( slot_updated: Set(slot_i), reindex: Set(Some(true)), id: Set(id.to_vec()), - raw_name: Set(name.to_vec()), - raw_symbol: Set(symbol.to_vec()), + raw_name: Set(Some(name.to_vec())), + raw_symbol: Set(Some(symbol.to_vec())), + download_metadata_seq: Set(Some(0)), + ..Default::default() }; let txn = conn.begin().await?; let mut query = asset_data::Entity::insert(asset_data_model) @@ -274,13 +276,14 @@ pub async fn save_v1_asset( txn.execute(query) .await .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; + if let Some(c) = &metadata.collection { let model = asset_grouping::ActiveModel { asset_id: Set(id.to_vec()), group_key: Set("collection".to_string()), group_value: Set(Some(c.key.to_string())), - verified: Set(Some(c.verified)), - seq: Set(None), + verified: Set(c.verified), + group_info_seq: Set(None), slot_updated: Set(Some(slot_i)), ..Default::default() }; @@ -291,10 +294,10 @@ pub async fn save_v1_asset( asset_grouping::Column::GroupKey, ]) .update_columns([ - asset_grouping::Column::GroupKey, asset_grouping::Column::GroupValue, - asset_grouping::Column::Seq, + asset_grouping::Column::Verified, asset_grouping::Column::SlotUpdated, + asset_grouping::Column::GroupInfoSeq, ]) .to_owned(), ) @@ -309,6 +312,7 @@ pub async fn save_v1_asset( } txn.commit().await?; let creators = data.creators.unwrap_or_default(); + if !creators.is_empty() { let mut creators_set = HashSet::new(); let existing_creators: Vec = asset_creators::Entity::find() @@ -319,6 +323,7 @@ pub async fn save_v1_asset( ) .all(conn) .await?; + if !existing_creators.is_empty() { let mut db_creators = Vec::with_capacity(creators.len()); for (i, c) in creators.into_iter().enumerate() { @@ -330,7 +335,6 @@ pub async fn save_v1_asset( creator: Set(c.address.to_bytes().to_vec()), share: Set(c.share as i32), verified: Set(c.verified), - seq: Set(Some(0)), slot_updated: Set(Some(slot_i)), position: Set(i as i16), ..Default::default() @@ -346,7 +350,8 @@ pub async fn save_v1_asset( ) .exec(&txn) .await?; - if db_creators.len() > 0 { + + if !db_creators.is_empty() { let mut query = asset_creators::Entity::insert_many(db_creators) .on_conflict( OnConflict::columns([ @@ -357,7 +362,7 @@ pub async fn save_v1_asset( asset_creators::Column::Creator, asset_creators::Column::Share, asset_creators::Column::Verified, - asset_creators::Column::Seq, + asset_creators::Column::VerifiedSeq, asset_creators::Column::SlotUpdated, ]) .to_owned(), @@ -385,6 +390,7 @@ pub async fn save_v1_asset( let mut task = DownloadMetadata { asset_data_id: id.to_vec(), uri, + seq: 0, created_at: Some(Utc::now().naive_utc()), }; task.sanitize(); diff --git a/nft_ingester/src/tasks/common/mod.rs b/nft_ingester/src/tasks/common/mod.rs index bf39e455d..3b533c2f7 100644 --- a/nft_ingester/src/tasks/common/mod.rs +++ b/nft_ingester/src/tasks/common/mod.rs @@ -18,6 +18,7 @@ const TASK_NAME: &str = "DownloadMetadata"; pub struct DownloadMetadata { pub asset_data_id: Vec, pub uri: String, + pub seq: i64, #[serde(skip_serializing)] pub created_at: Option, } @@ -110,24 +111,29 @@ impl BgTask for DownloadMetadataTask { id: Unchanged(download_metadata.asset_data_id.clone()), metadata: Set(body), reindex: Set(Some(false)), + download_metadata_seq: Set(Some(download_metadata.seq)), ..Default::default() }; debug!( "download metadata for {:?}", bs58::encode(download_metadata.asset_data_id.clone()).into_string() ); - asset_data::Entity::update(model) - .filter(asset_data::Column::Id.eq(download_metadata.asset_data_id.clone())) - .exec(db) - .await - .map(|_| ()) - .map_err(|db| { - IngesterError::TaskManagerError(format!( - "Database error with {}, error: {}", - self.name(), - db - )) - })?; + let mut query = asset_data::Entity::update(model) + .filter(asset_data::Column::Id.eq(download_metadata.asset_data_id.clone())); + if download_metadata.seq != 0 { + query = query.filter( + Condition::any() + .add(asset_data::Column::DownloadMetadataSeq.lte(download_metadata.seq)) + .add(asset_data::Column::DownloadMetadataSeq.is_null()), + ); + } + query.exec(db).await.map(|_| ()).map_err(|db| { + IngesterError::TaskManagerError(format!( + "Database error with {}, error: {}", + self.name(), + db + )) + })?; if meta_url.is_err() { return Err(IngesterError::UnrecoverableTaskError(format!( diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 469626eac..8142c3012 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.70.0" \ No newline at end of file +channel = "1.73.0" diff --git a/tools/bgtask_creator/src/main.rs b/tools/bgtask_creator/src/main.rs index a08dd87d1..17d1bba0c 100644 --- a/tools/bgtask_creator/src/main.rs +++ b/tools/bgtask_creator/src/main.rs @@ -322,6 +322,7 @@ WHERE let mut task = DownloadMetadata { asset_data_id: asset.id, uri: asset.metadata_url, + seq: 0, created_at: Some(Utc::now().naive_utc()), };