Skip to content

Commit

Permalink
nft_ingester: use program_transformers crate
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Dec 1, 2023
1 parent 1b2759b commit 52d1773
Show file tree
Hide file tree
Showing 27 changed files with 119 additions and 2,509 deletions.
30 changes: 1 addition & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 1 addition & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ anchor-lang = "0.28.0"
anyhow = "1.0.75"
async-std = "1.0.0"
async-trait = "0.1.60"
base64 = "0.21.0"
blockbuster = "0.9.0-beta.1"
borsh = "~0.10.3"
bs58 = "0.4.0"
Expand All @@ -43,7 +42,6 @@ fake = "2.5.0"
figment = "0.10.8"
flatbuffers = "23.1.21"
futures = "0.3.28"
futures-util = "0.3.27"
hex = "0.4.3"
hyper = "0.14.23"
indexmap = "1.9.3"
Expand All @@ -60,12 +58,12 @@ mpl-candy-machine-core = "2.0.1"
mpl-token-metadata = "=2.0.0-beta.1"
nft_ingester = { path = "nft_ingester" }
num-derive = "0.3.3"
num-integer = { version = "0.1.44", default_features = false }
num-traits = "0.2.15"
open-rpc-derive = "0.0.4"
open-rpc-schema = "0.0.4"
plerkle_messenger = "1.6.0"
plerkle_serialization = "1.6.0"
program_transformers = { path = "program_transformers" }
prometheus = "0.13.3"
proxy-wasm = "0.2.0"
rand = "0.8.5"
Expand All @@ -82,10 +80,8 @@ serde = "1.0.137"
serde_json = "1.0.81"
solana-account-decoder = "~1.16.16"
solana-client = "~1.16.16"
solana-geyser-plugin-interface = "~1.16.16"
solana-program = "~1.16.16"
solana-sdk = "~1.16.16"
solana-sdk-macro = "~1.16.16"
solana-transaction-status = "~1.16.16"
spl-account-compression = "0.2.0"
spl-associated-token-account = ">= 1.1.3, < 3.0"
Expand All @@ -104,7 +100,6 @@ tracing = "0.1.35"
tracing-subscriber = "0.3.16"
txn_forwarder = { path = "tools/txn_forwarder" }
url = "2.3.1"
uuid = "1.0.0"
wasi = "0.7.0"
wasm-bindgen = "0.2.83"

Expand Down
18 changes: 1 addition & 17 deletions nft_ingester/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ repository = { workspace = true }
publish = { workspace = true }

[dependencies]
anchor-lang = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
blockbuster = { workspace = true }
borsh = { workspace = true }
bs58 = { workspace = true }
Expand All @@ -17,46 +15,32 @@ cadence-macros = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true, features = ["derive", "cargo"] }
digital_asset_types = { workspace = true, features = ["json_types", "sql_types"] }
env_logger = { workspace = true }
figment = { workspace = true, features = ["env", "toml", "yaml"] }
flatbuffers = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
hex = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
mpl-bubblegum = { workspace = true }
num-integer = { workspace = true }
num-traits = { workspace = true }
plerkle_messenger = { workspace = true, features = ["redis"] }
plerkle_serialization = { workspace = true }
program_transformers = { workspace = true }
rand = { workspace = true }
redis = { workspace = true, features = ["aio", "tokio-comp", "streams", "tokio-native-tls-comp"] }
regex = { workspace = true }
reqwest = { workspace = true }
rust-crypto = { workspace = true }
sea-orm = { workspace = true, features = ["macros", "runtime-tokio-rustls", "sqlx-postgres", "with-chrono", "mock"] }
sea-query = { workspace = true, features = ["postgres-array"] }
serde = { workspace = true }
serde_json = { workspace = true }
solana-account-decoder = { workspace = true }
solana-client = { workspace = true }
solana-geyser-plugin-interface = { workspace = true }
solana-sdk = { workspace = true }
solana-sdk-macro = { workspace = true }
solana-transaction-status = { workspace = true }
spl-account-compression = { workspace = true, features = ["no-entrypoint"] }
spl-concurrent-merkle-tree = { workspace = true }
spl-token = { workspace = true, features = ["no-entrypoint"] }
sqlx = { workspace = true, features = ["macros", "runtime-tokio-rustls", "postgres", "uuid", "offline", "json"] }
stretto = { workspace = true, features = ["async"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["tracing"] }
tokio-postgres = { workspace = true }
tokio-stream = { workspace = true }
tracing-subscriber = { workspace = true, features = ["json", "env-filter", "ansi"] }
url = { workspace = true }
uuid = { workspace = true }

[lints]
workspace = true
40 changes: 24 additions & 16 deletions nft_ingester/src/account_updates.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
use std::sync::Arc;

use crate::{
metric, metrics::capture_result, program_transformers::ProgramTransformer, tasks::TaskData,
};
use cadence_macros::{is_global_default_set, statsd_count, statsd_time};
use chrono::Utc;
use log::{debug, error};
use plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData};
use plerkle_serialization::root_as_account_info;
use sqlx::{Pool, Postgres};
use tokio::{
sync::mpsc::UnboundedSender,
task::{JoinHandle, JoinSet},
time::Instant,
use {
crate::{
metric,
metrics::capture_result,
tasks::{create_download_metadata_notifier, TaskData},
},
cadence_macros::{is_global_default_set, statsd_count, statsd_time},
chrono::Utc,
log::{debug, error},
plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData},
plerkle_serialization::root_as_account_info,
program_transformers::ProgramTransformer,
sqlx::{Pool, Postgres},
std::sync::Arc,
tokio::{
sync::mpsc::UnboundedSender,
task::{JoinHandle, JoinSet},
time::Instant,
},
};

pub fn account_worker<T: Messenger>(
Expand All @@ -26,7 +30,11 @@ pub fn account_worker<T: Messenger>(
tokio::spawn(async move {
let source = T::new(config).await;
if let Ok(mut msg) = source {
let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender, false));
let manager = Arc::new(ProgramTransformer::new(
pool,
create_download_metadata_notifier(bg_task_sender),
false,
));
loop {
let e = msg.recv(stream_key, consumption_type.clone()).await;
let mut tasks = JoinSet::new();
Expand Down
50 changes: 6 additions & 44 deletions nft_ingester/src/error/mod.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,15 @@
use crate::tasks::TaskData;
use blockbuster::error::BlockbusterError;
use plerkle_messenger::MessengerError;
use plerkle_serialization::error::PlerkleSerializationError;
use sea_orm::{DbErr, TransactionError};
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
use {
crate::tasks::TaskData, plerkle_messenger::MessengerError,
plerkle_serialization::error::PlerkleSerializationError, sea_orm::DbErr,
tokio::sync::mpsc::error::SendError,
};

#[derive(Error, Debug, PartialEq, Eq)]
#[derive(Debug, thiserror::Error)]
pub enum IngesterError {
#[error("ChangeLog Event Malformed")]
ChangeLogEventMalformed,
#[error("Compressed Asset Event Malformed")]
CompressedAssetEventMalformed,
#[error("Network Error: {0}")]
BatchInitNetworkingError(String),
#[error("Error writing batch files")]
BatchInitIOError,
#[error("Storage listener error: ({msg})")]
StorageListenerError { msg: String },
#[error("Storage Write Error: {0}")]
StorageWriteError(String),
#[error("NotImplemented")]
NotImplemented,
#[error("Deserialization Error: {0}")]
DeserializationError(String),
#[error("Task Manager Error: {0}")]
Expand All @@ -36,12 +24,6 @@ pub enum IngesterError {
SerializatonError(String),
#[error("Messenger error; {0}")]
MessengerError(String),
#[error("Blockbuster Parsing error: {0}")]
ParsingError(String),
#[error("Database Error: {0}")]
DatabaseError(String),
#[error("Unknown Task Type: {0}")]
UnknownTaskType(String),
#[error("BG Task Manager Not Started")]
TaskManagerNotStarted,
#[error("Unrecoverable task error: {0}")]
Expand All @@ -50,8 +32,6 @@ pub enum IngesterError {
CacheStorageWriteError(String),
#[error("HttpError {status_code}")]
HttpError { status_code: String },
#[error("AssetIndex Error {0}")]
AssetIndexError(String),
}

impl From<reqwest::Error> for IngesterError {
Expand All @@ -72,30 +52,12 @@ impl From<serde_json::Error> for IngesterError {
}
}

impl From<BlockbusterError> for IngesterError {
fn from(err: BlockbusterError) -> Self {
IngesterError::ParsingError(err.to_string())
}
}

impl From<std::io::Error> for IngesterError {
fn from(_err: std::io::Error) -> Self {
IngesterError::BatchInitIOError
}
}

impl From<DbErr> for IngesterError {
fn from(e: DbErr) -> Self {
IngesterError::StorageWriteError(e.to_string())
}
}

impl From<TransactionError<IngesterError>> for IngesterError {
fn from(e: TransactionError<IngesterError>) -> Self {
IngesterError::StorageWriteError(e.to_string())
}
}

impl From<SendError<TaskData>> for IngesterError {
fn from(err: SendError<TaskData>) -> Self {
IngesterError::TaskManagerError(format!("Could not create task: {:?}", err.to_string()))
Expand Down
1 change: 0 additions & 1 deletion nft_ingester/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ pub mod config;
pub mod database;
pub mod error;
pub mod metrics;
pub mod program_transformers;
pub mod stream;
pub mod tasks;
pub mod transaction_notifications;
1 change: 0 additions & 1 deletion nft_ingester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ pub mod config;
mod database;
pub mod error;
pub mod metrics;
mod program_transformers;
mod stream;
pub mod tasks;
mod transaction_notifications;
Expand Down
Loading

0 comments on commit 52d1773

Please sign in to comment.