Skip to content

Commit

Permalink
program_transformers: remove plerkle
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Feb 21, 2024
1 parent 9e3f11a commit 97aee95
Show file tree
Hide file tree
Showing 16 changed files with 259 additions and 115 deletions.
33 changes: 16 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ clone_on_ref_ptr = "deny"
missing_const_for_fn = "deny"
trivially_copy_pass_by_ref = "deny"

[patch.crates-io]
blockbuster = { git = "https://github.com/rpcpool/blockbuster.git", tag = "blockbuster-v1.0.1-no-plerkle" }

[profile.release]
codegen-units = 1
lto = true
34 changes: 30 additions & 4 deletions integration_tests/tests/integration_tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ use migration::sea_orm::{
use migration::{Migrator, MigratorTrait};
use mpl_token_metadata::accounts::Metadata;

use nft_ingester::config;
use nft_ingester::{
config,
plerkle::{
parse_account_keys, parse_message_instructions, parse_meta_inner_instructions,
parse_pubkey, parse_signature, parse_slice,
},
};
use once_cell::sync::Lazy;
use plerkle_serialization::root_as_account_info;
use plerkle_serialization::root_as_transaction_info;
use plerkle_serialization::serializer::serialize_account;
use plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaAccountInfoV2;
use program_transformers::ProgramTransformer;
use program_transformers::{AccountInfo, ProgramTransformer, TransactionInfo};

use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::pubkey::Pubkey;
Expand Down Expand Up @@ -356,7 +362,12 @@ pub async fn index_account_bytes(setup: &TestSetup, account_bytes: Vec<u8>) {

setup
.transformer
.handle_account_update(account)
.handle_account_update(&AccountInfo {
slot: account.slot(),
pubkey: &parse_pubkey(account.pubkey()).expect("failed to parse account"),
owner: &parse_pubkey(account.owner()).expect("failed to parse account"),
data: parse_slice(account.data()).expect("failed to parse account"),
})
.await
.unwrap();
}
Expand Down Expand Up @@ -419,7 +430,22 @@ async fn cached_fetch_transaction(setup: &TestSetup, sig: Signature) -> Vec<u8>
pub async fn index_transaction(setup: &TestSetup, sig: Signature) {
let txn_bytes: Vec<u8> = cached_fetch_transaction(setup, sig).await;
let txn = root_as_transaction_info(&txn_bytes).unwrap();
setup.transformer.handle_transaction(&txn).await.unwrap();
setup
.transformer
.handle_transaction(&TransactionInfo {
slot: txn.slot(),
signature: &parse_signature(txn.signature()).expect("failed to parse transaction"),
account_keys: &parse_account_keys(txn.account_keys())
.expect("failed to parse transaction"),
message_instructions: &parse_message_instructions(txn.outer_instructions())
.expect("failed to parse transaction"),
meta_inner_instructions: &parse_meta_inner_instructions(
txn.compiled_inner_instructions(),
)
.expect("failed to parse transaction"),
})
.await
.unwrap();
}

async fn cached_fetch_largest_token_account_id(client: &RpcClient, mint: Pubkey) -> Pubkey {
Expand Down
19 changes: 17 additions & 2 deletions nft_ingester/src/account_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use {
crate::{
metric,
metrics::capture_result,
plerkle::{parse_pubkey, parse_slice},
tasks::{create_download_metadata_notifier, TaskData},
},
cadence_macros::{is_global_default_set, statsd_count, statsd_time},
chrono::Utc,
log::{debug, error},
plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData},
plerkle_serialization::root_as_account_info,
program_transformers::ProgramTransformer,
program_transformers::{error::ProgramTransformerResult, AccountInfo, ProgramTransformer},
sqlx::{Pool, Postgres},
std::sync::Arc,
tokio::{
Expand Down Expand Up @@ -103,7 +104,7 @@ async fn handle_account(
account = Some(bs58::encode(pubkey.0.as_slice()).into_string());
}
let begin_processing = Instant::now();
let res = manager.handle_account_update(account_update).await;
let res = handle_account_update(manager, account_update).await;
let should_ack = capture_result(
id.clone(),
stream_key,
Expand All @@ -120,3 +121,17 @@ async fn handle_account(
}
ret_id
}

async fn handle_account_update<'a>(
manager: Arc<ProgramTransformer>,
account_update: plerkle_serialization::AccountInfo<'_>,
) -> ProgramTransformerResult<()> {
manager
.handle_account_update(&AccountInfo {
slot: account_update.slot(),
pubkey: &parse_pubkey(account_update.pubkey())?,
owner: &parse_pubkey(account_update.owner())?,
data: parse_slice(account_update.data())?,
})
.await
}
2 changes: 1 addition & 1 deletion nft_ingester/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
pub mod account_updates;
pub mod ack;
pub mod backfiller;
pub mod config;
pub mod database;
pub mod error;
pub mod metrics;
pub mod plerkle;
pub mod stream;
pub mod tasks;
pub mod transaction_notifications;
1 change: 1 addition & 0 deletions nft_ingester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod config;
mod database;
pub mod error;
pub mod metrics;
mod plerkle;
mod stream;
pub mod tasks;
mod transaction_notifications;
Expand Down
80 changes: 80 additions & 0 deletions nft_ingester/src/plerkle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use {
flatbuffers::{ForwardsUOffset, Vector},
plerkle_serialization::{
CompiledInnerInstructions as FBCompiledInnerInstructions,
CompiledInstruction as FBCompiledInstruction, Pubkey as FBPubkey,
},
program_transformers::error::{ProgramTransformerError, ProgramTransformerResult},
solana_sdk::{instruction::CompiledInstruction, pubkey::Pubkey, signature::Signature},
solana_transaction_status::{InnerInstruction, InnerInstructions},
};

fn deser_err() -> ProgramTransformerError {
ProgramTransformerError::DeserializationError("Could not deserialize data".to_owned())
}

pub fn parse_pubkey(pubkey: Option<&FBPubkey>) -> ProgramTransformerResult<Pubkey> {
Ok(Pubkey::try_from(pubkey.ok_or_else(deser_err)?.0.as_slice())
.expect("valid key from FlatBuffer"))
}

pub fn parse_slice(data: Option<Vector<'_, u8>>) -> ProgramTransformerResult<&[u8]> {
data.map(|data| data.bytes()).ok_or_else(deser_err)
}

pub fn parse_signature(data: Option<&str>) -> ProgramTransformerResult<Signature> {
data.ok_or_else(deser_err)?
.parse()
.map_err(|_error| deser_err())
}

pub fn parse_account_keys(
keys: Option<Vector<'_, FBPubkey>>,
) -> ProgramTransformerResult<Vec<Pubkey>> {
let mut account_keys = vec![];
for key in keys.ok_or_else(deser_err)? {
account_keys.push(Pubkey::try_from(key.0.as_slice()).expect("valid key from FlatBuffer"));
}
Ok(account_keys)
}

pub fn parse_message_instructions(
vec_cix: Option<Vector<'_, ForwardsUOffset<FBCompiledInstruction>>>,
) -> ProgramTransformerResult<Vec<CompiledInstruction>> {
let mut message_instructions = vec![];
for cix in vec_cix.ok_or_else(deser_err)? {
message_instructions.push(CompiledInstruction {
program_id_index: cix.program_id_index(),
accounts: cix.accounts().ok_or_else(deser_err)?.bytes().to_vec(),
data: cix.data().ok_or_else(deser_err)?.bytes().to_vec(),
})
}
Ok(message_instructions)
}

pub fn parse_meta_inner_instructions(
vec_ixs: Option<Vector<'_, ForwardsUOffset<FBCompiledInnerInstructions>>>,
) -> ProgramTransformerResult<Vec<InnerInstructions>> {
let mut meta_inner_instructions = vec![];
for ixs in vec_ixs.ok_or_else(deser_err)? {
let mut instructions = vec![];
for ix in ixs.instructions().ok_or_else(deser_err)? {
let cix = ix.compiled_instruction().ok_or_else(deser_err)?;
instructions.push(InnerInstruction {
instruction: CompiledInstruction {
program_id_index: cix.program_id_index(),
accounts: cix.accounts().ok_or_else(deser_err)?.bytes().to_vec(),
data: cix.data().ok_or_else(deser_err)?.bytes().to_vec(),
},
stack_height: Some(ix.stack_height() as u32),
});
}
meta_inner_instructions.push(InnerInstructions {
index: ixs.index(),
instructions,
})
}
Ok(meta_inner_instructions)

// TODO, same as blockbuster
}
25 changes: 23 additions & 2 deletions nft_ingester/src/transaction_notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ use {
crate::{
metric,
metrics::capture_result,
plerkle::{
parse_account_keys, parse_message_instructions, parse_meta_inner_instructions,
parse_signature,
},
tasks::{create_download_metadata_notifier, TaskData},
},
cadence_macros::{is_global_default_set, statsd_count, statsd_time},
chrono::Utc,
log::{debug, error},
plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData},
plerkle_serialization::root_as_transaction_info,
program_transformers::ProgramTransformer,
program_transformers::{error::ProgramTransformerResult, ProgramTransformer, TransactionInfo},
sqlx::{Pool, Postgres},
std::sync::Arc,
tokio::{
Expand Down Expand Up @@ -101,7 +105,7 @@ async fn handle_transaction(
}

let begin = Instant::now();
let res = manager.handle_transaction(&tx).await;
let res = handle_transaction_update(manager, tx).await;
let should_ack = capture_result(
id.clone(),
stream_key,
Expand All @@ -118,3 +122,20 @@ async fn handle_transaction(
}
ret_id
}

async fn handle_transaction_update<'a>(
manager: Arc<ProgramTransformer>,
tx: plerkle_serialization::TransactionInfo<'_>,
) -> ProgramTransformerResult<()> {
manager
.handle_transaction(&TransactionInfo {
slot: tx.slot(),
signature: &parse_signature(tx.signature())?,
account_keys: &parse_account_keys(tx.account_keys())?,
message_instructions: &parse_message_instructions(tx.outer_instructions())?,
meta_inner_instructions: &parse_meta_inner_instructions(
tx.compiled_inner_instructions(),
)?,
})
.await
}
2 changes: 1 addition & 1 deletion program_transformers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ digital_asset_types = { workspace = true, features = ["json_types", "sql_types"]
futures = { workspace = true }
mpl-bubblegum = { workspace = true }
num-traits = { workspace = true }
plerkle_serialization = { workspace = true }
sea-orm = { workspace = true }
serde_json = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
spl-account-compression = { workspace = true, features = ["no-entrypoint"] }
spl-token = { workspace = true, features = ["no-entrypoint"] }
sqlx = { workspace = true }
Expand Down
Loading

0 comments on commit 97aee95

Please sign in to comment.