Skip to content

Commit

Permalink
download-metadata subtask
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Dec 2, 2023
1 parent 485bfba commit d0958c2
Show file tree
Hide file tree
Showing 10 changed files with 345 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nft_ingester2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ opentelemetry_sdk = { workspace = true, features = ["trace"] }
program_transformers = { workspace = true }
prometheus = { workspace = true }
redis = { workspace = true, features = ["tokio-comp", "tokio-native-tls-comp"] }
reqwest = { workspace = true }
rust-crypto = { workspace = true }
sea-orm = { workspace = true, features = ["sqlx-postgres"] }
serde = { workspace = true }
Expand Down
12 changes: 12 additions & 0 deletions nft_ingester2/config-download-metadata.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Important: only ONE `download-metadata` instance is supported right now!
prometheus: 127.0.0.1:8875
postgres:
  url: postgres://solana:solana@localhost/solana
  min_connections: 10
  max_connections: 50
download_metadata:
  max_in_process: 50 # maximum tasks in process (downloading metadata)
  prefetch_queue_size: 100 # capacity of the in-memory prefetch channel
  limit_to_fetch: 200 # maximum number of tasks fetched from database
  wait_tasks_max_idle_ms: 100 # if we do not have pending tasks, wait max ms
  download_timeout_ms: 5_000 # per-request HTTP download timeout
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ postgres:
program_transformer:
transactions_cl_audits: false
max_tasks_in_process: 40
download_metadata_handler:
download_metadata:
max_attempts: 3
67 changes: 63 additions & 4 deletions nft_ingester2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ pub struct ConfigIngester {
pub redis: ConfigIngesterRedis,
pub postgres: ConfigIngesterPostgres,
pub program_transformer: ConfigIngesterProgramTransformer,
pub download_metadata_handler: ConfigDownloadMetadataHandler,
pub download_metadata: ConfigIngesterDownloadMetadata,
}

impl ConfigIngester {
Expand Down Expand Up @@ -368,16 +368,75 @@ impl ConfigIngesterProgramTransformer {
}

#[derive(Debug, Clone, Copy, Deserialize)]
pub struct ConfigDownloadMetadataHandler {
pub struct ConfigIngesterDownloadMetadata {
#[serde(
default = "ConfigDownloadMetadataHandler::default_max_attempts",
default = "ConfigIngesterDownloadMetadata::default_max_attempts",
deserialize_with = "deserialize_usize_str"
)]
pub max_attempts: usize,
}

impl ConfigDownloadMetadataHandler {
impl ConfigIngesterDownloadMetadata {
pub const fn default_max_attempts() -> usize {
3
}
}

/// Top-level configuration for the standalone `download-metadata` worker
/// (see `config-download-metadata.yml`).
#[derive(Debug, Deserialize)]
pub struct ConfigDownloadMetadata {
    /// Postgres connection settings (shared with the ingester config).
    pub postgres: ConfigIngesterPostgres,
    /// Worker tunables: concurrency, batching, idle back-off, timeouts.
    pub download_metadata: ConfigDownloadMetadataOpts,
}

/// Tunables for the download-metadata worker. Every field has a default and
/// is parsed through the custom `deserialize_*_str` helpers, so values may be
/// given as strings in the YAML config.
#[derive(Debug, Clone, Copy, Deserialize)]
pub struct ConfigDownloadMetadataOpts {
    /// Maximum number of tasks being processed (downloaded) concurrently.
    #[serde(
        default = "ConfigDownloadMetadataOpts::default_max_in_process",
        deserialize_with = "deserialize_usize_str"
    )]
    pub max_in_process: usize,
    /// Capacity of the bounded channel feeding fetched tasks to the workers.
    #[serde(
        default = "ConfigDownloadMetadataOpts::default_prefetch_queue_size",
        deserialize_with = "deserialize_usize_str"
    )]
    pub prefetch_queue_size: usize,
    /// Maximum number of pending tasks fetched from the database per query.
    #[serde(
        default = "ConfigDownloadMetadataOpts::default_limit_to_fetch",
        deserialize_with = "deserialize_usize_str"
    )]
    pub limit_to_fetch: usize,
    /// How long to sleep before re-polling when no pending tasks were found
    /// (configured in milliseconds via `wait_tasks_max_idle_ms`).
    #[serde(
        default = "ConfigDownloadMetadataOpts::default_wait_tasks_max_idle",
        deserialize_with = "deserialize_duration_str",
        rename = "wait_tasks_max_idle_ms"
    )]
    pub wait_tasks_max_idle: Duration,
    /// Per-request HTTP timeout for a metadata download
    /// (configured in milliseconds via `download_timeout_ms`).
    #[serde(
        default = "ConfigDownloadMetadataOpts::default_download_timeout",
        deserialize_with = "deserialize_duration_str",
        rename = "download_timeout_ms"
    )]
    pub download_timeout: Duration,
}

impl ConfigDownloadMetadataOpts {
    /// Default cap on concurrently running download tasks.
    pub const fn default_max_in_process() -> usize {
        50
    }

    /// Default capacity of the prefetch channel.
    pub const fn default_prefetch_queue_size() -> usize {
        100
    }

    /// Default maximum number of tasks fetched from the database per query.
    pub const fn default_limit_to_fetch() -> usize {
        200
    }

    /// Default back-off between database polls when no pending tasks exist.
    pub const fn default_wait_tasks_max_idle() -> Duration {
        Duration::from_millis(100)
    }

    /// Default per-request HTTP download timeout.
    pub const fn default_download_timeout() -> Duration {
        Duration::from_millis(5_000)
    }
}
236 changes: 236 additions & 0 deletions nft_ingester2/src/download_metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
use {
crate::{
config::{ConfigDownloadMetadata, ConfigDownloadMetadataOpts},
postgres::{create_pool as pg_create_pool, metrics_pgpool},
util::create_shutdown,
},
digital_asset_types::dao::{asset_data, sea_orm_active_enums::TaskStatus, tasks},
futures::{
future::{pending, FutureExt},
stream::StreamExt,
},
program_transformers::DownloadMetadataInfo,
reqwest::{ClientBuilder, StatusCode},
sea_orm::{
entity::{ActiveValue, ColumnTrait, EntityTrait},
query::{Condition, Order, QueryFilter, QueryOrder, QuerySelect},
sea_query::expr::Expr,
SqlxPostgresConnector, TransactionTrait,
},
sqlx::PgPool,
std::{sync::Arc, time::Duration},
tokio::{
sync::{mpsc, Notify},
task::JoinSet,
time::sleep,
},
tracing::{info, warn},
};

pub const TASK_TYPE: &str = "DownloadMetadata";

/// Entry point of the standalone `download-metadata` worker.
///
/// Flow: connect to Postgres, reset tasks left in `Running` state by a
/// previous run, start a prefetch future that feeds pending tasks into a
/// bounded channel, then process up to `max_in_process` downloads
/// concurrently. A first shutdown signal starts a graceful drain; a second
/// one forces exit.
pub async fn run(config: ConfigDownloadMetadata) -> anyhow::Result<()> {
    let mut shutdown = create_shutdown()?;

    // open connection to postgres; pool metrics are reported in the background
    let pool = pg_create_pool(config.postgres).await?;
    tokio::spawn({
        let pool = pool.clone();
        async move { metrics_pgpool(pool).await }
    });

    // reset tasks left `Running` by a previously interrupted run,
    // bailing out early if a shutdown signal arrives first
    tokio::select! {
        result = reset_pending_tasks(pool.clone()) => {
            let updated = result?;
            info!("Reset {updated} tasks to Pending status");
        },
        Some(signal) = shutdown.next() => {
            warn!("{signal} received, waiting spawned tasks...");
            return Ok(())
        },
    }

    // prefetch queue: a background future keeps the bounded channel topped up
    // with tasks claimed (marked `Running`) in the database
    let (tasks_tx, mut tasks_rx) = mpsc::channel(config.download_metadata.prefetch_queue_size);
    let prefetch_shutdown = Arc::new(Notify::new());
    let prefetch_jh = {
        let pool = pool.clone();
        let download_metadata = config.download_metadata;
        let shutdown = Arc::clone(&prefetch_shutdown);
        async move {
            tokio::select! {
                result = get_pending_tasks(pool, tasks_tx, download_metadata) => result,
                _ = shutdown.notified() => Ok(())
            }
        }
    };
    tokio::pin!(prefetch_jh);

    // process tasks, keeping at most `max_in_process` running at once
    let mut tasks = JoinSet::new();
    loop {
        // at the concurrency limit, stop pulling from the channel:
        // `pending()` never resolves, so only completions are awaited
        let pending_task_fut = if tasks.len() >= config.download_metadata.max_in_process {
            pending().boxed()
        } else {
            tasks_rx.recv().boxed()
        };

        // with an empty JoinSet there is nothing to join — park this branch
        // so the select! waits on the other arms instead
        let tasks_fut = if tasks.is_empty() {
            pending().boxed()
        } else {
            tasks.join_next().boxed()
        };

        tokio::select! {
            Some(signal) = shutdown.next() => {
                warn!("{signal} received, waiting spawned tasks...");
                break Ok(());
            },
            result = &mut prefetch_jh => break result,
            Some(result) = tasks_fut => {
                result??;
            },
            Some(pending_task) = pending_task_fut => {
                tasks.spawn(execute_task(pool.clone(), pending_task, config.download_metadata.download_timeout));
            }
        };
    }?;

    // graceful drain: stop the prefetcher, wait for in-flight downloads, then
    // close the pool — unless another shutdown signal forces an early exit
    tokio::select! {
        Some(signal) = shutdown.next() => {
            anyhow::bail!("{signal} received, force shutdown...");
        }
        result = async move {
            // shutdown `prefetch` channel
            prefetch_shutdown.notify_one();
            // wait all spawned tasks
            while let Some(result) = tasks.join_next().await {
                result??;
            }
            // shutdown database connection
            pool.close().await;
            Ok::<(), anyhow::Error>(())
        } => result,
    }
}

/// On startup, flip any `DownloadMetadata` tasks stuck in `Running` (left by
/// an interrupted run) back to `Pending` so they get picked up again.
/// Returns the number of rows updated.
async fn reset_pending_tasks(pool: PgPool) -> anyhow::Result<u64> {
    let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);

    // only touch this worker's task type, and only rows marked Running
    let filter = Condition::all()
        .add(tasks::Column::Status.eq(TaskStatus::Running))
        .add(tasks::Column::TaskType.eq(TASK_TYPE));

    let result = tasks::Entity::update_many()
        .set(tasks::ActiveModel {
            status: ActiveValue::Set(TaskStatus::Pending),
            ..Default::default()
        })
        .filter(filter)
        .exec(&conn)
        .await?;

    Ok(result.rows_affected)
}

/// Prefetch loop: repeatedly select up to `limit_to_fetch` `Pending` tasks
/// (with attempts <= max_attempts), claim them by setting status `Running`,
/// and push them into the prefetch channel. Sleeps `wait_tasks_max_idle`
/// between polls when the database has nothing pending. Runs until the
/// channel receiver is dropped or a database error occurs.
async fn get_pending_tasks(
    pool: PgPool,
    tasks_tx: mpsc::Sender<tasks::Model>,
    config: ConfigDownloadMetadataOpts,
) -> anyhow::Result<()> {
    let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
    loop {
        let filter = Condition::all()
            .add(tasks::Column::Status.eq(TaskStatus::Pending))
            .add(
                Expr::col(tasks::Column::Attempts)
                    .less_or_equal(Expr::col(tasks::Column::MaxAttempts)),
            );

        // least-attempted tasks first; among equals, newest first
        let batch = tasks::Entity::find()
            .filter(filter)
            .order_by(tasks::Column::Attempts, Order::Asc)
            .order_by(tasks::Column::CreatedAt, Order::Desc)
            .limit(config.limit_to_fetch as u64)
            .all(&conn)
            .await?;

        if batch.is_empty() {
            // nothing to do — back off before polling the database again
            sleep(config.wait_tasks_max_idle).await;
            continue;
        }

        // claim the whole batch before handing it to the workers
        let ids = batch.iter().map(|task| task.id.clone());
        tasks::Entity::update_many()
            .set(tasks::ActiveModel {
                status: ActiveValue::Set(TaskStatus::Running),
                ..Default::default()
            })
            .filter(tasks::Column::Id.is_in(ids))
            .exec(&conn)
            .await?;

        for task in batch {
            if tasks_tx.send(task).await.is_err() {
                anyhow::bail!("failed to send task to prefetch queue");
            }
        }
    }
}

/// Execute one download task: on success, delete the task and store the
/// fetched metadata on `asset_data` in a single transaction; on failure,
/// increment the attempt counter and either requeue (`Pending`) or finalize
/// (`Failed`) depending on whether retries are exhausted.
async fn execute_task(pool: PgPool, task: tasks::Model, timeout: Duration) -> anyhow::Result<()> {
    let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
    match download_metadata(task.data, timeout).await {
        Ok((asset_data_id, metadata)) => {
            // Remove task and set metadata in one transaction, so the task is
            // never dropped without its result being persisted
            let txn = conn.begin().await?;
            tasks::Entity::delete_by_id(task.id).exec(&txn).await?;
            asset_data::Entity::update(asset_data::ActiveModel {
                id: ActiveValue::Unchanged(asset_data_id),
                metadata: ActiveValue::Set(metadata),
                reindex: ActiveValue::Set(Some(false)),
                ..Default::default()
            })
            .exec(&txn)
            .await?;
            txn.commit().await?;
        }
        Err(error) => {
            // `>=` (not `==`): a task whose attempts already reached or
            // overshot `max_attempts` must be finalized as Failed, otherwise
            // it would be set back to Pending and retried forever
            let status = if task.attempts + 1 >= task.max_attempts {
                TaskStatus::Failed
            } else {
                TaskStatus::Pending
            };
            tasks::Entity::update(tasks::ActiveModel {
                id: ActiveValue::Unchanged(task.id),
                status: ActiveValue::Set(status),
                attempts: ActiveValue::Set(task.attempts + 1),
                errors: ActiveValue::Set(Some(error.to_string())),
                ..Default::default()
            })
            .exec(&conn)
            .await?;
        }
    }
    Ok(())
}

/// Fetch the JSON metadata referenced by a task payload.
///
/// The payload is deserialized into a `DownloadMetadataInfo` (asset id + uri),
/// the uri is fetched over HTTP with the given timeout, and the response body
/// is parsed as JSON. Any status other than 200 OK is reported as an error.
async fn download_metadata(
    data: serde_json::Value,
    timeout: Duration,
) -> anyhow::Result<(Vec<u8>, serde_json::Value)> {
    let info = serde_json::from_value::<DownloadMetadataInfo>(data)?;
    let (id, uri) = info.into_inner();

    // Need to check for malicious sites ?
    let response = ClientBuilder::new()
        .timeout(timeout)
        .build()?
        .get(uri)
        .send()
        .await?;

    let status = response.status();
    anyhow::ensure!(
        status == StatusCode::OK,
        "HttpError status_code: {}",
        status.as_str()
    );
    Ok((id, response.json().await?))
}
Loading

0 comments on commit d0958c2

Please sign in to comment.