MTG-1242 Fix build search query for Fungible tokens. MTG-1031 Fix tes… #379

Merged
19 changes: 10 additions & 9 deletions nft_ingester/src/api/dapi/search_assets.rs
@@ -108,7 +108,7 @@ async fn fetch_assets<
JP: JsonPersister + Sync + Send + 'static,
PPC: ProcessingPossibilityChecker + Sync + Send + 'static,
>(
index_client: Arc<impl postgre_client::storage_traits::AssetPubkeyFilteredFetcher>,
pg_index_client: Arc<impl postgre_client::storage_traits::AssetPubkeyFilteredFetcher>,
rocks_db: Arc<Storage>,
filter: SearchAssetsQuery,
sort_by: AssetSorting,
@@ -145,7 +145,7 @@ async fn fetch_assets<
}
};

let keys = index_client
let keys = pg_index_client
.get_asset_pubkeys_filtered(filter, &sort_by.into(), limit, page, before, after, &options)
.await
.map_err(|e| {
@@ -206,13 +206,14 @@ async fn fetch_assets<
};
let mut grand_total = None;
if options.show_grand_total {
grand_total = Some(index_client.get_grand_total(filter, &options).await.map_err(|e| {
if e.to_string().contains("statement timeout") {
StorageError::QueryTimedOut
} else {
StorageError::Common(e.to_string())
}
})?)
grand_total =
Some(pg_index_client.get_grand_total(filter, &options).await.map_err(|e| {
if e.to_string().contains("statement timeout") {
StorageError::QueryTimedOut
} else {
StorageError::Common(e.to_string())
}
})?)
}

let resp = AssetList {
4 changes: 2 additions & 2 deletions nft_ingester/src/bin/synchronizer/main.rs
@@ -43,7 +43,7 @@ pub async fn main() -> Result<(), IngesterError> {
red_metrics.register(&mut registry);
metrics_utils::utils::start_metrics(registry, args.metrics_port).await;

let index_storage = Arc::new(
let pg_index_storage = Arc::new(
init_index_storage_with_migration(
&args.pg_database_url,
args.pg_max_db_connections,
@@ -91,7 +91,7 @@ pub async fn main() -> Result<(), IngesterError> {

let synchronizer = Arc::new(Synchronizer::new(
rocks_storage.clone(),
index_storage.clone(),
pg_index_storage.clone(),
args.dump_synchronizer_batch_size,
args.rocks_dump_path.clone(),
metrics.clone(),
39 changes: 39 additions & 0 deletions nft_ingester/src/bin/synchronizer/readme.md
@@ -0,0 +1,39 @@
## Building the Project

Clone the repository and navigate to the project directory:

```bash
git clone https://github.com/metaplex-foundation/aura.git
cd aura
```

Build the project using Cargo:

```bash
cargo build --bin synchronizer
```

## Running the Service

Run the following to see the full list of available arguments:

```bash
./target/debug/synchronizer -h
```

Run the Synchronizer with the minimum required configuration:

```bash
./target/debug/synchronizer \
--pg-database-url postgres://solana:solana@localhost:5432/aura_db
```


## Tips for local debugging/testing

To increase log verbosity, set the log level to debug:
`--log-level debug`
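
For example, combined with the minimal invocation above:

```bash
./target/debug/synchronizer \
  --pg-database-url postgres://solana:solana@localhost:5432/aura_db \
  --log-level debug
```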

To fill your local Redis with messages, you can source them from any other Redis instance that is available.
The script `nft_ingester/scripts/transfer_redis_messages.py` copies existing messages from one Redis and forwards the copies to another.
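
A sketch of how an invocation might look, assuming the script accepts source and target connection URLs as flags (the flag names here are hypothetical; check the script's argument parsing for the actual interface):

```bash
# Hypothetical flags; verify against the script before use.
python3 nft_ingester/scripts/transfer_redis_messages.py \
  --source-redis redis://source-host:6379 \
  --target-redis redis://localhost:6379
```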
74 changes: 43 additions & 31 deletions nft_ingester/src/index_syncronizer.rs
@@ -34,8 +34,8 @@ where
T: AssetIndexSourceStorage,
U: AssetIndexStorage,
{
primary_storage: Arc<T>,
index_storage: Arc<U>,
rocks_primary_storage: Arc<T>,
pg_index_storage: Arc<U>,
dump_synchronizer_batch_size: usize,
dump_path: String,
metrics: Arc<SynchronizerMetricsConfig>,
@@ -49,16 +49,16 @@
{
#[allow(clippy::too_many_arguments)]
pub fn new(
primary_storage: Arc<T>,
index_storage: Arc<U>,
rocks_primary_storage: Arc<T>,
pg_index_storage: Arc<U>,
dump_synchronizer_batch_size: usize,
dump_path: String,
metrics: Arc<SynchronizerMetricsConfig>,
parallel_tasks: usize,
) -> Self {
Synchronizer {
primary_storage,
index_storage,
rocks_primary_storage,
pg_index_storage,
dump_synchronizer_batch_size,
dump_path,
metrics,
@@ -109,16 +109,16 @@ where
run_full_sync_threshold: i64,
asset_type: AssetType,
) -> Result<SyncStatus, IngesterError> {
let last_indexed_key = self.index_storage.fetch_last_synced_id(asset_type).await?;
let last_indexed_key = self.pg_index_storage.fetch_last_synced_id(asset_type).await?;
let last_indexed_key = last_indexed_key.map(decode_u64x2_pubkey).transpose()?;

// Fetch the last known key from the primary storage
let (last_key, prefix) = match asset_type {
AssetType::NonFungible => {
(self.primary_storage.last_known_nft_asset_updated_key()?, "nft")
(self.rocks_primary_storage.last_known_nft_asset_updated_key()?, "nft")
},
AssetType::Fungible => {
(self.primary_storage.last_known_fungible_asset_updated_key()?, "fungible")
(self.rocks_primary_storage.last_known_fungible_asset_updated_key()?, "fungible")
},
};
let Some(last_key) = last_key else {
@@ -199,9 +199,13 @@ where
self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key).await
},
SyncStatus::RegularSyncRequired(state) => {
tracing::info!("Regular sync required for nft asset");
self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key).await
},
SyncStatus::NoSyncRequired => Ok(()),
SyncStatus::NoSyncRequired => {
tracing::info!("No sync required for nft asset");
Ok(())
},
}
}

@@ -221,10 +225,14 @@
.await
},
SyncStatus::RegularSyncRequired(state) => {
tracing::info!("Regular sync required for fungible asset");
self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key)
.await
},
SyncStatus::NoSyncRequired => Ok(()),
SyncStatus::NoSyncRequired => {
tracing::info!("No sync required for fungible asset");
Ok(())
},
}
}

@@ -234,8 +242,12 @@
asset_type: AssetType,
) -> Result<(), IngesterError> {
let last_known_key = match asset_type {
AssetType::NonFungible => self.primary_storage.last_known_nft_asset_updated_key()?,
AssetType::Fungible => self.primary_storage.last_known_fungible_asset_updated_key()?,
AssetType::NonFungible => {
self.rocks_primary_storage.last_known_nft_asset_updated_key()?
},
AssetType::Fungible => {
self.rocks_primary_storage.last_known_fungible_asset_updated_key()?
},
};
let Some(last_known_key) = last_known_key else {
return Ok(());
@@ -273,7 +285,7 @@ where
num_shards: u64,
) -> Result<(), IngesterError> {
let base_path = std::path::Path::new(self.dump_path.as_str());
self.index_storage.destructive_prep_to_batch_nft_load().await?;
self.pg_index_storage.destructive_prep_to_batch_nft_load().await?;

let shards = shard_pubkeys(num_shards);
type ResultWithPaths = Result<(usize, String, String, String, String), String>;
@@ -322,7 +334,7 @@ where
let end = *end;
let shutdown_rx = rx.resubscribe();
let metrics = self.metrics.clone();
let rocks_storage = self.primary_storage.clone();
let rocks_storage = self.rocks_primary_storage.clone();
tasks.spawn_blocking(move || {
let res = rocks_storage.dump_nft_csv(
assets_file,
@@ -350,7 +362,7 @@ where
while let Some(task) = tasks.join_next().await {
let (_cnt, assets_path, creators_path, authorities_path, metadata_path) =
task.map_err(|e| e.to_string())??;
let index_storage = self.index_storage.clone();
let index_storage = self.pg_index_storage.clone();
let semaphore = semaphore.clone();
index_tasks.spawn(async move {
index_storage
@@ -368,9 +380,9 @@ where
task.map_err(|e| e.to_string())?.map_err(|e| e.to_string())?;
}
tracing::info!("All NFT assets loads complete. Finalizing the batch load");
self.index_storage.finalize_batch_nft_load().await?;
self.pg_index_storage.finalize_batch_nft_load().await?;
tracing::info!("Batch load finalized for NFTs");
self.index_storage
self.pg_index_storage
.update_last_synced_key(last_included_rocks_key, AssetType::NonFungible)
.await?;
Ok(())
@@ -383,7 +395,7 @@ where
num_shards: u64,
) -> Result<(), IngesterError> {
let base_path = std::path::Path::new(self.dump_path.as_str());
self.index_storage.destructive_prep_to_batch_fungible_load().await?;
self.pg_index_storage.destructive_prep_to_batch_fungible_load().await?;

let shards = shard_pubkeys(num_shards);
let mut tasks: JoinSet<Result<(usize, String), String>> = JoinSet::new();
@@ -405,7 +417,7 @@ where
let end = *end;
let shutdown_rx = rx.resubscribe();
let metrics = self.metrics.clone();
let rocks_storage = self.primary_storage.clone();
let rocks_storage = self.rocks_primary_storage.clone();

tasks.spawn_blocking(move || {
let res = rocks_storage.dump_fungible_csv(
@@ -423,7 +435,7 @@ where
let semaphore = Arc::new(tokio::sync::Semaphore::new(1));
while let Some(task) = tasks.join_next().await {
let (_cnt, fungible_tokens_path) = task.map_err(|e| e.to_string())??;
let index_storage = self.index_storage.clone();
let index_storage = self.pg_index_storage.clone();
let semaphore = semaphore.clone();
index_tasks.spawn(async move {
index_storage
Expand All @@ -435,9 +447,9 @@ where
task.map_err(|e| e.to_string())?.map_err(|e| e.to_string())?;
}
tracing::info!("All token accounts/fungibles loads complete. Finalizing the batch load");
self.index_storage.finalize_batch_fungible_load().await?;
self.pg_index_storage.finalize_batch_fungible_load().await?;
tracing::info!("Batch load finalized for fungibles");
self.index_storage
self.pg_index_storage
.update_last_synced_key(last_included_rocks_key, AssetType::Fungible)
.await?;
Ok(())
@@ -461,7 +473,7 @@ where
break;
}
let (updated_keys, last_included_key) =
self.primary_storage.fetch_fungible_asset_updated_keys(
self.rocks_primary_storage.fetch_fungible_asset_updated_keys(
starting_key.clone(),
Some(last_key.clone()),
self.dump_synchronizer_batch_size,
@@ -482,8 +494,8 @@ where
// Update the asset indexes in the index storage
// let last_included_key = AssetsUpdateIdx::encode_key(last_included_key);
last_included_rocks_key = Some(last_included_key);
let primary_storage = self.primary_storage.clone();
let index_storage = self.index_storage.clone();
let primary_storage = self.rocks_primary_storage.clone();
let index_storage = self.pg_index_storage.clone();
let metrics = self.metrics.clone();
tasks.spawn(async move {
Self::syncronize_fungible_batch(
@@ -518,7 +530,7 @@ where
last_included_rocks_key.slot,
last_included_rocks_key.pubkey,
);
self.index_storage
self.pg_index_storage
.update_last_synced_key(&last_included_rocks_key, AssetType::Fungible)
.await?;
} else {
@@ -550,7 +562,7 @@ where
break;
}
let (updated_keys, last_included_key) =
self.primary_storage.fetch_nft_asset_updated_keys(
self.rocks_primary_storage.fetch_nft_asset_updated_keys(
starting_key.clone(),
Some(last_key.clone()),
self.dump_synchronizer_batch_size,
@@ -571,8 +583,8 @@ where
// Update the asset indexes in the index storage
// let last_included_key = AssetsUpdateIdx::encode_key(last_included_key);
last_included_rocks_key = Some(last_included_key);
let primary_storage = self.primary_storage.clone();
let index_storage = self.index_storage.clone();
let primary_storage = self.rocks_primary_storage.clone();
let index_storage = self.pg_index_storage.clone();
let metrics = self.metrics.clone();
tasks.spawn(async move {
Self::syncronize_nft_batch(
@@ -607,7 +619,7 @@ where
last_included_rocks_key.slot,
last_included_rocks_key.pubkey,
);
self.index_storage
self.pg_index_storage
.update_last_synced_key(&last_included_rocks_key, AssetType::NonFungible)
.await?;
} else {
@@ -143,6 +143,7 @@ impl TokenAccountsProcessor {
.map_err(|e| StorageError::Common(e.to_string()))
})
.transpose()?;

let asset_dynamic_details = AssetDynamicDetails {
pubkey: mint.pubkey,
supply: Some(Updated::new(
17 changes: 15 additions & 2 deletions nft_ingester/tests/api_tests.rs
@@ -72,6 +72,7 @@ mod tests {
Storage, ToFlatbuffersConverter,
};
use serde_json::{json, Value};
use setup::rocks::RocksTestEnvironmentSetup;
use solana_program::pubkey::Pubkey;
use solana_sdk::signature::Signature;
use spl_pod::{
@@ -90,6 +91,7 @@
6, 155, 136, 87, 254, 171, 129, 132, 251, 104, 127, 99, 70, 24, 192, 53, 218, 196, 57, 220,
26, 235, 59, 85, 152, 160, 240, 0, 0, 0, 0, 1,
]);

#[tokio::test]
#[tracing_test::traced_test]
async fn test_search_assets() {
@@ -3434,11 +3436,22 @@
}

#[tokio::test(flavor = "multi_thread")]
#[ignore = "FIXME: search_assets result returns 0 items"]
async fn test_writing_fungible_into_dedicated_table() {
let cnt = 100;
let cli = Cli::default();
let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await;

let (env, generated_assets) = setup::TestEnvironment::create_and_setup_from_closures(
&cli,
cnt,
100,
RocksTestEnvironmentSetup::static_data_for_fungible,
RocksTestEnvironmentSetup::with_authority,
RocksTestEnvironmentSetup::test_owner,
RocksTestEnvironmentSetup::dynamic_data,
RocksTestEnvironmentSetup::collection_without_authority,
)
.await;

let synchronizer = nft_ingester::index_syncronizer::Synchronizer::new(
env.rocks_env.storage.clone(),
env.pg_env.client.clone(),