Skip to content

Commit

Permalink
remove plerkle from program_transformers
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Nov 24, 2023
1 parent 02acd4b commit 7f51b96
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 67 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ txn_forwarder = { path = "tools/txn_forwarder" }
url = "2.3.1"

[patch.crates-io]
blockbuster = { path = "/home/kirill/projects/blockbuster/blockbuster" }
blockbuster = { git = "https://github.com/rpcpool/blockbuster.git", tag = "blockbuster-rm-plerkle-v0.9.0-beta.1" }

[workspace.lints.clippy]
clone_on_ref_ptr = "deny"
Expand Down
4 changes: 2 additions & 2 deletions nft_ingester/src/account_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
metric,
metrics::capture_result,
plerkle::{parse_pubkey, parse_vector},
plerkle::{parse_pubkey, parse_slice},
tasks::{create_download_metadata_notifier, TaskData},
},
cadence_macros::{is_global_default_set, statsd_count, statsd_time},
Expand Down Expand Up @@ -131,7 +131,7 @@ async fn handle_account_update<'a>(
slot: account_update.slot(),
pubkey: &parse_pubkey(account_update.pubkey())?,
owner: &parse_pubkey(account_update.owner())?,
data: parse_vector(account_update.data())?,
data: parse_slice(account_update.data())?,
})
.await
}
88 changes: 70 additions & 18 deletions nft_ingester/src/plerkle.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,78 @@
use {
flatbuffers::Vector,
plerkle_serialization::Pubkey as FBPubkey,
flatbuffers::{ForwardsUOffset, Vector},
plerkle_serialization::{
CompiledInnerInstructions as FBCompiledInnerInstructions,
CompiledInstruction as FBCompiledInstruction, Pubkey as FBPubkey,
},
program_transformers::error::{ProgramTransformerError, ProgramTransformerResult},
solana_sdk::pubkey::Pubkey,
solana_sdk::{instruction::CompiledInstruction, pubkey::Pubkey, signature::Signature},
solana_transaction_status::{InnerInstruction, InnerInstructions},
};

fn deser_err() -> ProgramTransformerError {
ProgramTransformerError::DeserializationError("Could not deserialize data".to_owned())
}

pub fn parse_pubkey(pubkey: Option<&FBPubkey>) -> ProgramTransformerResult<Pubkey> {
Ok(Pubkey::try_from(
pubkey
.ok_or_else(|| {
ProgramTransformerError::DeserializationError(
"Could not deserialize data".to_owned(),
)
})?
.0
.as_slice(),
)
.expect("valid key from FlatBuffer"))
Ok(Pubkey::try_from(pubkey.ok_or_else(deser_err)?.0.as_slice())
.expect("valid key from FlatBuffer"))
}

pub fn parse_slice(data: Option<Vector<'_, u8>>) -> ProgramTransformerResult<&[u8]> {
data.map(|data| data.bytes()).ok_or_else(deser_err)
}

pub fn parse_signature(data: Option<&str>) -> ProgramTransformerResult<Signature> {
data.ok_or_else(deser_err)?
.parse()
.map_err(|_error| deser_err())
}

pub fn parse_account_keys(
keys: Option<Vector<'_, FBPubkey>>,
) -> ProgramTransformerResult<Vec<Pubkey>> {
let mut account_keys = vec![];
for key in keys.ok_or_else(deser_err)? {
account_keys.push(Pubkey::try_from(key.0.as_slice()).expect("valid key from FlatBuffer"));
}
Ok(account_keys)
}

pub fn parse_message_instructions(
vec_cix: Option<Vector<'_, ForwardsUOffset<FBCompiledInstruction>>>,
) -> ProgramTransformerResult<Vec<CompiledInstruction>> {
let mut message_instructions = vec![];
for cix in vec_cix.ok_or_else(deser_err)? {
message_instructions.push(CompiledInstruction {
program_id_index: cix.program_id_index(),
accounts: cix.accounts().ok_or_else(deser_err)?.bytes().to_vec(),
data: cix.data().ok_or_else(deser_err)?.bytes().to_vec(),
})
}
Ok(message_instructions)
}

pub fn parse_vector(data: Option<Vector<'_, u8>>) -> ProgramTransformerResult<&[u8]> {
data.map(|data| data.bytes()).ok_or_else(|| {
ProgramTransformerError::DeserializationError("Could not deserialize data".to_owned())
})
pub fn parse_meta_inner_instructions(
vec_ixs: Option<Vector<'_, ForwardsUOffset<FBCompiledInnerInstructions>>>,
) -> ProgramTransformerResult<Vec<InnerInstructions>> {
let mut meta_inner_instructions = vec![];
for ixs in vec_ixs.ok_or_else(deser_err)? {
let mut instructions = vec![];
for ix in ixs.instructions().ok_or_else(deser_err)? {
let cix = ix.compiled_instruction().ok_or_else(deser_err)?;
instructions.push(InnerInstruction {
instruction: CompiledInstruction {
program_id_index: cix.program_id_index(),
accounts: cix.accounts().ok_or_else(deser_err)?.bytes().to_vec(),
data: cix.data().ok_or_else(deser_err)?.bytes().to_vec(),
},
stack_height: Some(ix.stack_height() as u32),
});
}
meta_inner_instructions.push(InnerInstructions {
index: ixs.index(),
instructions,
})
}
Ok(meta_inner_instructions)
}
25 changes: 23 additions & 2 deletions nft_ingester/src/transaction_notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ use {
crate::{
metric,
metrics::capture_result,
plerkle::{
parse_account_keys, parse_message_instructions, parse_meta_inner_instructions,
parse_signature,
},
tasks::{create_download_metadata_notifier, TaskData},
},
cadence_macros::{is_global_default_set, statsd_count, statsd_time},
chrono::Utc,
log::{debug, error},
plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData},
plerkle_serialization::root_as_transaction_info,
program_transformers::ProgramTransformer,
program_transformers::{error::ProgramTransformerResult, ProgramTransformer, TransactionInfo},
sqlx::{Pool, Postgres},
std::sync::Arc,
tokio::{
Expand Down Expand Up @@ -101,7 +105,7 @@ async fn handle_transaction(
}

let begin = Instant::now();
let res = manager.handle_transaction(&tx).await;
let res = handle_transaction_update(manager, tx).await;
let should_ack = capture_result(
id.clone(),
stream_key,
Expand All @@ -118,3 +122,20 @@ async fn handle_transaction(
}
ret_id
}

async fn handle_transaction_update<'a>(
manager: Arc<ProgramTransformer>,
tx: plerkle_serialization::TransactionInfo<'_>,
) -> ProgramTransformerResult<()> {
manager
.handle_transaction(&TransactionInfo {
slot: tx.slot(),
signature: &parse_signature(tx.signature())?,
account_keys: parse_account_keys(tx.account_keys())?,
message_instructions: &parse_message_instructions(tx.outer_instructions())?,
meta_inner_instructions: &parse_meta_inner_instructions(
tx.compiled_inner_instructions(),
)?,
})
.await
}
2 changes: 1 addition & 1 deletion program_transformers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ digital_asset_types = { workspace = true }
futures = { workspace = true }
mpl-bubblegum = { workspace = true }
num-traits = { workspace = true }
plerkle_serialization = { workspace = true }
sea-orm = { workspace = true, features = [] }
serde_json = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
spl-account-compression = { workspace = true, features = ["no-entrypoint"] }
spl-token = { workspace = true, features = ["no-entrypoint"] }
sqlx = { workspace = true, features = [] }
Expand Down
16 changes: 4 additions & 12 deletions program_transformers/src/bubblegum/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,10 @@ pub async fn decompress<'c, T>(
where
T: ConnectionTrait + TransactionTrait,
{
let id_bytes = bundle.keys.get(3).unwrap().0.as_slice();
let id_bytes = bundle.keys.get(3).unwrap().to_bytes().to_vec();

// Partial update of asset table with just leaf.
upsert_asset_with_leaf_info_for_decompression(txn, id_bytes.to_vec()).await?;
upsert_asset_with_compression_info(
txn,
id_bytes.to_vec(),
false,
false,
1,
Some(id_bytes.to_vec()),
true,
)
.await
upsert_asset_with_leaf_info_for_decompression(txn, id_bytes.clone()).await?;
upsert_asset_with_compression_info(txn, id_bytes.clone(), false, false, 1, Some(id_bytes), true)
.await
}
4 changes: 2 additions & 2 deletions program_transformers/src/bubblegum/mint_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ where
} else {
Some(delegate.to_bytes().to_vec())
};
let tree_id = bundle.keys.get(3).unwrap().0.to_vec();
let tree_id = bundle.keys.get(3).unwrap().to_bytes().to_vec();

// ActiveValue::Set initial mint info.
let asset_model = asset::ActiveModel {
Expand Down Expand Up @@ -311,7 +311,7 @@ where
// Insert into `asset_authority` table.
let model = asset_authority::ActiveModel {
asset_id: ActiveValue::Set(id_bytes.to_vec()),
authority: ActiveValue::Set(bundle.keys.get(0).unwrap().0.to_vec()), //TODO - we need to rem,ove the optional bubblegum signer logic
authority: ActiveValue::Set(bundle.keys.get(0).unwrap().to_bytes().to_vec()), //TODO - we need to rem,ove the optional bubblegum signer logic
seq: ActiveValue::Set(seq as i64),
slot_updated: ActiveValue::Set(slot_i),
..Default::default()
Expand Down
60 changes: 32 additions & 28 deletions program_transformers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ use {
},
},
futures::future::BoxFuture,
plerkle_serialization::{Pubkey as FBPubkey, TransactionInfo},
sea_orm::{DatabaseConnection, SqlxPostgresConnector},
solana_sdk::pubkey::Pubkey,
solana_sdk::{instruction::CompiledInstruction, pubkey::Pubkey, signature::Signature},
solana_transaction_status::InnerInstructions,
sqlx::PgPool,
std::collections::{HashMap, HashSet, VecDeque},
tracing::{debug, error, info},
Expand All @@ -35,6 +35,15 @@ pub struct AccountInfo<'a> {
pub data: &'a [u8],
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransactionInfo<'a> {
pub slot: u64,
pub signature: &'a Signature,
pub account_keys: Vec<Pubkey>,
pub message_instructions: &'a [CompiledInstruction],
pub meta_inner_instructions: &'a [InnerInstructions],
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DownloadMetadataInfo {
asset_data_id: Vec<u8>,
Expand Down Expand Up @@ -96,46 +105,42 @@ impl ProgramTransformer {
}
}

pub fn break_transaction<'i>(
pub fn break_transaction<'a>(
&self,
tx: &'i TransactionInfo<'i>,
) -> VecDeque<(IxPair<'i>, Option<Vec<IxPair<'i>>>)> {
let ref_set: HashSet<&[u8]> = self.key_set.iter().map(|k| k.as_ref()).collect();
order_instructions(ref_set, tx)
tx_info: &'a TransactionInfo<'_>,
) -> VecDeque<(IxPair<'a>, Option<Vec<IxPair<'a>>>)> {
order_instructions(
&self.key_set,
&tx_info.account_keys,
tx_info.message_instructions,
tx_info.meta_inner_instructions,
)
}

#[allow(clippy::borrowed_box)]
pub fn match_program(&self, key: &Pubkey) -> Option<&Box<dyn ProgramParser>> {
self.parsers.get(key)
}

pub async fn handle_transaction<'a>(
pub async fn handle_transaction(
&self,
tx: &'a TransactionInfo<'a>,
tx_info: &TransactionInfo<'_>,
) -> ProgramTransformerResult<()> {
let sig: Option<&str> = tx.signature();
info!("Handling Transaction: {:?}", sig);
let instructions = self.break_transaction(tx);
let accounts = tx.account_keys().unwrap_or_default();
let slot = tx.slot();
let txn_id = tx.signature().unwrap_or("");
let mut keys: Vec<FBPubkey> = Vec::with_capacity(accounts.len());
for k in accounts.into_iter() {
keys.push(*k);
}
info!("Handling Transaction: {:?}", tx_info.signature);
let instructions = self.break_transaction(tx_info);
let mut not_impl = 0;
let ixlen = instructions.len();
debug!("Instructions: {}", ixlen);
let contains = instructions
.iter()
.filter(|(ib, _inner)| ib.0 .0.as_ref() == mpl_bubblegum::ID.as_ref());
.filter(|(ib, _inner)| ib.0 == mpl_bubblegum::ID);
debug!("Instructions bgum: {}", contains.count());
for (outer_ix, inner_ix) in instructions {
let (program, instruction) = outer_ix;
let ix_accounts = instruction.accounts().unwrap().iter().collect::<Vec<_>>();
let ix_accounts = &instruction.accounts;
let ix_account_len = ix_accounts.len();
let max = ix_accounts.iter().max().copied().unwrap_or(0) as usize;
if keys.len() < max {
if tx_info.account_keys.len() < max {
return Err(ProgramTransformerError::DeserializationError(
"Missing Accounts in Serialized Ixn/Txn".to_string(),
));
Expand All @@ -144,22 +149,21 @@ impl ProgramTransformer {
ix_accounts
.iter()
.fold(Vec::with_capacity(ix_account_len), |mut acc, a| {
if let Some(key) = keys.get(*a as usize) {
if let Some(key) = tx_info.account_keys.get(*a as usize) {
acc.push(*key);
}
acc
});
let ix = InstructionBundle {
txn_id,
txn_id: &tx_info.signature.to_string(),
program,
instruction: Some(instruction),
inner_ix,
keys: ix_accounts.as_slice(),
slot,
slot: tx_info.slot,
};

let program_key =
Pubkey::try_from(ix.program.0.as_slice()).expect("valid key from FlatBuffer");
let program_key = ix.program;
if let Some(program) = self.match_program(&program_key) {
debug!("Found a ix for program: {:?}", program.key());
let result = program.handle_instruction(&ix)?;
Expand All @@ -177,7 +181,7 @@ impl ProgramTransformer {
.map_err(|err| {
error!(
"Failed to handle bubblegum instruction for txn {:?}: {:?}",
sig, err
tx_info.signature, err
);
err
})?;
Expand Down

0 comments on commit 7f51b96

Please sign in to comment.