diff --git a/Cargo.lock b/Cargo.lock
index f0950c421..46dad762b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2157,7 +2157,9 @@ dependencies = [
  "num-traits",
  "schemars",
  "serde",
+ "serde_derive",
  "solana-sdk",
+ "sqlx",
 ]
 
 [[package]]
diff --git a/digital_asset_types/src/dao/scopes/asset.rs b/digital_asset_types/src/dao/scopes/asset.rs
index 1533bae64..e0fc97ca7 100644
--- a/digital_asset_types/src/dao/scopes/asset.rs
+++ b/digital_asset_types/src/dao/scopes/asset.rs
@@ -75,7 +75,10 @@ pub async fn get_by_creator(
     pagination: &Pagination,
     limit: u64,
 ) -> Result<Vec<FullAsset>, DbErr> {
-    let mut condition = "SELECT ast_pubkey FROM assets_v3 LEFT JOIN asset_creators_v3 ON ast_pubkey = asc_pubkey WHERE asc_creator = $1".to_string();
+    let mut condition =
+        "SELECT ast_pubkey FROM assets_v3 LEFT JOIN asset_creators_v3 ON ast_pubkey = asc_pubkey
+        LEFT JOIN tasks ON ast_metadata_url_id = tsk_id WHERE asc_creator = $1"
+            .to_string();
     if only_verified {
         condition = format!("{} AND asc_verified = true", condition);
     }
@@ -143,7 +146,7 @@ pub async fn get_by_grouping(
         return Ok(vec![]);
     }
 
-    let condition = "SELECT ast_pubkey FROM assets_v3 WHERE ast_collection = $1 AND ast_is_collection_verified = true";
+    let condition = "SELECT ast_pubkey FROM assets_v3 LEFT JOIN tasks ON ast_metadata_url_id = tsk_id WHERE ast_collection = $1 AND ast_is_collection_verified = true";
     let values = vec![Set(group_value.clone())
         .into_value()
         .ok_or(DbErr::Custom(format!(
@@ -178,7 +181,7 @@ pub async fn get_assets_by_owner(
     pagination: &Pagination,
     limit: u64,
 ) -> Result<Vec<FullAsset>, DbErr> {
-    let condition = "SELECT ast_pubkey FROM assets_v3 WHERE ast_owner = $1";
+    let condition = "SELECT ast_pubkey FROM assets_v3 LEFT JOIN tasks ON ast_metadata_url_id = tsk_id WHERE ast_owner = $1";
     let values = vec![Set(owner.to_bytes().to_vec().as_slice())
         .into_value()
         .ok_or(DbErr::Custom(format!(
@@ -208,7 +211,7 @@ pub async fn get_by_authority(
     pagination: &Pagination,
     limit: u64,
 ) -> Result<Vec<FullAsset>, DbErr> {
-    let condition = "SELECT ast_pubkey FROM assets_v3 WHERE ast_authority = $1";
+    let condition = "SELECT ast_pubkey FROM assets_v3 LEFT JOIN tasks ON ast_metadata_url_id = tsk_id WHERE ast_authority = $1";
     let values = vec![Set(authority.as_slice())
         .into_value()
         .ok_or(DbErr::Custom(format!(
@@ -240,7 +243,10 @@ async fn get_by_related_condition(
     pagination: &Pagination,
     limit: u64,
 ) -> Result<Vec<FullAsset>, DbErr> {
-    let condition = &format!("{} AND ast_supply > 0", condition);
+    let condition = &format!(
+        "{} AND ast_supply > 0 AND tsk_status = 'success' ",
+        condition
+    );
     let (mut condition, values, offset) = paginate(pagination, limit, condition, values)?;
     condition = format!("{} LIMIT {}", condition, limit);
 
diff --git a/entities/Cargo.toml b/entities/Cargo.toml
index c95584da0..f29ef4687 100644
--- a/entities/Cargo.toml
+++ b/entities/Cargo.toml
@@ -10,4 +10,6 @@ serde = "1.0.193"
 solana-sdk = "~1.16"
 num-derive = "0.4.1"
 num-traits = "0.2.17"
-schemars = "0.8.6"
\ No newline at end of file
+schemars = "0.8.6"
+serde_derive = "1.0.195"
+sqlx = { version = "0.6.2", features = ["macros", "runtime-tokio-rustls", "postgres", "uuid", "offline", "json"] }
\ No newline at end of file
diff --git a/entities/src/enums.rs b/entities/src/enums.rs
index 0504ce161..da452be98 100644
--- a/entities/src/enums.rs
+++ b/entities/src/enums.rs
@@ -112,3 +112,22 @@ pub enum RoyaltyModel {
     #[serde(rename = "single")]
     Single,
 }
+
+#[derive(
+    serde_derive::Deserialize,
+    serde_derive::Serialize,
+    PartialEq,
+    Debug,
+    Eq,
+    Hash,
+    sqlx::Type,
+    Copy,
+    Clone,
+)]
+#[sqlx(type_name = "task_status", rename_all = "lowercase")]
+pub enum TaskStatus {
+    Pending,
+    Running,
+    Success,
+    Failed,
+}
diff --git a/init_v3.sql b/init_v3.sql
index 5ee74c368..a724807d0 100644
--- a/init_v3.sql
+++ b/init_v3.sql
@@ -46,13 +46,20 @@ CREATE TABLE asset_creators_v3 (
 );
 CREATE INDEX asset_creators_v3_creator ON asset_creators_v3(asc_creator, asc_verified);
 
-CREATE TABLE metadata (
-    mtd_id bigserial
-        CONSTRAINT metadata_pk
+CREATE TABLE tasks (
+    tsk_id bigserial
+        CONSTRAINT tasks_pk
             PRIMARY KEY,
-    mtd_url text NOT NULL
+    tsk_metadata_url text NOT NULL,
+    tsk_status task_status NOT NULL,
+    tsk_locked_until timestamptz NULL DEFAULT (now() AT TIME ZONE 'utc'::text),
+    tsk_attempts int2 NOT NULL DEFAULT 0,
+    tsk_max_attempts int2 NOT NULL DEFAULT 10,
+    tsk_error text
 );
-CREATE UNIQUE INDEX metadata_url ON metadata (mtd_url);
+CREATE UNIQUE INDEX tasks_metadata_url ON tasks (tsk_metadata_url);
+CREATE INDEX tasks_status ON tasks (tsk_status);
+CREATE INDEX tasks_locked_until ON tasks (tsk_locked_until);
 
 CREATE TABLE assets_v3 (
     ast_pubkey bytea NOT NULL,
@@ -75,7 +82,7 @@ CREATE TABLE assets_v3 (
     ast_metadata_url_id bigint NULL,
     ast_slot_updated bigint NOT NULL,
     CONSTRAINT assets_pkey PRIMARY KEY (ast_pubkey),
-    FOREIGN KEY (ast_metadata_url_id) REFERENCES metadata(mtd_id) ON DELETE RESTRICT ON UPDATE CASCADE
+    FOREIGN KEY (ast_metadata_url_id) REFERENCES tasks(tsk_id) ON DELETE RESTRICT ON UPDATE CASCADE
 );
 -- indexes on the fields that will not get updated:
 -- so far we only know of V1 specification versions, so we only index on others
@@ -106,22 +113,6 @@ CREATE INDEX assets_v3_is_frozen ON assets_v3(ast_is_frozen) WHERE ast_is_frozen
 CREATE INDEX assets_v3_supply ON assets_v3(ast_supply) WHERE ast_supply IS NOT NULL;
 CREATE INDEX assets_v3_slot_updated ON assets_v3(ast_slot_updated);
 
-CREATE TABLE tasks (
-    tsk_id bigserial
-        CONSTRAINT tasks_pk
-            PRIMARY KEY,
-    tsk_metadata_url bigint NOT NULL,
-    tsk_status task_status NOT NULL,
-    tsk_locked_until timestamptz NULL DEFAULT (now() AT TIME ZONE 'utc'::text),
-    tsk_attempts int2 NOT NULL DEFAULT 0,
-    tsk_max_attempts int2 NOT NULL DEFAULT 10,
-    tsk_error text,
-    FOREIGN KEY (tsk_metadata_url) REFERENCES metadata(mtd_id) ON DELETE RESTRICT ON UPDATE CASCADE
-);
-CREATE UNIQUE INDEX tasks_metadata_url ON tasks (tsk_metadata_url);
-CREATE INDEX tasks_status ON tasks (tsk_status);
-CREATE INDEX tasks_locked_until ON tasks (tsk_locked_until);
-
 CREATE TABLE last_synced_key (
     id integer NOT NULL PRIMARY KEY DEFAULT 1,
     last_synced_asset_update_key bytea,
diff --git a/nft_ingester/src/db_v2.rs b/nft_ingester/src/db_v2.rs
index 636ea9eb1..8576fa610 100644
--- a/nft_ingester/src/db_v2.rs
+++ b/nft_ingester/src/db_v2.rs
@@ -1,8 +1,8 @@
+use entities::enums::TaskStatus;
 use sqlx::{
     postgres::{PgConnectOptions, PgPoolOptions, Postgres},
     ConnectOptions, PgPool, QueryBuilder, Row,
 };
-use std::collections::{HashMap, HashSet};
 
 use crate::config::DatabaseConfig;
 use crate::error::IngesterError;
@@ -12,25 +12,6 @@ pub struct DBClient {
     pub pool: PgPool,
 }
 
-#[derive(
-    serde_derive::Deserialize,
-    serde_derive::Serialize,
-    PartialEq,
-    Debug,
-    Eq,
-    Hash,
-    sqlx::Type,
-    Copy,
-    Clone,
-)]
-#[sqlx(type_name = "task_status", rename_all = "lowercase")]
-pub enum TaskStatus {
-    Pending,
-    Running,
-    Success,
-    Failed,
-}
-
 #[derive(Debug, Clone)]
 pub struct Task {
     pub ofd_metadata_url: String,
@@ -40,18 +21,9 @@ pub struct Task {
     pub ofd_error: Option<String>,
 }
 
-pub struct TaskForInsert {
-    pub ofd_metadata_url: i64,
-    pub ofd_locked_until: Option<chrono::DateTime<chrono::Utc>>,
-    pub ofd_attempts: i32,
-    pub ofd_max_attempts: i32,
-    pub ofd_error: Option<String>,
-}
-
 #[derive(Debug, Clone)]
 pub struct JsonDownloadTask {
     pub metadata_url: String,
-    pub metadata_url_key: i64,
     pub status: TaskStatus,
     pub attempts: i16,
     pub max_attempts: i16,
 }
 
 pub struct UpdatedTask {
     pub status: TaskStatus,
-    pub metadata_url_key: i64,
+    pub metadata_url: String,
     pub attempts: i16,
     pub error: String,
 }
@@ -97,7 +69,7 @@ impl DBClient {
             QueryBuilder::new("UPDATE tasks SET tsk_status = tmp.tsk_status, tsk_attempts = tmp.tsk_attempts, tsk_error = tmp.tsk_error FROM (");
 
         query_builder.push_values(data, |mut b, key| {
-            b.push_bind(key.metadata_url_key);
+            b.push_bind(key.metadata_url);
             b.push_bind(key.status);
             b.push_bind(key.attempts);
             b.push_bind(key.error);
@@ -128,9 +100,7 @@
                 tsk_locked_until = NOW() + INTERVAL '20 seconds'
             FROM cte
             WHERE t.tsk_id = cte.tsk_id
-            RETURNING (
-                SELECT mtd_url FROM metadata m WHERE m.mtd_id = t.tsk_metadata_url) as metadata_url, t.tsk_metadata_url,
-            t.tsk_status, t.tsk_attempts, t.tsk_max_attempts;");
+            RETURNING t.tsk_metadata_url, t.tsk_status, t.tsk_attempts, t.tsk_max_attempts;");
 
         let query = query_builder.build();
         let rows = query
@@ -141,15 +111,13 @@ impl DBClient {
 
         let mut tasks = Vec::new();
         for row in rows {
-            let metadata_url: String = row.get("metadata_url");
-            let metadata_url_key: i64 = row.get("tsk_metadata_url");
+            let metadata_url: String = row.get("tsk_metadata_url");
             let status: TaskStatus = row.get("tsk_status");
             let attempts: i16 = row.get("tsk_attempts");
             let max_attempts: i16 = row.get("tsk_max_attempts");
 
             tasks.push(JsonDownloadTask {
                 metadata_url,
-                metadata_url_key,
                 status,
                 attempts,
                 max_attempts,
@@ -159,58 +127,8 @@ impl DBClient {
 
         Ok(tasks)
     }
 
-    pub async fn insert_metadata(
-        &self,
-        urls: &Vec<&str>,
-    ) -> Result<HashMap<String, i64>, IngesterError> {
-        let mut query_builder: QueryBuilder<'_, Postgres> =
-            QueryBuilder::new("INSERT INTO metadata (mtd_url)");
-
-        query_builder.push_values(urls, |mut b, key| {
-            b.push_bind(key);
-        });
-
-        query_builder.push("ON CONFLICT (mtd_url) DO NOTHING RETURNING mtd_id, mtd_url;");
-
-        let query = query_builder.build();
-        let rows = query
-            .fetch_all(&self.pool)
-            .await
-            .map_err(|err| IngesterError::DatabaseError(format!("Insert one metadata: {}", err)))?;
-
-        let res: HashMap<String, i64> = rows
-            .iter()
-            .map(|row| (row.get("mtd_url"), row.get("mtd_id")))
-            .collect();
-
-        Ok(res)
-    }
-
-    pub async fn insert_tasks(&self, data: &Vec<Task>) -> Result<(), IngesterError> {
-        let mut keys = HashSet::new();
-        for off_d in data {
-            keys.insert(off_d.ofd_metadata_url.as_str());
-        }
-
-        let keys = keys.into_iter().collect::<Vec<&str>>();
-        let ids_keys = self.insert_metadata(&keys).await?;
-
-        let mut offchain_data_to_insert = Vec::new();
-
-        for offchain_d in data.iter() {
-            // save tasks only for those links which are new
-            if let Some(id) = ids_keys.get(&offchain_d.ofd_metadata_url) {
-                offchain_data_to_insert.push(TaskForInsert {
-                    ofd_metadata_url: *id,
-                    ofd_locked_until: offchain_d.ofd_locked_until,
-                    ofd_attempts: offchain_d.ofd_attempts,
-                    ofd_max_attempts: offchain_d.ofd_max_attempts,
-                    ofd_error: offchain_d.ofd_error.clone(),
-                });
-            }
-        }
-
-        offchain_data_to_insert.sort_by(|a, b| a.ofd_metadata_url.cmp(&b.ofd_metadata_url));
+    pub async fn insert_tasks(&self, data: &mut Vec<Task>) -> Result<(), IngesterError> {
+        data.sort_by(|a, b| a.ofd_metadata_url.cmp(&b.ofd_metadata_url));
 
         let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new(
             "INSERT INTO tasks (
@@ -223,12 +141,12 @@ impl DBClient {
             ) ",
         );
 
-        query_builder.push_values(offchain_data_to_insert, |mut b, off_d| {
-            b.push_bind(off_d.ofd_metadata_url);
+        query_builder.push_values(data, |mut b, off_d| {
+            b.push_bind(off_d.ofd_metadata_url.clone());
             b.push_bind(off_d.ofd_locked_until);
             b.push_bind(off_d.ofd_attempts);
             b.push_bind(off_d.ofd_max_attempts);
-            b.push_bind(off_d.ofd_error);
+            b.push_bind(off_d.ofd_error.clone());
             b.push_bind(TaskStatus::Pending);
         });
 
diff --git a/nft_ingester/src/json_downloader.rs b/nft_ingester/src/json_downloader.rs
index a010f85ef..1829f698a 100644
--- a/nft_ingester/src/json_downloader.rs
+++ b/nft_ingester/src/json_downloader.rs
@@ -1,5 +1,6 @@
 use crate::config::{setup_config, BackgroundTaskConfig, BackgroundTaskRunnerConfig};
-use crate::db_v2::{DBClient, TaskStatus, UpdatedTask};
+use crate::db_v2::{DBClient, UpdatedTask};
+use entities::enums::TaskStatus;
 use log::{debug, error, info};
 use metrics_utils::{JsonDownloaderMetricsConfig, MetricStatus};
 use reqwest::{Client, ClientBuilder};
@@ -79,7 +80,7 @@ impl JsonDownloader {
                         };
                         let data_to_insert = UpdatedTask {
                             status,
-                            metadata_url_key: task.metadata_url_key,
+                            metadata_url: task.metadata_url,
                             attempts: task.attempts + 1,
                             error: response.status().as_str().to_string(),
                         };
@@ -104,7 +105,7 @@ impl JsonDownloader {
                             .unwrap();
                         let data_to_insert = UpdatedTask {
                             status: TaskStatus::Success,
-                            metadata_url_key: task.metadata_url_key,
+                            metadata_url: task.metadata_url,
                             attempts: task.attempts + 1,
                             error: "".to_string(),
                         };
@@ -118,7 +119,7 @@ impl JsonDownloader {
                     } else {
                         let data_to_insert = UpdatedTask {
                             status: TaskStatus::Failed,
-                            metadata_url_key: task.metadata_url_key,
+                            metadata_url: task.metadata_url,
                             attempts: task.attempts + 1,
                             error: "Failed to deserialize metadata body"
                                 .to_string(),
diff --git a/nft_ingester/src/mplx_updates_processor.rs b/nft_ingester/src/mplx_updates_processor.rs
index cf19fe64d..4d500488e 100644
--- a/nft_ingester/src/mplx_updates_processor.rs
+++ b/nft_ingester/src/mplx_updates_processor.rs
@@ -346,7 +346,7 @@ impl MplxAccsProcessor {
 
             tasks_to_insert.extend(tasks);
 
-            let res = self.db_client_v2.insert_tasks(&tasks_to_insert).await;
+            let res = self.db_client_v2.insert_tasks(&mut tasks_to_insert).await;
             result_to_metrics(self.metrics.clone(), &res, "accounts_saving_tasks");
         }
     }
diff --git a/postgre-client/src/asset_filter_client.rs b/postgre-client/src/asset_filter_client.rs
index cef5aa8c2..9b35f7d9c 100644
--- a/postgre-client/src/asset_filter_client.rs
+++ b/postgre-client/src/asset_filter_client.rs
@@ -25,25 +25,21 @@ impl PgClient {
         after: Option<String>,
     ) -> (QueryBuilder<'a, Postgres>, bool) {
         let mut query_builder = QueryBuilder::new(
-            "SELECT ast_pubkey pubkey, ast_slot_created slot_created, ast_slot_updated slot_updated FROM assets_v3 ",
+            "SELECT ast_pubkey pubkey, ast_slot_created slot_created, ast_slot_updated slot_updated FROM assets_v3 INNER JOIN tasks ON ast_metadata_url_id = tsk_id",
         );
-        let mut group_clause_required = false;
         if filter.creator_address.is_some()
             || filter.creator_verified.is_some()
             || filter.royalty_target.is_some()
         {
             query_builder.push(" INNER JOIN asset_creators_v3 ON ast_pubkey = asc_pubkey ");
-            group_clause_required = true;
-        }
-        if filter.json_uri.is_some() {
-            query_builder.push(" INNER JOIN metadata ON ast_metadata_url_id = mtd_id ");
-            group_clause_required = true;
         }
 
         // todo: if we implement the additional params like negate and all/any switch, the true part and the AND prefix should be refactored
         query_builder.push(" WHERE TRUE ");
TRUE "); + // todo: this breaks some tests, so neew to fix them if future + query_builder.push(" AND tsk_status = 'success' "); if let Some(spec_version) = &filter.specification_version { query_builder.push(" AND assets_v3.ast_specification_version = "); query_builder.push_bind(spec_version); @@ -136,7 +132,7 @@ impl PgClient { } if let Some(json_uri) = &filter.json_uri { - query_builder.push(" AND metadata.mtd_url = "); + query_builder.push(" AND tsk_metadata_url = "); query_builder.push_bind(json_uri); } @@ -190,10 +186,7 @@ impl PgClient { } } - // Add GROUP BY clause if necessary - if group_clause_required { - query_builder.push(" GROUP BY assets_v3.ast_pubkey, assets_v3.ast_slot_created, assets_v3.ast_slot_updated "); - } + query_builder.push(" GROUP BY assets_v3.ast_pubkey, assets_v3.ast_slot_created, assets_v3.ast_slot_updated "); // Add ORDER BY clause let direction = match (&order.sort_direction, order_reversed) { diff --git a/postgre-client/src/asset_index_client.rs b/postgre-client/src/asset_index_client.rs index ee0feed9e..235c95708 100644 --- a/postgre-client/src/asset_index_client.rs +++ b/postgre-client/src/asset_index_client.rs @@ -46,11 +46,9 @@ impl AssetIndexStorage for PgClient { if !metadata_urls.is_empty() { metadata_urls.sort(); - self.insert_metadata(&mut transaction, metadata_urls.clone()) - .await?; - metadata_url_map = self - .get_metadata_ids(&mut transaction, metadata_urls) + self.insert_tasks(&mut transaction, metadata_urls.clone()) .await?; + metadata_url_map = self.get_tasks_ids(&mut transaction, metadata_urls).await?; } let mut asset_indexes = asset_indexes.to_vec(); diff --git a/postgre-client/src/lib.rs b/postgre-client/src/lib.rs index 94843d52d..1bcd91da8 100644 --- a/postgre-client/src/lib.rs +++ b/postgre-client/src/lib.rs @@ -1,3 +1,4 @@ +use entities::enums::TaskStatus; use sqlx::{ postgres::{PgConnectOptions, PgPoolOptions}, ConnectOptions, PgPool, Postgres, QueryBuilder, Row, Transaction, @@ -42,17 +43,18 @@ impl PgClient { Self { pool } } - pub async fn insert_metadata( + pub async fn insert_tasks( &self, transaction: &mut Transaction<'_, Postgres>, metadata_urls: Vec, ) -> Result<(), String> { let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new("INSERT INTO metadata (mtd_url) "); + QueryBuilder::new("INSERT INTO tasks (tsk_metadata_url, tsk_status) "); query_builder.push_values(metadata_urls.iter(), |mut builder, metadata_url| { builder.push_bind(metadata_url); + builder.push_bind(TaskStatus::Pending); }); - query_builder.push(" ON CONFLICT (mtd_url) DO NOTHING;"); + query_builder.push(" ON CONFLICT (tsk_metadata_url) DO NOTHING;"); let query = query_builder.build(); query @@ -63,13 +65,14 @@ impl PgClient { Ok(()) } - pub async fn get_metadata_ids( + pub async fn get_tasks_ids( &self, transaction: &mut Transaction<'_, Postgres>, metadata_urls: Vec, ) -> Result, String> { - let mut query_builder: QueryBuilder<'_, Postgres> = - QueryBuilder::new("SELECT mtd_id, mtd_url FROM metadata WHERE mtd_url in ("); + let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new( + "SELECT tsk_id, tsk_metadata_url FROM tasks WHERE tsk_metadata_url in (", + ); let urls_len = metadata_urls.len(); @@ -91,8 +94,8 @@ impl PgClient { let mut metadata_ids_map = HashMap::new(); for row in rows_result { - let metadata_id: i64 = row.get("mtd_id"); - let metadata_url: String = row.get("mtd_url"); + let metadata_id: i64 = row.get("tsk_id"); + let metadata_url: String = row.get("tsk_metadata_url"); metadata_ids_map.insert(metadata_url, 
         }
 
diff --git a/tests/setup/src/pg.rs b/tests/setup/src/pg.rs
index b69188f35..da64206a2 100644
--- a/tests/setup/src/pg.rs
+++ b/tests/setup/src/pg.rs
@@ -64,7 +64,7 @@ impl<'a> TestEnvironment<'a> {
     }
 
     pub async fn count_rows_in_metadata(&self) -> Result<i64, sqlx::Error> {
-        let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM metadata")
+        let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM tasks")
             .fetch_one(&self.pool)
             .await?;