From c5859d2778aacbafa813573a2fe528861a987fbb Mon Sep 17 00:00:00 2001 From: andrii_kl <18900364+andrii-kl@users.noreply.github.com> Date: Wed, 22 Jan 2025 18:55:38 +0100 Subject: [PATCH 1/6] MTG-1242 Fix build search query for Fungible tokens. MTG-1031 Fix test for fungible tokens. --- nft_ingester/src/api/dapi/search_assets.rs | 19 +++--- nft_ingester/src/bin/synchronizer/main.rs | 4 +- nft_ingester/src/bin/synchronizer/readme.md | 39 +++++++++++ nft_ingester/src/index_syncronizer.rs | 68 ++++++++++--------- .../account_based/token_updates_processor.rs | 1 + nft_ingester/tests/api_tests.rs | 17 ++++- postgre-client/src/asset_filter_client.rs | 16 +++++ tests/setup/src/rocks.rs | 4 ++ 8 files changed, 124 insertions(+), 44 deletions(-) create mode 100644 nft_ingester/src/bin/synchronizer/readme.md diff --git a/nft_ingester/src/api/dapi/search_assets.rs b/nft_ingester/src/api/dapi/search_assets.rs index e4869a8c..7e4fa8d3 100644 --- a/nft_ingester/src/api/dapi/search_assets.rs +++ b/nft_ingester/src/api/dapi/search_assets.rs @@ -108,7 +108,7 @@ async fn fetch_assets< JP: JsonPersister + Sync + Send + 'static, PPC: ProcessingPossibilityChecker + Sync + Send + 'static, >( - index_client: Arc, + pg_index_client: Arc, rocks_db: Arc, filter: SearchAssetsQuery, sort_by: AssetSorting, @@ -145,7 +145,7 @@ async fn fetch_assets< } }; - let keys = index_client + let keys = pg_index_client .get_asset_pubkeys_filtered(filter, &sort_by.into(), limit, page, before, after, &options) .await .map_err(|e| { @@ -206,13 +206,14 @@ async fn fetch_assets< }; let mut grand_total = None; if options.show_grand_total { - grand_total = Some(index_client.get_grand_total(filter, &options).await.map_err(|e| { - if e.to_string().contains("statement timeout") { - StorageError::QueryTimedOut - } else { - StorageError::Common(e.to_string()) - } - })?) + grand_total = + Some(pg_index_client.get_grand_total(filter, &options).await.map_err(|e| { + if e.to_string().contains("statement timeout") { + StorageError::QueryTimedOut + } else { + StorageError::Common(e.to_string()) + } + })?) } let resp = AssetList { diff --git a/nft_ingester/src/bin/synchronizer/main.rs b/nft_ingester/src/bin/synchronizer/main.rs index 97af67ab..b89c6f90 100644 --- a/nft_ingester/src/bin/synchronizer/main.rs +++ b/nft_ingester/src/bin/synchronizer/main.rs @@ -43,7 +43,7 @@ pub async fn main() -> Result<(), IngesterError> { red_metrics.register(&mut registry); metrics_utils::utils::start_metrics(registry, args.metrics_port).await; - let index_storage = Arc::new( + let pg_index_storage = Arc::new( init_index_storage_with_migration( &args.pg_database_url, args.pg_max_db_connections, @@ -91,7 +91,7 @@ pub async fn main() -> Result<(), IngesterError> { let synchronizer = Arc::new(Synchronizer::new( rocks_storage.clone(), - index_storage.clone(), + pg_index_storage.clone(), args.dump_synchronizer_batch_size, args.rocks_dump_path.clone(), metrics.clone(), diff --git a/nft_ingester/src/bin/synchronizer/readme.md b/nft_ingester/src/bin/synchronizer/readme.md new file mode 100644 index 00000000..aaa71925 --- /dev/null +++ b/nft_ingester/src/bin/synchronizer/readme.md @@ -0,0 +1,39 @@ +## Building the Project + +Clone the repository and navigate to the project directory: + +```bash +git clone https://github.com/metaplex-foundation/aura.git +cd nft_ingester +``` + +Build the project using Cargo: + +```bash +cargo build --bin synchronizer +``` + +## Running the Service + +Run to see the full list of available arguments: + +```bash +./target/debug/synchronizer -h +``` + +Run Synchronizer with minimum functionality. + +```bash +./target/debug/synchronizer \ + --pg-database-url postgres://solana:solana@localhost:5432/aura_db +``` + + +## Tips for local debugging/testing + +To increase log verbosity, set the log level to debug: +` --log-level debug` + +To fill the local Redis with messages you can use any other Redis that is available. +There is a script that will copy existing messages from one Redis and forward copies of these messages to another one. +`nft_ingester/scripts/transfer_redis_messages.py` \ No newline at end of file diff --git a/nft_ingester/src/index_syncronizer.rs b/nft_ingester/src/index_syncronizer.rs index 53e746ff..d95693ab 100644 --- a/nft_ingester/src/index_syncronizer.rs +++ b/nft_ingester/src/index_syncronizer.rs @@ -34,8 +34,8 @@ where T: AssetIndexSourceStorage, U: AssetIndexStorage, { - primary_storage: Arc, - index_storage: Arc, + rocks_primary_storage: Arc, + pg_index_storage: Arc, dump_synchronizer_batch_size: usize, dump_path: String, metrics: Arc, @@ -49,16 +49,16 @@ where { #[allow(clippy::too_many_arguments)] pub fn new( - primary_storage: Arc, - index_storage: Arc, + rocks_primary_storage: Arc, + pg_index_storage: Arc, dump_synchronizer_batch_size: usize, dump_path: String, metrics: Arc, parallel_tasks: usize, ) -> Self { Synchronizer { - primary_storage, - index_storage, + rocks_primary_storage, + pg_index_storage, dump_synchronizer_batch_size, dump_path, metrics, @@ -109,16 +109,16 @@ where run_full_sync_threshold: i64, asset_type: AssetType, ) -> Result { - let last_indexed_key = self.index_storage.fetch_last_synced_id(asset_type).await?; + let last_indexed_key = self.pg_index_storage.fetch_last_synced_id(asset_type).await?; let last_indexed_key = last_indexed_key.map(decode_u64x2_pubkey).transpose()?; // Fetch the last known key from the primary storage let (last_key, prefix) = match asset_type { AssetType::NonFungible => { - (self.primary_storage.last_known_nft_asset_updated_key()?, "nft") + (self.rocks_primary_storage.last_known_nft_asset_updated_key()?, "nft") }, AssetType::Fungible => { - (self.primary_storage.last_known_fungible_asset_updated_key()?, "fungible") + (self.rocks_primary_storage.last_known_fungible_asset_updated_key()?, "fungible") }, }; let Some(last_key) = last_key else { @@ -199,9 +199,10 @@ where self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key).await }, SyncStatus::RegularSyncRequired(state) => { + tracing::info!("Regular sync required for nft asset"); self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key).await }, - SyncStatus::NoSyncRequired => Ok(()), + SyncStatus::NoSyncRequired => Ok(tracing::info!("No sync required for nft asset")), } } @@ -221,10 +222,11 @@ where .await }, SyncStatus::RegularSyncRequired(state) => { + tracing::info!("Regular sync required for fungible asset"); self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key) .await }, - SyncStatus::NoSyncRequired => Ok(()), + SyncStatus::NoSyncRequired => Ok(tracing::info!("No sync required for fungible asset")), } } @@ -234,8 +236,12 @@ where asset_type: AssetType, ) -> Result<(), IngesterError> { let last_known_key = match asset_type { - AssetType::NonFungible => self.primary_storage.last_known_nft_asset_updated_key()?, - AssetType::Fungible => self.primary_storage.last_known_fungible_asset_updated_key()?, + AssetType::NonFungible => { + self.rocks_primary_storage.last_known_nft_asset_updated_key()? + }, + AssetType::Fungible => { + self.rocks_primary_storage.last_known_fungible_asset_updated_key()? + }, }; let Some(last_known_key) = last_known_key else { return Ok(()); @@ -273,7 +279,7 @@ where num_shards: u64, ) -> Result<(), IngesterError> { let base_path = std::path::Path::new(self.dump_path.as_str()); - self.index_storage.destructive_prep_to_batch_nft_load().await?; + self.pg_index_storage.destructive_prep_to_batch_nft_load().await?; let shards = shard_pubkeys(num_shards); type ResultWithPaths = Result<(usize, String, String, String, String), String>; @@ -322,7 +328,7 @@ where let end = *end; let shutdown_rx = rx.resubscribe(); let metrics = self.metrics.clone(); - let rocks_storage = self.primary_storage.clone(); + let rocks_storage = self.rocks_primary_storage.clone(); tasks.spawn_blocking(move || { let res = rocks_storage.dump_nft_csv( assets_file, @@ -350,7 +356,7 @@ where while let Some(task) = tasks.join_next().await { let (_cnt, assets_path, creators_path, authorities_path, metadata_path) = task.map_err(|e| e.to_string())??; - let index_storage = self.index_storage.clone(); + let index_storage = self.pg_index_storage.clone(); let semaphore = semaphore.clone(); index_tasks.spawn(async move { index_storage @@ -368,9 +374,9 @@ where task.map_err(|e| e.to_string())?.map_err(|e| e.to_string())?; } tracing::info!("All NFT assets loads complete. Finalizing the batch load"); - self.index_storage.finalize_batch_nft_load().await?; + self.pg_index_storage.finalize_batch_nft_load().await?; tracing::info!("Batch load finalized for NFTs"); - self.index_storage + self.pg_index_storage .update_last_synced_key(last_included_rocks_key, AssetType::NonFungible) .await?; Ok(()) @@ -383,7 +389,7 @@ where num_shards: u64, ) -> Result<(), IngesterError> { let base_path = std::path::Path::new(self.dump_path.as_str()); - self.index_storage.destructive_prep_to_batch_fungible_load().await?; + self.pg_index_storage.destructive_prep_to_batch_fungible_load().await?; let shards = shard_pubkeys(num_shards); let mut tasks: JoinSet> = JoinSet::new(); @@ -405,7 +411,7 @@ where let end = *end; let shutdown_rx = rx.resubscribe(); let metrics = self.metrics.clone(); - let rocks_storage = self.primary_storage.clone(); + let rocks_storage = self.rocks_primary_storage.clone(); tasks.spawn_blocking(move || { let res = rocks_storage.dump_fungible_csv( @@ -423,7 +429,7 @@ where let semaphore = Arc::new(tokio::sync::Semaphore::new(1)); while let Some(task) = tasks.join_next().await { let (_cnt, fungible_tokens_path) = task.map_err(|e| e.to_string())??; - let index_storage = self.index_storage.clone(); + let index_storage = self.pg_index_storage.clone(); let semaphore = semaphore.clone(); index_tasks.spawn(async move { index_storage @@ -435,9 +441,9 @@ where task.map_err(|e| e.to_string())?.map_err(|e| e.to_string())?; } tracing::info!("All token accounts/fungibles loads complete. Finalizing the batch load"); - self.index_storage.finalize_batch_fungible_load().await?; + self.pg_index_storage.finalize_batch_fungible_load().await?; tracing::info!("Batch load finalized for fungibles"); - self.index_storage + self.pg_index_storage .update_last_synced_key(last_included_rocks_key, AssetType::Fungible) .await?; Ok(()) @@ -461,7 +467,7 @@ where break; } let (updated_keys, last_included_key) = - self.primary_storage.fetch_fungible_asset_updated_keys( + self.rocks_primary_storage.fetch_fungible_asset_updated_keys( starting_key.clone(), Some(last_key.clone()), self.dump_synchronizer_batch_size, @@ -482,8 +488,8 @@ where // Update the asset indexes in the index storage // let last_included_key = AssetsUpdateIdx::encode_key(last_included_key); last_included_rocks_key = Some(last_included_key); - let primary_storage = self.primary_storage.clone(); - let index_storage = self.index_storage.clone(); + let primary_storage = self.rocks_primary_storage.clone(); + let index_storage = self.pg_index_storage.clone(); let metrics = self.metrics.clone(); tasks.spawn(async move { Self::syncronize_fungible_batch( @@ -518,7 +524,7 @@ where last_included_rocks_key.slot, last_included_rocks_key.pubkey, ); - self.index_storage + self.pg_index_storage .update_last_synced_key(&last_included_rocks_key, AssetType::Fungible) .await?; } else { @@ -550,7 +556,7 @@ where break; } let (updated_keys, last_included_key) = - self.primary_storage.fetch_nft_asset_updated_keys( + self.rocks_primary_storage.fetch_nft_asset_updated_keys( starting_key.clone(), Some(last_key.clone()), self.dump_synchronizer_batch_size, @@ -571,8 +577,8 @@ where // Update the asset indexes in the index storage // let last_included_key = AssetsUpdateIdx::encode_key(last_included_key); last_included_rocks_key = Some(last_included_key); - let primary_storage = self.primary_storage.clone(); - let index_storage = self.index_storage.clone(); + let primary_storage = self.rocks_primary_storage.clone(); + let index_storage = self.pg_index_storage.clone(); let metrics = self.metrics.clone(); tasks.spawn(async move { Self::syncronize_nft_batch( @@ -607,7 +613,7 @@ where last_included_rocks_key.slot, last_included_rocks_key.pubkey, ); - self.index_storage + self.pg_index_storage .update_last_synced_key(&last_included_rocks_key, AssetType::NonFungible) .await?; } else { diff --git a/nft_ingester/src/processors/account_based/token_updates_processor.rs b/nft_ingester/src/processors/account_based/token_updates_processor.rs index 614ad96a..14d170a1 100644 --- a/nft_ingester/src/processors/account_based/token_updates_processor.rs +++ b/nft_ingester/src/processors/account_based/token_updates_processor.rs @@ -143,6 +143,7 @@ impl TokenAccountsProcessor { .map_err(|e| StorageError::Common(e.to_string())) }) .transpose()?; + let asset_dynamic_details = AssetDynamicDetails { pubkey: mint.pubkey, supply: Some(Updated::new( diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index 3e47a114..221c5c38 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -72,6 +72,7 @@ mod tests { Storage, ToFlatbuffersConverter, }; use serde_json::{json, Value}; + use setup::rocks::RocksTestEnvironmentSetup; use solana_program::pubkey::Pubkey; use solana_sdk::signature::Signature; use spl_pod::{ @@ -90,6 +91,7 @@ mod tests { 6, 155, 136, 87, 254, 171, 129, 132, 251, 104, 127, 99, 70, 24, 192, 53, 218, 196, 57, 220, 26, 235, 59, 85, 152, 160, 240, 0, 0, 0, 0, 1, ]); + #[tokio::test] #[tracing_test::traced_test] async fn test_search_assets() { @@ -3434,11 +3436,22 @@ mod tests { } #[tokio::test(flavor = "multi_thread")] - #[ignore = "FIXME: search_assets result returns 0 items"] async fn test_writing_fungible_into_dedicated_table() { let cnt = 100; let cli = Cli::default(); - let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await; + + let (env, generated_assets) = setup::TestEnvironment::create_and_setup_from_closures( + &cli, + cnt, + 100, + RocksTestEnvironmentSetup::static_data_for_fungible, + RocksTestEnvironmentSetup::with_authority, + RocksTestEnvironmentSetup::test_owner, + RocksTestEnvironmentSetup::dynamic_data, + RocksTestEnvironmentSetup::collection_without_authority, + ) + .await; + let synchronizer = nft_ingester::index_syncronizer::Synchronizer::new( env.rocks_env.storage.clone(), env.pg_env.client.clone(), diff --git a/postgre-client/src/asset_filter_client.rs b/postgre-client/src/asset_filter_client.rs index 68177473..af7d43c7 100644 --- a/postgre-client/src/asset_filter_client.rs +++ b/postgre-client/src/asset_filter_client.rs @@ -209,6 +209,21 @@ fn add_filter_clause<'a>( group_clause_required = true; } + if let Some(ref token_type) = filter.token_type { + if token_type == &TokenType::All && filter.owner_address.is_some() { + query_builder.push( + " LEFT JOIN fungible_tokens ON assets_v3.ast_pubkey = fungible_tokens.fbt_asset ", + ); + group_clause_required = true; + } + if token_type == &TokenType::Fungible && filter.owner_address.is_some() { + query_builder.push( + " INNER JOIN fungible_tokens ON assets_v3.ast_pubkey = fungible_tokens.fbt_asset ", + ); + group_clause_required = true; + } + } + // todo: if we implement the additional params like negata and all/any switch, the true part and the AND prefix should be refactored query_builder.push(" WHERE TRUE "); if let Some(spec_version) = &filter.specification_version { @@ -463,6 +478,7 @@ impl AssetPubkeyFilteredFetcher for PgClient { ) -> Result, IndexDbError> { let (mut query_builder, order_reversed) = Self::build_search_query(filter, order, limit, page, before, after, options)?; + let query = query_builder.build_query_as::(); debug!("SEARCH QUERY: {}", &query.sql()); let start_time = chrono::Utc::now(); diff --git a/tests/setup/src/rocks.rs b/tests/setup/src/rocks.rs index 0f27b0c3..8b2fd08d 100644 --- a/tests/setup/src/rocks.rs +++ b/tests/setup/src/rocks.rs @@ -196,6 +196,10 @@ impl RocksTestEnvironmentSetup { Self::generate_static_data(pubkeys, slot, SpecificationAssetClass::Nft) } + pub fn static_data_for_fungible(pubkeys: &[Pubkey], slot: u64) -> Vec { + Self::generate_static_data(pubkeys, slot, SpecificationAssetClass::FungibleToken) + } + fn generate_static_data( pubkeys: &[Pubkey], slot: u64, From 57c484fcb66ca8d9297b8924e1bc27f877d5bc9b Mon Sep 17 00:00:00 2001 From: andrii_kl <18900364+andrii-kl@users.noreply.github.com> Date: Wed, 22 Jan 2025 19:37:09 +0100 Subject: [PATCH 2/6] MTG-1242_MTG-1031 Fix issue --- nft_ingester/src/index_syncronizer.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/nft_ingester/src/index_syncronizer.rs b/nft_ingester/src/index_syncronizer.rs index d95693ab..19c0655e 100644 --- a/nft_ingester/src/index_syncronizer.rs +++ b/nft_ingester/src/index_syncronizer.rs @@ -202,7 +202,10 @@ where tracing::info!("Regular sync required for nft asset"); self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key).await }, - SyncStatus::NoSyncRequired => Ok(tracing::info!("No sync required for nft asset")), + SyncStatus::NoSyncRequired => { + tracing::info!("No sync required for nft asset"); + Ok(()) + }, } } @@ -226,7 +229,10 @@ where self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key) .await }, - SyncStatus::NoSyncRequired => Ok(tracing::info!("No sync required for fungible asset")), + SyncStatus::NoSyncRequired => { + tracing::info!("No sync required for fungible asset"); + Ok(()) + }, } } From 57ef8879bcc714a339bc78f3888ec432b433ed57 Mon Sep 17 00:00:00 2001 From: andrii_kl <18900364+andrii-kl@users.noreply.github.com> Date: Fri, 24 Jan 2025 15:21:57 +0100 Subject: [PATCH 3/6] MTG-1245 Fixed integration api test for Fungible tokens, fixed related bugs. - Fixed integration test_token_type test. - Fixed show_fungible option for SearchAssets requests. - Removed switching TokenType depending on the show_fungible option. --- entities/src/api_req_params.rs | 5 ++- nft_ingester/src/api/dapi/asset.rs | 2 + nft_ingester/src/api/dapi/search_assets.rs | 5 +-- nft_ingester/src/index_syncronizer.rs | 12 ++--- nft_ingester/tests/api_tests.rs | 52 +++++++++++++++++++--- postgre-client/src/asset_filter_client.rs | 6 --- 6 files changed, 59 insertions(+), 23 deletions(-) diff --git a/entities/src/api_req_params.rs b/entities/src/api_req_params.rs index 5c5c68ce..ea613976 100644 --- a/entities/src/api_req_params.rs +++ b/entities/src/api_req_params.rs @@ -85,6 +85,9 @@ pub struct SearchAssetsOptions { pub show_inscription: bool, #[serde(default)] pub show_zero_balance: bool, + /// This option enable adding information about token symbol and price. + #[serde(default)] + pub show_fungible: bool, } #[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, JsonSchema, Default)] @@ -590,7 +593,7 @@ impl From for GetByMethodsOptions { show_collection_metadata: value.show_collection_metadata, show_inscription: value.show_inscription, show_zero_balance: value.show_zero_balance, - show_fungible: false, + show_fungible: value.show_fungible, } } } diff --git a/nft_ingester/src/api/dapi/asset.rs b/nft_ingester/src/api/dapi/asset.rs index 992f2f72..f30bfbbd 100644 --- a/nft_ingester/src/api/dapi/asset.rs +++ b/nft_ingester/src/api/dapi/asset.rs @@ -101,6 +101,7 @@ fn convert_rocks_asset_model( token_account: asset_selected_maps.token_accounts.get(asset_pubkey).cloned(), inscription, spl_mint: asset_selected_maps.spl_mints.get(asset_pubkey).cloned(), + token_symbol: token_symbols.get(&asset_pubkey.to_string()).cloned(), token_price: token_prices.get(&asset_pubkey.to_string()).cloned(), }) @@ -192,6 +193,7 @@ pub async fn get_by_ids< let unique_asset_ids: Vec = unique_asset_ids_map.keys().cloned().collect(); let asset_ids_string = asset_ids.clone().into_iter().map(|id| id.to_string()).collect_vec(); + let (token_prices, token_symbols) = if options.show_fungible { let token_prices_fut = token_price_fetcher.fetch_token_prices(asset_ids_string.as_slice()); let token_symbols_fut = diff --git a/nft_ingester/src/api/dapi/search_assets.rs b/nft_ingester/src/api/dapi/search_assets.rs index 7e4fa8d3..b51e7cec 100644 --- a/nft_ingester/src/api/dapi/search_assets.rs +++ b/nft_ingester/src/api/dapi/search_assets.rs @@ -36,7 +36,7 @@ pub async fn search_assets< >( index_client: Arc, rocks_db: Arc, - mut filter: SearchAssetsQuery, + filter: SearchAssetsQuery, sort_by: AssetSorting, limit: u64, page: Option, @@ -55,9 +55,6 @@ pub async fn search_assets< tree_gaps_checker: &Option>, native_mint_pubkey: &str, ) -> Result { - if options.show_fungible { - filter.token_type = Some(TokenType::All) - } let show_native_balance = options.show_native_balance; let (asset_list, native_balance) = tokio::join!( fetch_assets( diff --git a/nft_ingester/src/index_syncronizer.rs b/nft_ingester/src/index_syncronizer.rs index 19c0655e..8dd6b886 100644 --- a/nft_ingester/src/index_syncronizer.rs +++ b/nft_ingester/src/index_syncronizer.rs @@ -195,15 +195,15 @@ where let state = self.get_sync_state(run_full_sync_threshold, asset_type).await?; match state { SyncStatus::FullSyncRequired(state) => { - tracing::info!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq); + tracing::debug!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq); self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key).await }, SyncStatus::RegularSyncRequired(state) => { - tracing::info!("Regular sync required for nft asset"); + tracing::debug!("Regular sync required for nft asset"); self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key).await }, SyncStatus::NoSyncRequired => { - tracing::info!("No sync required for nft asset"); + tracing::debug!("No sync required for nft asset"); Ok(()) }, } @@ -220,17 +220,17 @@ where match state { SyncStatus::FullSyncRequired(state) => { - tracing::info!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq); + tracing::debug!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq); self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key) .await }, SyncStatus::RegularSyncRequired(state) => { - tracing::info!("Regular sync required for fungible asset"); + tracing::debug!("Regular sync required for fungible asset"); self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key) .await }, SyncStatus::NoSyncRequired => { - tracing::info!("No sync required for fungible asset"); + tracing::debug!("No sync required for fungible asset"); Ok(()) }, } diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index 221c5c38..f58d36f6 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -2989,11 +2989,11 @@ mod tests { } #[tokio::test(flavor = "multi_thread")] - #[ignore = "FIXME: mismatched number of tokens"] async fn test_token_type() { let cnt = 100; let cli = Cli::default(); let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await; + let synchronizer = nft_ingester::index_syncronizer::Synchronizer::new( env.rocks_env.storage.clone(), env.pg_env.client.clone(), @@ -3002,7 +3002,7 @@ mod tests { Arc::new(SynchronizerMetricsConfig::new()), 1, ); - let fungible_token_mint1 = generated_assets.pubkeys[0]; // non-existed token + let fungible_token_mint1 = Pubkey::new_unique(); // non-existed token let fungible_token_mint2 = Pubkey::from_str("METAewgxyPbgwsseH8T16a39CQ5VyVxZi9zXiDPY18m").unwrap(); // MPLX token let mint1 = Mint { @@ -3056,7 +3056,27 @@ mod tests { write_version: 10, }; - let ftm_complete = AssetCompleteDetails { + let ftm_complete1 = AssetCompleteDetails { + pubkey: fungible_token_mint1, + static_details: Some(AssetStaticDetails { + pubkey: fungible_token_mint1, + specification_asset_class: SpecificationAssetClass::FungibleAsset, + royalty_target_type: RoyaltyTargetType::Single, + created_at: 10, + edition_address: None, + }), + owner: Some(AssetOwner { + pubkey: fungible_token_mint1, + owner: Updated::new(10, Some(UpdateVersion::WriteVersion(10)), None), + delegate: Default::default(), + owner_type: Default::default(), + owner_delegate_seq: Default::default(), + is_current_owner: Default::default(), + }), + ..Default::default() + }; + + let ftm_complete2 = AssetCompleteDetails { pubkey: fungible_token_mint2, static_details: Some(AssetStaticDetails { pubkey: fungible_token_mint2, @@ -3076,15 +3096,26 @@ mod tests { ..Default::default() }; + env.rocks_env + .storage + .db + .put_cf( + &env.rocks_env.storage.db.cf_handle(AssetCompleteDetails::NAME).unwrap(), + fungible_token_mint1, + ftm_complete1.convert_to_fb_bytes(), + ) + .unwrap(); + env.rocks_env .storage .db .put_cf( &env.rocks_env.storage.db.cf_handle(AssetCompleteDetails::NAME).unwrap(), fungible_token_mint2, - ftm_complete.convert_to_fb_bytes(), + ftm_complete2.convert_to_fb_bytes(), ) .unwrap(); + let mut batch_storage = BatchSaveStorage::new( env.rocks_env.storage.clone(), 10, @@ -3170,6 +3201,7 @@ mod tests { page: Some(1), owner_address: Some(owner.to_string()), options: SearchAssetsOptions { + show_fungible: true, show_zero_balance: true, show_unverified_collections: true, ..Default::default() @@ -3202,11 +3234,11 @@ mod tests { ); assert!(res.items[1].clone().token_info.unwrap().symbol.is_none()); + assert!(res.items[1].clone().token_info.unwrap().price_info.is_none()); assert_eq!( res.items[1].clone().token_info.unwrap().associated_token_address.unwrap(), fungible_token_account1.to_string() ); - assert!(res.items[1].clone().token_info.unwrap().price_info.is_none()); let payload = SearchAssets { limit: Some(1000), @@ -3276,11 +3308,19 @@ mod tests { let res = api.search_assets(payload, mutexed_tasks.clone()).await.unwrap(); let res: AssetList = serde_json::from_value(res).unwrap(); - // Totally we have 3 assets with required owner + // Totally we have 3 assets with required owner. show_fungible is false by default, so we don't have token info. assert_eq!(res.items.len(), 3); assert!(res.items[0].mint_extensions.is_none()); + assert!(res.items[0].clone().token_info.unwrap().symbol.is_none()); + assert!(res.items[0].clone().token_info.unwrap().price_info.is_none()); + assert!(res.items[1].mint_extensions.is_none()); + assert!(res.items[1].clone().token_info.map_or(true, |info| info.symbol.is_none())); + assert!(res.items[1].clone().token_info.map_or(true, |info| info.price_info.is_none())); + assert!(res.items[2].mint_extensions.is_none()); + assert!(res.items[2].clone().token_info.map_or(true, |info| info.symbol.is_none())); + assert!(res.items[2].clone().token_info.map_or(true, |info| info.price_info.is_none())); let payload = SearchAssets { limit: Some(1000), diff --git a/postgre-client/src/asset_filter_client.rs b/postgre-client/src/asset_filter_client.rs index af7d43c7..ae9f252f 100644 --- a/postgre-client/src/asset_filter_client.rs +++ b/postgre-client/src/asset_filter_client.rs @@ -210,12 +210,6 @@ fn add_filter_clause<'a>( } if let Some(ref token_type) = filter.token_type { - if token_type == &TokenType::All && filter.owner_address.is_some() { - query_builder.push( - " LEFT JOIN fungible_tokens ON assets_v3.ast_pubkey = fungible_tokens.fbt_asset ", - ); - group_clause_required = true; - } if token_type == &TokenType::Fungible && filter.owner_address.is_some() { query_builder.push( " INNER JOIN fungible_tokens ON assets_v3.ast_pubkey = fungible_tokens.fbt_asset ", From a04706893d4c5a23dae85763fe85bc146ceeaa9a Mon Sep 17 00:00:00 2001 From: andrii_kl <18900364+andrii-kl@users.noreply.github.com> Date: Fri, 24 Jan 2025 19:37:54 +0100 Subject: [PATCH 4/6] MTG-1245 Rollback some changes --- entities/src/api_req_params.rs | 5 +-- nft_ingester/tests/api_tests.rs | 63 +++++++++++++++------------------ 2 files changed, 29 insertions(+), 39 deletions(-) diff --git a/entities/src/api_req_params.rs b/entities/src/api_req_params.rs index ea613976..5c5c68ce 100644 --- a/entities/src/api_req_params.rs +++ b/entities/src/api_req_params.rs @@ -85,9 +85,6 @@ pub struct SearchAssetsOptions { pub show_inscription: bool, #[serde(default)] pub show_zero_balance: bool, - /// This option enable adding information about token symbol and price. - #[serde(default)] - pub show_fungible: bool, } #[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, JsonSchema, Default)] @@ -593,7 +590,7 @@ impl From for GetByMethodsOptions { show_collection_metadata: value.show_collection_metadata, show_inscription: value.show_inscription, show_zero_balance: value.show_zero_balance, - show_fungible: value.show_fungible, + show_fungible: false, } } } diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index 26e20ae9..45038ac8 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -1,5 +1,5 @@ #[cfg(test)] -#[cfg(feature = "integration_tests")] +// #[cfg(feature = "integration_tests")] mod tests { use std::{collections::HashMap, str::FromStr, sync::Arc}; @@ -3202,7 +3202,6 @@ mod tests { page: Some(1), owner_address: Some(owner.to_string()), options: SearchAssetsOptions { - show_fungible: true, show_zero_balance: true, show_unverified_collections: true, ..Default::default() @@ -3217,29 +3216,31 @@ mod tests { // so this token contain info about symbol and price // and 1 non-existed token, so response for it do not include such info assert_eq!(res.items.len(), 2); - assert_eq!(res.items[0].clone().token_info.unwrap().symbol.unwrap(), "MPLX".to_string()); - assert_eq!( - res.items[0].clone().token_info.unwrap().associated_token_address.unwrap(), - fungible_token_account2.to_string() - ); - assert_eq!( - res.items[0].clone().token_info.unwrap().price_info.unwrap().currency.unwrap(), - "USDC".to_string() - ); - assert!( - res.items[0].clone().token_info.unwrap().price_info.unwrap().total_price.unwrap() > 0.0 - ); - assert!( - res.items[0].clone().token_info.unwrap().price_info.unwrap().price_per_token.unwrap() - > 0.0 - ); - - assert!(res.items[1].clone().token_info.unwrap().symbol.is_none()); - assert!(res.items[1].clone().token_info.unwrap().price_info.is_none()); - assert_eq!( - res.items[1].clone().token_info.unwrap().associated_token_address.unwrap(), - fungible_token_account1.to_string() - ); + // + // todo MTG-1263 part related to the show_fungible functionality, that shouldn't work for SearchAssets and some work is needed. + // assert_eq!(res.items[0].clone().token_info.unwrap().symbol.unwrap(), "MPLX".to_string()); + // assert_eq!( + // res.items[0].clone().token_info.unwrap().associated_token_address.unwrap(), + // fungible_token_account2.to_string() + // ); + // assert_eq!( + // res.items[0].clone().token_info.unwrap().price_info.unwrap().currency.unwrap(), + // "USDC".to_string() + // ); + // assert!( + // res.items[0].clone().token_info.unwrap().price_info.unwrap().total_price.unwrap() > 0.0 + // ); + // assert!( + // res.items[0].clone().token_info.unwrap().price_info.unwrap().price_per_token.unwrap() + // > 0.0 + // ); + // + // assert!(res.items[1].clone().token_info.unwrap().symbol.is_none()); + // assert!(res.items[1].clone().token_info.unwrap().price_info.is_none()); + // assert_eq!( + // res.items[1].clone().token_info.unwrap().associated_token_address.unwrap(), + // fungible_token_account1.to_string() + // ); let payload = SearchAssets { limit: Some(1000), @@ -3257,7 +3258,7 @@ mod tests { // We have 1 NonFungible token, created in setup::TestEnvironment::create fn assert_eq!(res.items.len(), 1); - assert!(res.items[0].token_info.is_none()); + // assert!(res.items[0].token_info.is_none()); let payload = SearchAssets { limit: Some(1000), @@ -3292,7 +3293,7 @@ mod tests { // Our NonFungible token is not compressed assert_eq!(res.items.len(), 1); - assert!(res.items[0].token_info.is_none()); + // assert!(res.items[0].token_info.is_none()); let payload = SearchAssets { limit: Some(1000), @@ -3312,16 +3313,8 @@ mod tests { // Totally we have 3 assets with required owner. show_fungible is false by default, so we don't have token info. assert_eq!(res.items.len(), 3); assert!(res.items[0].mint_extensions.is_none()); - assert!(res.items[0].clone().token_info.unwrap().symbol.is_none()); - assert!(res.items[0].clone().token_info.unwrap().price_info.is_none()); - assert!(res.items[1].mint_extensions.is_none()); - assert!(res.items[1].clone().token_info.map_or(true, |info| info.symbol.is_none())); - assert!(res.items[1].clone().token_info.map_or(true, |info| info.price_info.is_none())); - assert!(res.items[2].mint_extensions.is_none()); - assert!(res.items[2].clone().token_info.map_or(true, |info| info.symbol.is_none())); - assert!(res.items[2].clone().token_info.map_or(true, |info| info.price_info.is_none())); let payload = SearchAssets { limit: Some(1000), From 48ad6d6fa03b21a6e4b2d12d8a678879c2618837 Mon Sep 17 00:00:00 2001 From: andrii_kl <18900364+andrii-kl@users.noreply.github.com> Date: Fri, 24 Jan 2025 19:54:46 +0100 Subject: [PATCH 5/6] MTG-1245 Rollback some changes --- nft_ingester/src/api/dapi/search_assets.rs | 5 ++++- nft_ingester/src/index_syncronizer.rs | 4 ++-- nft_ingester/tests/api_tests.rs | 3 ++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/nft_ingester/src/api/dapi/search_assets.rs b/nft_ingester/src/api/dapi/search_assets.rs index b51e7cec..7e4fa8d3 100644 --- a/nft_ingester/src/api/dapi/search_assets.rs +++ b/nft_ingester/src/api/dapi/search_assets.rs @@ -36,7 +36,7 @@ pub async fn search_assets< >( index_client: Arc, rocks_db: Arc, - filter: SearchAssetsQuery, + mut filter: SearchAssetsQuery, sort_by: AssetSorting, limit: u64, page: Option, @@ -55,6 +55,9 @@ pub async fn search_assets< tree_gaps_checker: &Option>, native_mint_pubkey: &str, ) -> Result { + if options.show_fungible { + filter.token_type = Some(TokenType::All) + } let show_native_balance = options.show_native_balance; let (asset_list, native_balance) = tokio::join!( fetch_assets( diff --git a/nft_ingester/src/index_syncronizer.rs b/nft_ingester/src/index_syncronizer.rs index 8dd6b886..7fe7c6cd 100644 --- a/nft_ingester/src/index_syncronizer.rs +++ b/nft_ingester/src/index_syncronizer.rs @@ -195,7 +195,7 @@ where let state = self.get_sync_state(run_full_sync_threshold, asset_type).await?; match state { SyncStatus::FullSyncRequired(state) => { - tracing::debug!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq); + tracing::warn!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq); self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key).await }, SyncStatus::RegularSyncRequired(state) => { @@ -220,7 +220,7 @@ where match state { SyncStatus::FullSyncRequired(state) => { - tracing::debug!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq); + tracing::warn!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq); self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key) .await }, diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index 45038ac8..66c2e6e2 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -1,5 +1,6 @@ #[cfg(test)] -// #[cfg(feature = "integration_tests")] + +#[cfg(feature = "integration_tests")] mod tests { use std::{collections::HashMap, str::FromStr, sync::Arc}; From 6d0f48b3a2a61eac8bc1c6c4f4f58ea4b2731533 Mon Sep 17 00:00:00 2001 From: andrii_kl <18900364+andrii-kl@users.noreply.github.com> Date: Fri, 24 Jan 2025 20:11:35 +0100 Subject: [PATCH 6/6] MTG-1245 fix ftm --- nft_ingester/tests/api_tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index 66c2e6e2..d5dea293 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -1,5 +1,4 @@ #[cfg(test)] - #[cfg(feature = "integration_tests")] mod tests { use std::{collections::HashMap, str::FromStr, sync::Arc};