diff --git a/digital_asset_types/src/dao/extensions/asset_grouping.rs b/digital_asset_types/src/dao/extensions/asset_grouping.rs index 1d1bce87a..49b091efb 100644 --- a/digital_asset_types/src/dao/extensions/asset_grouping.rs +++ b/digital_asset_types/src/dao/extensions/asset_grouping.rs @@ -1,10 +1,11 @@ use sea_orm::{EntityTrait, EnumIter, Related, RelationDef, RelationTrait}; -use crate::dao::{asset, asset_grouping}; +use crate::dao::{asset, asset_authority, asset_grouping}; #[derive(Copy, Clone, Debug, EnumIter)] pub enum Relation { Asset, + AssetAuthority, } impl RelationTrait for Relation { @@ -14,6 +15,10 @@ impl RelationTrait for Relation { .from(asset_grouping::Column::AssetId) .to(asset::Column::Id) .into(), + Self::AssetAuthority => asset_grouping::Entity::belongs_to(asset_authority::Entity) + .from(asset_grouping::Column::AssetId) + .to(asset_authority::Column::Id) + .into(), } } } @@ -23,3 +28,9 @@ impl Related for asset_grouping::Entity { Relation::Asset.def() } } + +impl Related for asset_grouping::Entity { + fn to() -> RelationDef { + Relation::AssetAuthority.def() + } +} diff --git a/program_transformers/src/mpl_core_program/v1_asset.rs b/program_transformers/src/mpl_core_program/v1_asset.rs index 5cd408562..0b69cf9da 100644 --- a/program_transformers/src/mpl_core_program/v1_asset.rs +++ b/program_transformers/src/mpl_core_program/v1_asset.rs @@ -26,7 +26,8 @@ use { entity::{ActiveValue, ColumnTrait, EntityTrait}, query::{JsonValue, QueryFilter, QueryTrait}, sea_query::query::OnConflict, - ConnectionTrait, DbBackend, TransactionTrait, + sea_query::Expr, + ConnectionTrait, CursorTrait, DbBackend, TransactionTrait, }, serde_json::{value::Value, Map}, solana_sdk::pubkey::Pubkey, @@ -76,8 +77,8 @@ pub async fn save_v1_asset( // Note: This indexes both Core Assets and Core Collections. let asset = match account_data { - MplCoreAccountData::Asset(indexable_asset) => indexable_asset, - MplCoreAccountData::Collection(indexable_asset) => indexable_asset, + MplCoreAccountData::Asset(indexable_asset) + | MplCoreAccountData::Collection(indexable_asset) => indexable_asset, _ => return Err(ProgramTransformerError::NotImplemented), }; @@ -133,6 +134,11 @@ pub async fn save_v1_asset( .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; + if matches!(account_data, MplCoreAccountData::Collection(_)) { + update_group_asset_authorities(conn, id_vec.clone(), update_authority.clone(), slot_i) + .await?; + } + //----------------------- // asset_data table //----------------------- @@ -361,7 +367,7 @@ pub async fn save_v1_asset( ) .build(DbBackend::Postgres); query.sql = format!( - "{} WHERE excluded.slot_updated > asset_grouping.slot_updated", + "{} WHERE excluded.slot_updated >= asset_grouping.slot_updated", query.sql ); txn.execute(query) @@ -536,3 +542,61 @@ fn convert_keys_to_snake_case(plugins_json: &mut Value) { _ => {} } } + +/// Updates the `asset_authority` for all assets that are part of a collection in a batch. +/// This function performs a cursor-based paginated read and batch update. +async fn update_group_asset_authorities( + conn: &T, + group_value: Vec, + authority: Vec, + slot: i64, +) -> ProgramTransformerResult<()> { + let mut after = None; + + let group_key = "collection".to_string(); + let group_value = bs58::encode(group_value).into_string(); + + let mut query = asset_grouping::Entity::find() + .filter(asset_grouping::Column::GroupKey.eq(group_key)) + .filter(asset_grouping::Column::GroupValue.eq(group_value)) + .cursor_by(asset_grouping::Column::AssetId); + let mut query = query.first(1_000); + + loop { + if let Some(after) = after.clone() { + query = query.after(after); + } + + let entries = query.all(conn).await?; + + if entries.is_empty() { + break; + } + + let asset_ids = entries + .clone() + .into_iter() + .map(|entry| entry.asset_id) + .collect::>(); + + asset_authority::Entity::update_many() + .col_expr( + asset_authority::Column::Authority, + Expr::value(authority.clone()), + ) + .col_expr(asset_authority::Column::SlotUpdated, Expr::value(slot)) + .filter(asset_authority::Column::AssetId.is_in(asset_ids)) + .filter(asset_authority::Column::Authority.ne(authority.clone())) + .filter(Expr::cust_with_values( + "asset_authority.slot_updated < $1", + vec![slot], + )) + .exec(conn) + .await + .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; + + after = entries.last().map(|entry| entry.asset_id.clone()); + } + + Ok(()) +}