Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cypher] include cumulative tx amount in Lifetime relation #14

Merged
merged 18 commits into from
Jan 28, 2025
Prev Previous commit
add framework version to txs
Franci MacLento committed Jan 28, 2025
commit 28f4d8bb853e59a474357e38bebed013254dee4c
3 changes: 2 additions & 1 deletion src/cypher_templates.rs
Original file line number Diff line number Diff line change
@@ -21,7 +21,8 @@ SET
rel.block_datetime = tx.block_datetime,
rel.block_timestamp = tx.block_timestamp,
rel.relation = tx.relation,
rel.function = tx.function
rel.function = tx.function,
rel.framework_version = tx.framework_version

// Conditionally add `tx.args` if it exists
FOREACH (_ IN CASE WHEN tx.args IS NOT NULL THEN [1] ELSE [] END |
14 changes: 12 additions & 2 deletions src/extract_transactions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::decode_entry_function::decode_entry_function_all_versions;
use crate::scan::FrameworkVersion;
use crate::schema_transaction::{RelationLabel, UserEventTypes, WarehouseEvent, WarehouseTxMaster};
use anyhow::Result;
use chrono::DateTime;
@@ -14,6 +15,7 @@ use std::path::Path;

pub async fn extract_current_transactions(
archive_path: &Path,
framework_version: &FrameworkVersion,
) -> Result<(Vec<WarehouseTxMaster>, Vec<WarehouseEvent>)> {
let manifest_file = archive_path.join("transaction.manifest");
assert!(
@@ -75,8 +77,14 @@ pub async fn extract_current_transactions(
events.append(&mut decoded_events);

if let Some(signed_transaction) = tx.try_as_signed_user_txn() {
let tx =
make_master_tx(signed_transaction, epoch, round, timestamp, decoded_events)?;
let tx = make_master_tx(
signed_transaction,
epoch,
round,
timestamp,
decoded_events,
framework_version,
)?;

// sanity check that we are talking about the same block, and reading vectors sequentially.
if tx.tx_hash != tx_hash_info {
@@ -105,6 +113,7 @@ pub fn make_master_tx(
round: u64,
block_timestamp: u64,
events: Vec<WarehouseEvent>,
framework_version: &FrameworkVersion,
) -> Result<WarehouseTxMaster> {
let tx_hash = user_tx.clone().committed_hash();
let raw = user_tx.raw_transaction_ref();
@@ -136,6 +145,7 @@ pub fn make_master_tx(
relation_label,
block_datetime: DateTime::from_timestamp_micros(block_timestamp as i64).unwrap(),
events,
framework_version: framework_version.clone(),
};

Ok(tx)
15 changes: 11 additions & 4 deletions src/json_rescue_v5_extract.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::{
schema_transaction::LEGACY_REBASE_MULTIPLIER,
schema_transaction::{EntryFunctionArgs, RelationLabel, WarehouseEvent, WarehouseTxMaster},
scan::FrameworkVersion,
schema_transaction::{
EntryFunctionArgs, RelationLabel, WarehouseEvent, WarehouseTxMaster,
LEGACY_REBASE_MULTIPLIER,
},
unzip_temp::decompress_tar_archive,
};
use chrono::DateTime;
@@ -45,12 +48,16 @@ pub fn decode_transaction_dataview_v5(
let mut unique_functions = vec![];

for t in txs {
let mut wtxs = WarehouseTxMaster::default();
let mut wtxs = WarehouseTxMaster {
framework_version: FrameworkVersion::V5,
..Default::default()
};

let timestamp = t.timestamp_usecs.unwrap_or(0);
if let TransactionDataView::UserTransaction { sender, script, .. } = &t.transaction {
wtxs.sender = cast_legacy_account(sender)?;

// must cast from V5 Hashvalue buffer layout
// must cast from V5 HashValue buffer layout
wtxs.tx_hash = HashValue::from_slice(t.hash.to_vec())?;

wtxs.function = make_function_name(script);
2 changes: 1 addition & 1 deletion src/load.rs
Original file line number Diff line number Diff line change
@@ -93,7 +93,7 @@ pub async fn try_load_one_archive(
snapshot_batch(&snaps, pool, batch_size, &man.archive_id).await?;
}
crate::scan::BundleContent::Transaction => {
let (txs, _) = extract_current_transactions(&archive_path).await?;
let (txs, _) = extract_current_transactions(&archive_path, &man.version).await?;
let batch_res =
load_tx_cypher::tx_batch(&txs, pool, batch_size, &man.archive_id).await?;
all_results.increment(&batch_res);
3 changes: 2 additions & 1 deletion src/scan.rs
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ use libra_backwards_compatibility::version_five::{
transaction_manifest_v5::v5_read_from_transaction_manifest,
};
use libra_storage::read_snapshot::load_snapshot_manifest;
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
fmt,
@@ -59,7 +60,7 @@ impl ManifestInfo {
FrameworkVersion::Unknown
}
}
#[derive(Clone, Debug, Default, PartialEq)]
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub enum FrameworkVersion {
#[default]
Unknown,
7 changes: 5 additions & 2 deletions src/schema_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::cypher_templates::to_cypher_object;
use crate::{cypher_templates::to_cypher_object, scan::FrameworkVersion};

use chrono::{DateTime, Utc};
use diem_crypto::HashValue;
@@ -115,6 +115,7 @@ pub struct WarehouseTxMaster {
pub expiration_timestamp: u64,
pub entry_function: Option<EntryFunctionArgs>,
pub events: Vec<WarehouseEvent>,
pub framework_version: FrameworkVersion,
// TODO framework version
}

@@ -132,6 +133,7 @@ impl Default for WarehouseTxMaster {
expiration_timestamp: 0,
entry_function: None,
events: vec![],
framework_version: FrameworkVersion::Unknown,
}
}
}
@@ -155,7 +157,7 @@ impl WarehouseTxMaster {
None => "".to_string(),
};
format!(
r#"{{ args: {maybe_args_here},{maybe_coins_here}tx_hash: "{}", block_datetime: datetime("{}"), block_timestamp: {}, relation: "{}", function: "{}", sender: "{}", recipient: "{}"}}"#,
r#"{{ args: {maybe_args_here},{maybe_coins_here}tx_hash: "{}", block_datetime: datetime("{}"), block_timestamp: {}, relation: "{}", function: "{}", sender: "{}", recipient: "{}", framework_version: "{}"}}"#,
self.tx_hash.to_hex_literal(),
self.block_datetime.to_rfc3339(),
self.block_timestamp,
@@ -167,6 +169,7 @@ impl WarehouseTxMaster {
.get_recipient()
.unwrap_or(self.sender)
.to_hex_literal(),
self.framework_version,
maybe_args_here = tx_args,
maybe_coins_here = coins_literal
)
8 changes: 5 additions & 3 deletions tests/test_extract_transactions.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
mod support;

use libra_forensic_db::extract_transactions::extract_current_transactions;
use libra_forensic_db::{
extract_transactions::extract_current_transactions, scan::FrameworkVersion,
};

#[tokio::test]
async fn test_extract_tx_from_archive() -> anyhow::Result<()> {
let archive_path = support::fixtures::v7_tx_manifest_fixtures_path();
let list = extract_current_transactions(&archive_path).await?;
let list = extract_current_transactions(&archive_path, &FrameworkVersion::V6).await?;

assert!(list.0.len() == 6);

@@ -15,7 +17,7 @@ async fn test_extract_tx_from_archive() -> anyhow::Result<()> {
#[tokio::test]
async fn test_extract_v6_tx_from_archive() -> anyhow::Result<()> {
let archive_path = support::fixtures::v6_tx_manifest_fixtures_path();
let list = extract_current_transactions(&archive_path).await?;
let list = extract_current_transactions(&archive_path, &FrameworkVersion::V6).await?;

assert!(list.0.len() == 27);
assert!(list.1.len() == 52);
4 changes: 2 additions & 2 deletions tests/test_load_tx.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ use libra_forensic_db::{
load::try_load_one_archive,
load_tx_cypher::tx_batch,
neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
scan::scan_dir_archive,
scan::{scan_dir_archive, FrameworkVersion},
schema_transaction::WarehouseTxMaster,
};
use neo4rs::query;
@@ -18,7 +18,7 @@ use support::neo4j_testcontainer::start_neo4j_container;
async fn test_tx_batch() -> anyhow::Result<()> {
libra_forensic_db::log_setup();
let archive_path = support::fixtures::v6_tx_manifest_fixtures_path();
let (txs, _events) = extract_current_transactions(&archive_path).await?;
let (txs, _events) = extract_current_transactions(&archive_path, &FrameworkVersion::V6).await?;
assert!(txs.len() == 27);

let c = start_neo4j_container();