insert download metadata tasks
fanatid authored and kespinola committed Apr 16, 2024
1 parent 53fa481 commit 11b4151
Showing 11 changed files with 132 additions and 27 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions metaplex-rpc-proxy/Cargo.toml
@@ -13,6 +13,8 @@ lazy_static = { workspace = true }
log = { workspace = true }
proxy-wasm = { workspace = true }
regex = { workspace = true }
wasi = { workspace = true }
wasm-bindgen = { workspace = true }

[lints]
workspace = true
2 changes: 1 addition & 1 deletion nft_ingester/src/transaction_notifications.rs
@@ -10,7 +10,7 @@ use {
log::{debug, error},
plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData},
plerkle_serialization::root_as_transaction_info,
program_transformers::{error::ProgramTransformerResult, ProgramTransformer, TransactionInfo},
program_transformers::ProgramTransformer,
sqlx::{Pool, Postgres},
std::sync::Arc,
tokio::{
5 changes: 5 additions & 0 deletions nft_ingester2/Cargo.toml
@@ -17,7 +17,9 @@ sqlx = { workspace = true, features = [
"offline",
"json",
] }
chrono = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive"] }
digital_asset_types = { workspace = true }
futures = { workspace = true }
hyper = { workspace = true, features = ["server"] }
json5 = { workspace = true }
@@ -28,7 +30,10 @@ opentelemetry_sdk = { workspace = true, features = ["trace"] }
program_transformers = { workspace = true }
prometheus = { workspace = true }
redis = { workspace = true, features = ["tokio-comp", "tokio-native-tls-comp"] }
rust-crypto = { workspace = true }
sea-orm = { workspace = true, features = ["sqlx-postgres"] }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
solana-sdk = { workspace = true } # only prom rn
tokio = { workspace = true, features = [
6 changes: 4 additions & 2 deletions nft_ingester2/config-run.yml
@@ -20,7 +20,9 @@ redis:
postgres:
url: postgres://solana:solana@localhost/solana
min_connections: 10
max_connections: 25
max_connections: 50 # `max_connections` should be bigger than `program_transformer.max_tasks_in_process`, otherwise an unresolved lock is possible
program_transformer:
transactions_cl_audits: false
max_tasks_in_process: 100
max_tasks_in_process: 40
download_metadata_handler:
max_attempts: 3
29 changes: 27 additions & 2 deletions nft_ingester2/src/config.rs
@@ -3,6 +3,7 @@ use {
serde::{de, Deserialize},
std::{net::SocketAddr, path::Path, time::Duration},
tokio::fs,
tracing::warn,
yellowstone_grpc_tools::config::{
deserialize_usize_str, ConfigGrpcRequestAccounts, ConfigGrpcRequestCommitment,
ConfigGrpcRequestTransactions,
@@ -154,6 +155,15 @@ pub struct ConfigIngester {
pub redis: ConfigIngesterRedis,
pub postgres: ConfigIngesterPostgres,
pub program_transformer: ConfigIngesterProgramTransformer,
pub download_metadata_handler: ConfigDownloadMetadataHandler,
}

impl ConfigIngester {
pub fn check(&self) {
if self.postgres.max_connections < self.program_transformer.max_tasks_in_process {
warn!("`postgres.max_connections` should be bigger than `program_transformer.max_tasks_in_process`, otherwise an unresolved lock is possible");
}
}
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -305,7 +315,7 @@ impl ConfigIngesterPostgres {
}

pub const fn default_max_connections() -> usize {
25
50
}
}

@@ -326,6 +336,21 @@
}

pub const fn default_max_tasks_in_process() -> usize {
100
40
}
}

#[derive(Debug, Clone, Copy, Deserialize)]
pub struct ConfigDownloadMetadataHandler {
#[serde(
default = "ConfigDownloadMetadataHandler::default_max_attempts",
deserialize_with = "deserialize_usize_str"
)]
pub max_attempts: usize,
}

impl ConfigDownloadMetadataHandler {
pub const fn default_max_attempts() -> usize {
3
}
}
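
Note on the check above: the warning only fires when the Postgres pool is smaller than the number of in-flight program-transformer tasks; startup continues either way. A minimal standalone sketch of that relation, with plain functions and this commit's default values standing in for the real config structs and tracing logger:

fn check(max_connections: usize, max_tasks_in_process: usize) {
    // Mirrors ConfigIngester::check(): warn, do not fail, when the pool is too small.
    if max_connections < max_tasks_in_process {
        eprintln!(
            "`postgres.max_connections` should be bigger than \
             `program_transformer.max_tasks_in_process`, otherwise an unresolved lock is possible"
        );
    }
}

fn main() {
    check(50, 40);  // new defaults: fine, stays silent
    check(25, 100); // old defaults: would have warned
}
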
85 changes: 65 additions & 20 deletions nft_ingester2/src/ingester.rs
@@ -1,14 +1,17 @@
use {
crate::{
config::ConfigIngester,
config::{ConfigDownloadMetadataHandler, ConfigIngester},
postgres::{create_pool as pg_create_pool, metrics_pgpool},
prom::{
program_transformer_task_status_inc, program_transformer_tasks_total_set,
ProgramTransformerTaskStatusKind,
download_metadata_inserted_total_inc, program_transformer_task_status_inc,
program_transformer_tasks_total_set, ProgramTransformerTaskStatusKind,
},
redis::{metrics_xlen, ProgramTransformerInfo, RedisStream},
util::create_shutdown,
},
chrono::Utc,
crypto::{digest::Digest, sha2::Sha256},
digital_asset_types::dao::{sea_orm_active_enums::TaskStatus, tasks},
futures::{
future::{pending, BoxFuture, FusedFuture, FutureExt},
stream::StreamExt,
@@ -17,9 +20,18 @@ use {
error::ProgramTransformerError, DownloadMetadataInfo, DownloadMetadataNotifier,
ProgramTransformer,
},
std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
sea_orm::{
entity::{ActiveModelTrait, ActiveValue},
error::{DbErr, RuntimeErr},
SqlxPostgresConnector,
},
sqlx::{Error as SqlxError, PgPool},
std::{
borrow::Cow,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
},
tokio::{
task::JoinSet,
@@ -29,8 +41,6 @@
};

pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
println!("{:#?}", config);

// connect to Redis
let client = redis::Client::open(config.redis.url.clone())?;
let connection = client.get_multiplexed_tokio_connection().await?;
@@ -62,12 +72,12 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
// program transforms related
let pt_accounts = Arc::new(ProgramTransformer::new(
pgpool.clone(),
create_notifier(),
create_download_metadata_notifier(pgpool.clone(), config.download_metadata_handler)?,
false,
));
let pt_transactions = Arc::new(ProgramTransformer::new(
pgpool.clone(),
create_notifier(),
create_download_metadata_notifier(pgpool.clone(), config.download_metadata_handler)?,
config.program_transformer.transactions_cl_audits,
));
let pt_max_tasks_in_process = config.program_transformer.max_tasks_in_process;
@@ -214,14 +224,49 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
result
}

fn create_notifier() -> DownloadMetadataNotifier {
Box::new(
move |_info: DownloadMetadataInfo| -> BoxFuture<
'static,
Result<(), Box<dyn std::error::Error + Send + Sync>>,
> {
// TODO
Box::pin(async move { Ok(()) })
},
)
fn create_download_metadata_notifier(
pgpool: PgPool,
config: ConfigDownloadMetadataHandler,
) -> anyhow::Result<DownloadMetadataNotifier> {
let max_attempts = config.max_attempts.try_into()?;
Ok(Box::new(move |info: DownloadMetadataInfo| -> BoxFuture<
'static,
Result<(), Box<dyn std::error::Error + Send + Sync>>,
> {
let pgpool = pgpool.clone();
Box::pin(async move {
const NAME: &str = "DownloadMetadata";

let data = serde_json::to_value(info)?;

let mut hasher = Sha256::new();
hasher.input(NAME.as_bytes());
hasher.input(serde_json::to_vec(&data)?.as_slice());
let hash = hasher.result_str();

let model = tasks::ActiveModel {
id: ActiveValue::Set(hash),
task_type: ActiveValue::Set(NAME.to_owned()),
data: ActiveValue::Set(data),
status: ActiveValue::Set(TaskStatus::Pending),
created_at: ActiveValue::Set(Utc::now().naive_utc()),
locked_until: ActiveValue::Set(None),
locked_by: ActiveValue::Set(None),
max_attempts: ActiveValue::Set(max_attempts),
attempts: ActiveValue::Set(0),
duration: ActiveValue::Set(None),
errors: ActiveValue::Set(None),
};
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pgpool);

match model.insert(&conn).await.map(|_model| ()) {
// skip unique_violation error
Err(DbErr::Query(RuntimeErr::SqlxError(SqlxError::Database(dberr)))) if dberr.code() == Some(Cow::Borrowed("23505")) => {},
value => value?,
};
download_metadata_inserted_total_inc();

Ok(())
})
}))
}
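
A note on the notifier above: the task id is a sha256 over the task name plus the JSON-serialized DownloadMetadataInfo, so re-notifying the same asset and uri yields the same primary key and the insert falls into the unique-violation (23505) skip. A minimal sketch of that derivation, with a hypothetical payload standing in for the struct's serialized form (asset_data_id bytes plus uri, per the Serialize derive added in program_transformers/src/lib.rs below):

use crypto::{digest::Digest, sha2::Sha256};

fn main() {
    // Hypothetical payload; the real one is serde_json::to_value(DownloadMetadataInfo).
    let data = serde_json::json!({
        "asset_data_id": [1, 2, 3],
        "uri": "https://example.com/metadata.json",
    });

    // Same derivation as create_download_metadata_notifier: task name, then JSON bytes.
    let mut hasher = Sha256::new();
    hasher.input("DownloadMetadata".as_bytes());
    hasher.input(serde_json::to_vec(&data).unwrap().as_slice());

    // Deterministic, so a duplicate notification produces the same task id.
    println!("task id: {}", hasher.result_str());
}
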
1 change: 1 addition & 0 deletions nft_ingester2/src/main.rs
@@ -70,6 +70,7 @@ async fn main() -> anyhow::Result<()> {
let config = config_load::<ConfigIngester>(&args.config)
.await
.with_context(|| format!("failed to parse config from: {}", args.config))?;
config.check();
ingester::run(config).await
}
}
11 changes: 10 additions & 1 deletion nft_ingester2/src/prom.rs
@@ -5,7 +5,7 @@ use {
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
},
prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder},
prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder},
std::{net::SocketAddr, sync::Once},
tracing::{error, info},
};
@@ -46,6 +46,10 @@ lazy_static::lazy_static! {
Opts::new("program_transformer_task_status", "Status of processed messages"),
&["status"],
).unwrap();

static ref DOWNLOAD_METADATA_INSERTED_TOTAL: IntCounter = IntCounter::new(
"download_metadata_inserted_total", "Total number of inserted tasks for download metadata"
).unwrap();
}

pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
@@ -65,6 +69,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
register!(PGPOOL_CONNECTIONS_TOTAL);
register!(PROGRAM_TRANSFORMER_TASKS_TOTAL);
register!(PROGRAM_TRANSFORMER_TASK_STATUS);
register!(DOWNLOAD_METADATA_INSERTED_TOTAL);

VERSION
.with_label_values(&[
@@ -171,3 +176,7 @@ pub fn program_transformer_task_status_inc(kind: ProgramTransformerTaskStatusKin
}])
.inc()
}

pub fn download_metadata_inserted_total_inc() {
DOWNLOAD_METADATA_INSERTED_TOTAL.inc()
}
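
For completeness, a self-contained sketch of how the new counter surfaces when the metrics endpoint is scraped; it uses a throwaway local Registry instead of the lazy_static one above, and the printed value of 1 comes from the single inc() in the example:

use prometheus::{Encoder, IntCounter, Registry, TextEncoder};

fn main() {
    // Same name and help text as DOWNLOAD_METADATA_INSERTED_TOTAL, on a local registry.
    let counter = IntCounter::new(
        "download_metadata_inserted_total",
        "Total number of inserted tasks for download metadata",
    )
    .unwrap();
    let registry = Registry::new();
    registry.register(Box::new(counter.clone())).unwrap();

    counter.inc(); // what download_metadata_inserted_total_inc() does per inserted task

    // Encode in the Prometheus text exposition format.
    let mut buffer = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buffer)
        .unwrap();
    print!("{}", String::from_utf8(buffer).unwrap());
    // Prints the HELP and TYPE lines followed by `download_metadata_inserted_total 1`.
}
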
1 change: 1 addition & 0 deletions program_transformers/Cargo.toml
@@ -19,6 +19,7 @@ heck = { workspace = true }
mpl-bubblegum = { workspace = true }
num-traits = { workspace = true }
sea-orm = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
3 changes: 2 additions & 1 deletion program_transformers/src/lib.rs
@@ -20,6 +20,7 @@ use {
entity::EntityTrait, query::Select, ConnectionTrait, DatabaseConnection, DbErr,
SqlxPostgresConnector, TransactionTrait,
},
serde::Serialize,
solana_sdk::{instruction::CompiledInstruction, pubkey::Pubkey, signature::Signature},
solana_transaction_status::InnerInstructions,
sqlx::PgPool,
@@ -52,7 +53,7 @@ pub struct TransactionInfo {
pub meta_inner_instructions: Vec<InnerInstructions>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct DownloadMetadataInfo {
asset_data_id: Vec<u8>,
uri: String,