[MET-68] Skip indexing if on-chain data is missing and add metadata p… #40

Merged · 10 commits · Jan 24, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

16 changes: 11 additions & 5 deletions digital_asset_types/src/dao/scopes/asset.rs
@@ -75,7 +75,10 @@ pub async fn get_by_creator(
pagination: &Pagination,
limit: u64,
) -> Result<Vec<FullAsset>, DbErr> {
let mut condition = "SELECT ast_pubkey FROM assets_v3 LEFT JOIN asset_creators_v3 ON ast_pubkey = asc_pubkey WHERE asc_creator = $1".to_string();
let mut condition =
"SELECT ast_pubkey FROM assets_v3 LEFT JOIN asset_creators_v3 ON ast_pubkey = asc_pubkey
LEFT JOIN tasks ON ast_metadata_url_id = tsk_id WHERE asc_creator = $1"
.to_string();
if only_verified {
condition = format!("{} AND asc_verified = true", condition);
}
@@ -143,7 +146,7 @@ pub async fn get_by_grouping(
return Ok(vec![]);
}

let condition = "SELECT ast_pubkey FROM assets_v3 WHERE ast_collection = $1 AND ast_is_collection_verified = true";
let condition = "SELECT ast_pubkey FROM assets_v3 LEFT JOIN tasks ON ast_metadata_url_id = tsk_id WHERE ast_collection = $1 AND ast_is_collection_verified = true";
let values = vec![Set(group_value.clone())
.into_value()
.ok_or(DbErr::Custom(format!(
@@ -178,7 +181,7 @@ pub async fn get_assets_by_owner(
pagination: &Pagination,
limit: u64,
) -> Result<Vec<FullAsset>, DbErr> {
let condition = "SELECT ast_pubkey FROM assets_v3 WHERE ast_owner = $1";
let condition = "SELECT ast_pubkey FROM assets_v3 LEFT JOIN tasks ON ast_metadata_url_id = tsk_id WHERE ast_owner = $1";
let values = vec![Set(owner.to_bytes().to_vec().as_slice())
.into_value()
.ok_or(DbErr::Custom(format!(
@@ -208,7 +211,7 @@ pub async fn get_by_authority(
pagination: &Pagination,
limit: u64,
) -> Result<Vec<FullAsset>, DbErr> {
let condition = "SELECT ast_pubkey FROM assets_v3 WHERE ast_authority = $1";
let condition = "SELECT ast_pubkey FROM assets_v3 LEFT JOIN tasks ON ast_metadata_url_id = tsk_id WHERE ast_authority = $1";
let values = vec![Set(authority.as_slice())
.into_value()
.ok_or(DbErr::Custom(format!(
@@ -240,7 +243,10 @@ async fn get_by_related_condition(
pagination: &Pagination,
limit: u64,
) -> Result<Vec<FullAsset>, DbErr> {
let condition = &format!("{} AND ast_supply > 0", condition);
let condition = &format!(
"{} AND ast_supply > 0 AND tsk_status = 'success' ",
condition
);
let (mut condition, values, offset) = paginate(pagination, limit, condition, values)?;

condition = format!("{} LIMIT {}", condition, limit);
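Taken together, every search path in this file now joins `tasks` through `ast_metadata_url_id`, and the shared `get_by_related_condition` helper appends the `tsk_status = 'success'` filter, so assets whose off-chain JSON has not been downloaded yet are skipped. A minimal sketch of how the owner query ends up assembled (table and column names taken from the diff; pagination and bind-value plumbing elided):

```rust
// Sketch only: mirrors the query assembly after this change.
fn assemble_owner_query(limit: u64) -> String {
    let condition = "SELECT ast_pubkey FROM assets_v3 \
        LEFT JOIN tasks ON ast_metadata_url_id = tsk_id \
        WHERE ast_owner = $1";
    // get_by_related_condition appends the supply and task-status filters:
    let condition = format!("{} AND ast_supply > 0 AND tsk_status = 'success'", condition);
    format!("{} LIMIT {}", condition, limit)
}
```

Note that filtering on `tsk_status` in the WHERE clause discards the NULL rows a LEFT JOIN would otherwise keep, so the join behaves like an INNER JOIN here: assets with no task row at all are excluded along with pending and failed ones.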
4 changes: 3 additions & 1 deletion entities/Cargo.toml
@@ -10,4 +10,6 @@ serde = "1.0.193"
solana-sdk = "~1.16"
num-derive = "0.4.1"
num-traits = "0.2.17"
schemars = "0.8.6"
schemars = "0.8.6"
serde_derive = "1.0.195"
sqlx = { version = "0.6.2", features = ["macros", "runtime-tokio-rustls", "postgres", "uuid", "offline", "json"] }
19 changes: 19 additions & 0 deletions entities/src/enums.rs
@@ -112,3 +112,22 @@ pub enum RoyaltyModel {
#[serde(rename = "single")]
Single,
}

#[derive(
serde_derive::Deserialize,
serde_derive::Serialize,
PartialEq,
Debug,
Eq,
Hash,
sqlx::Type,
Copy,
Clone,
)]
#[sqlx(type_name = "task_status", rename_all = "lowercase")]
pub enum TaskStatus {
Pending,
Running,
Success,
Failed,
}
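This `TaskStatus` enum replaces the copy previously defined in `nft_ingester/src/db_v2.rs` (removed below). The `#[sqlx(type_name = "task_status", rename_all = "lowercase")]` attribute maps the variants onto the Postgres enum values `pending`/`running`/`success`/`failed`, so the type binds and decodes directly. A small usage sketch (the query text is illustrative, not taken from the codebase):

```rust
use entities::enums::TaskStatus;
use sqlx::PgPool;

// Sketch only: TaskStatus derives sqlx::Type, so it can be bound
// directly against the Postgres `task_status` enum column.
async fn count_by_status(pool: &PgPool, status: TaskStatus) -> Result<i64, sqlx::Error> {
    let (count,): (i64,) =
        sqlx::query_as("SELECT COUNT(*) FROM tasks WHERE tsk_status = $1")
            .bind(status) // encoded as 'pending' | 'running' | 'success' | 'failed'
            .fetch_one(pool)
            .await?;
    Ok(count)
}
```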
35 changes: 13 additions & 22 deletions init_v3.sql
@@ -46,13 +46,20 @@ CREATE TABLE asset_creators_v3 (
);
CREATE INDEX asset_creators_v3_creator ON asset_creators_v3(asc_creator, asc_verified);

CREATE TABLE metadata (
mtd_id bigserial
CONSTRAINT metadata_pk
CREATE TABLE tasks (
Collaborator comment: I'm not convinced a meaningful name for it is tasks. metadata seems to be easier to understand in this context.

tsk_id bigserial
CONSTRAINT tasks_pk
PRIMARY KEY,
mtd_url text NOT NULL
tsk_metadata_url text NOT NULL,
tsk_status task_status NOT NULL,
tsk_locked_until timestamptz NULL DEFAULT (now() AT TIME ZONE 'utc'::text),
tsk_attempts int2 NOT NULL DEFAULT 0,
tsk_max_attempts int2 NOT NULL DEFAULT 10,
tsk_error text
);
CREATE UNIQUE INDEX metadata_url ON metadata (mtd_url);
CREATE UNIQUE INDEX tasks_metadata_url ON tasks (tsk_metadata_url);
CREATE INDEX tasks_status ON tasks (tsk_status);
CREATE INDEX tasks_locked_until ON tasks (tsk_locked_until);

CREATE TABLE assets_v3 (
ast_pubkey bytea NOT NULL,
@@ -75,7 +82,7 @@ CREATE TABLE assets_v3 (
ast_metadata_url_id bigint NULL,
ast_slot_updated bigint NOT NULL,
CONSTRAINT assets_pkey PRIMARY KEY (ast_pubkey),
FOREIGN KEY (ast_metadata_url_id) REFERENCES metadata(mtd_id) ON DELETE RESTRICT ON UPDATE CASCADE
FOREIGN KEY (ast_metadata_url_id) REFERENCES tasks(tsk_id) ON DELETE RESTRICT ON UPDATE CASCADE
);
-- indexes on the fields that will not get updated:
-- so far we only know of V1 specification versions, so we only index on others
@@ -106,22 +113,6 @@ CREATE INDEX assets_v3_is_frozen ON assets_v3(ast_is_frozen) WHERE ast_is_frozen
CREATE INDEX assets_v3_supply ON assets_v3(ast_supply) WHERE ast_supply IS NOT NULL;
CREATE INDEX assets_v3_slot_updated ON assets_v3(ast_slot_updated);

CREATE TABLE tasks (
tsk_id bigserial
CONSTRAINT tasks_pk
PRIMARY KEY,
tsk_metadata_url bigint NOT NULL,
tsk_status task_status NOT NULL,
tsk_locked_until timestamptz NULL DEFAULT (now() AT TIME ZONE 'utc'::text),
tsk_attempts int2 NOT NULL DEFAULT 0,
tsk_max_attempts int2 NOT NULL DEFAULT 10,
tsk_error text,
FOREIGN KEY (tsk_metadata_url) REFERENCES metadata(mtd_id) ON DELETE RESTRICT ON UPDATE CASCADE
);
CREATE UNIQUE INDEX tasks_metadata_url ON tasks (tsk_metadata_url);
CREATE INDEX tasks_status ON tasks (tsk_status);
CREATE INDEX tasks_locked_until ON tasks (tsk_locked_until);

CREATE TABLE last_synced_key (
id integer NOT NULL PRIMARY KEY DEFAULT 1,
last_synced_asset_update_key bytea,
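The new `tsk_status` column relies on a Postgres enum type named `task_status` that does not appear in the visible hunks; presumably the migration creates it before `CREATE TABLE tasks`. A hypothetical definition, with values matching the lowercase mapping of the Rust `TaskStatus` variants:

```rust
// Assumption: defined somewhere earlier in the migration. The values must
// match the `rename_all = "lowercase"` mapping on entities::enums::TaskStatus.
const CREATE_TASK_STATUS_TYPE: &str =
    "CREATE TYPE task_status AS ENUM ('pending', 'running', 'success', 'failed');";
```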
102 changes: 10 additions & 92 deletions nft_ingester/src/db_v2.rs
@@ -1,8 +1,8 @@
use entities::enums::TaskStatus;
use sqlx::{
postgres::{PgConnectOptions, PgPoolOptions, Postgres},
ConnectOptions, PgPool, QueryBuilder, Row,
};
use std::collections::{HashMap, HashSet};

use crate::config::DatabaseConfig;
use crate::error::IngesterError;
@@ -12,25 +12,6 @@ pub struct DBClient {
pub pool: PgPool,
}

#[derive(
serde_derive::Deserialize,
serde_derive::Serialize,
PartialEq,
Debug,
Eq,
Hash,
sqlx::Type,
Copy,
Clone,
)]
#[sqlx(type_name = "task_status", rename_all = "lowercase")]
pub enum TaskStatus {
Pending,
Running,
Success,
Failed,
}

#[derive(Debug, Clone)]
pub struct Task {
pub ofd_metadata_url: String,
@@ -40,26 +21,17 @@ pub struct Task {
pub ofd_error: Option<String>,
}

pub struct TaskForInsert {
pub ofd_metadata_url: i64,
pub ofd_locked_until: Option<chrono::DateTime<chrono::Utc>>,
pub ofd_attempts: i32,
pub ofd_max_attempts: i32,
pub ofd_error: Option<String>,
}

#[derive(Debug, Clone)]
pub struct JsonDownloadTask {
pub metadata_url: String,
pub metadata_url_key: i64,
pub status: TaskStatus,
pub attempts: i16,
pub max_attempts: i16,
}

pub struct UpdatedTask {
pub status: TaskStatus,
pub metadata_url_key: i64,
pub metadata_url: String,
pub attempts: i16,
pub error: String,
}
@@ -97,7 +69,7 @@ impl DBClient {
QueryBuilder::new("UPDATE tasks SET tsk_status = tmp.tsk_status, tsk_attempts = tmp.tsk_attempts, tsk_error = tmp.tsk_error FROM (");

query_builder.push_values(data, |mut b, key| {
b.push_bind(key.metadata_url_key);
b.push_bind(key.metadata_url);
b.push_bind(key.status);
b.push_bind(key.attempts);
b.push_bind(key.error);
@@ -128,9 +100,7 @@ impl DBClient {
tsk_locked_until = NOW() + INTERVAL '20 seconds'
FROM cte
WHERE t.tsk_id = cte.tsk_id
RETURNING (
SELECT mtd_url FROM metadata m WHERE m.mtd_id = t.tsk_metadata_url) as metadata_url, t.tsk_metadata_url,
t.tsk_status, t.tsk_attempts, t.tsk_max_attempts;");
RETURNING t.tsk_metadata_url, t.tsk_status, t.tsk_attempts, t.tsk_max_attempts;");

let query = query_builder.build();
let rows = query
@@ -141,15 +111,13 @@ impl DBClient {
let mut tasks = Vec::new();

for row in rows {
let metadata_url: String = row.get("metadata_url");
let metadata_url_key: i64 = row.get("tsk_metadata_url");
let metadata_url: String = row.get("tsk_metadata_url");
let status: TaskStatus = row.get("tsk_status");
let attempts: i16 = row.get("tsk_attempts");
let max_attempts: i16 = row.get("tsk_max_attempts");

tasks.push(JsonDownloadTask {
metadata_url,
metadata_url_key,
status,
attempts,
max_attempts,
@@ -159,58 +127,8 @@ impl DBClient {
Ok(tasks)
}

pub async fn insert_metadata(
&self,
urls: &Vec<&str>,
) -> Result<HashMap<String, i64>, IngesterError> {
let mut query_builder: QueryBuilder<'_, Postgres> =
QueryBuilder::new("INSERT INTO metadata (mtd_url)");

query_builder.push_values(urls, |mut b, key| {
b.push_bind(key);
});

query_builder.push("ON CONFLICT (mtd_url) DO NOTHING RETURNING mtd_id, mtd_url;");

let query = query_builder.build();
let rows = query
.fetch_all(&self.pool)
.await
.map_err(|err| IngesterError::DatabaseError(format!("Insert one metadata: {}", err)))?;

let res: HashMap<String, i64> = rows
.iter()
.map(|row| (row.get("mtd_url"), row.get("mtd_id")))
.collect();

Ok(res)
}

pub async fn insert_tasks(&self, data: &Vec<Task>) -> Result<(), IngesterError> {
let mut keys = HashSet::new();
for off_d in data {
keys.insert(off_d.ofd_metadata_url.as_str());
}

let keys = keys.into_iter().collect::<Vec<_>>();
let ids_keys = self.insert_metadata(&keys).await?;

let mut offchain_data_to_insert = Vec::new();

for offchain_d in data.iter() {
// save tasks only for those links which are new
if let Some(id) = ids_keys.get(&offchain_d.ofd_metadata_url) {
offchain_data_to_insert.push(TaskForInsert {
ofd_metadata_url: *id,
ofd_locked_until: offchain_d.ofd_locked_until,
ofd_attempts: offchain_d.ofd_attempts,
ofd_max_attempts: offchain_d.ofd_max_attempts,
ofd_error: offchain_d.ofd_error.clone(),
});
}
}

offchain_data_to_insert.sort_by(|a, b| a.ofd_metadata_url.cmp(&b.ofd_metadata_url));
pub async fn insert_tasks(&self, data: &mut Vec<Task>) -> Result<(), IngesterError> {
data.sort_by(|a, b| a.ofd_metadata_url.cmp(&b.ofd_metadata_url));

let mut query_builder: QueryBuilder<'_, Postgres> = QueryBuilder::new(
"INSERT INTO tasks (
@@ -223,12 +141,12 @@ impl DBClient {
) ",
);

query_builder.push_values(offchain_data_to_insert, |mut b, off_d| {
b.push_bind(off_d.ofd_metadata_url);
query_builder.push_values(data, |mut b, off_d| {
b.push_bind(off_d.ofd_metadata_url.clone());
b.push_bind(off_d.ofd_locked_until);
b.push_bind(off_d.ofd_attempts);
b.push_bind(off_d.ofd_max_attempts);
b.push_bind(off_d.ofd_error);
b.push_bind(off_d.ofd_error.clone());
b.push_bind(TaskStatus::Pending);
});

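With the `metadata` table folded into `tasks`, `insert_tasks` now takes the URL-keyed `Task` rows directly and drops the id-resolving `insert_metadata` round trip. It takes the batch mutably so it can sort by URL first, presumably to keep a deterministic row order for the bulk upsert. A minimal usage sketch (module paths and the hidden middle fields of `Task` are assumed to mirror the removed `TaskForInsert`):

```rust
use nft_ingester::db_v2::{DBClient, Task};
use nft_ingester::error::IngesterError;

// Sketch only: enqueue one pending download task per metadata URL.
async fn enqueue_urls(db: &DBClient, urls: Vec<String>) -> Result<(), IngesterError> {
    let mut tasks: Vec<Task> = urls
        .into_iter()
        .map(|url| Task {
            ofd_metadata_url: url,
            ofd_locked_until: None, // column default applies on insert
            ofd_attempts: 0,
            ofd_max_attempts: 10,
            ofd_error: None,
        })
        .collect();
    db.insert_tasks(&mut tasks).await
}
```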
9 changes: 5 additions & 4 deletions nft_ingester/src/json_downloader.rs
@@ -1,5 +1,6 @@
use crate::config::{setup_config, BackgroundTaskConfig, BackgroundTaskRunnerConfig};
use crate::db_v2::{DBClient, TaskStatus, UpdatedTask};
use crate::db_v2::{DBClient, UpdatedTask};
use entities::enums::TaskStatus;
use log::{debug, error, info};
use metrics_utils::{JsonDownloaderMetricsConfig, MetricStatus};
use reqwest::{Client, ClientBuilder};
@@ -79,7 +80,7 @@ impl JsonDownloader {
};
let data_to_insert = UpdatedTask {
status,
metadata_url_key: task.metadata_url_key,
metadata_url: task.metadata_url,
attempts: task.attempts + 1,
error: response.status().as_str().to_string(),
};
@@ -104,7 +105,7 @@
.unwrap();
let data_to_insert = UpdatedTask {
status: TaskStatus::Success,
metadata_url_key: task.metadata_url_key,
metadata_url: task.metadata_url,
attempts: task.attempts + 1,
error: "".to_string(),
};
@@ -118,7 +119,7 @@
} else {
let data_to_insert = UpdatedTask {
status: TaskStatus::Failed,
metadata_url_key: task.metadata_url_key,
metadata_url: task.metadata_url,
attempts: task.attempts + 1,
error: "Failed to deserialize metadata body"
.to_string(),
2 changes: 1 addition & 1 deletion nft_ingester/src/mplx_updates_processor.rs
@@ -346,7 +346,7 @@ impl MplxAccsProcessor {

tasks_to_insert.extend(tasks);

let res = self.db_client_v2.insert_tasks(&tasks_to_insert).await;
let res = self.db_client_v2.insert_tasks(&mut tasks_to_insert).await;
result_to_metrics(self.metrics.clone(), &res, "accounts_saving_tasks");
}
}