From e7d283bb2d7dde5e04d2c2010b78724076f75568 Mon Sep 17 00:00:00 2001 From: 0o-de-lally <1364012+0o-de-lally@users.noreply.github.com> Date: Fri, 22 Nov 2024 16:20:38 -0500 Subject: [PATCH] load snapshot --- src/extract_snapshot.rs | 2 +- src/lib.rs | 3 +- src/load_account_state.rs | 32 +++++++++++ src/load_exchange_orders.rs | 2 +- src/schema_account_state.rs | 54 +++++++++++++++++-- ...ge_orders.rs => schema_exchange_orders.rs} | 0 tests/test_extract_state.rs | 8 +++ tests/test_load_state.rs | 30 +++++++++++ .../{test_e2e_tx_neo4j.rs => test_load_tx.rs} | 16 +++--- tests/test_supporting_data.rs | 2 +- 10 files changed, 134 insertions(+), 15 deletions(-) create mode 100644 src/load_account_state.rs rename src/{exchange_orders.rs => schema_exchange_orders.rs} (100%) create mode 100644 tests/test_load_state.rs rename tests/{test_e2e_tx_neo4j.rs => test_load_tx.rs} (93%) diff --git a/src/extract_snapshot.rs b/src/extract_snapshot.rs index 33b4916..a55341a 100644 --- a/src/extract_snapshot.rs +++ b/src/extract_snapshot.rs @@ -31,7 +31,7 @@ pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result().ok() { - s.balance = Some(b.coin()) + s.balance = b.coin() } if let Some(sw) = acc.get_resource::().ok() { s.slow_wallet_locked = sw.unlocked; diff --git a/src/lib.rs b/src/lib.rs index 3af10b4..ff5ed80 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,11 @@ pub mod cypher_templates; pub mod enrich_exchange_onboarding; pub mod enrich_whitepages; -pub mod exchange_orders; +pub mod schema_exchange_orders; pub mod extract_snapshot; pub mod extract_transactions; pub mod load; +pub mod load_account_state; pub mod load_exchange_orders; pub mod load_tx_cypher; pub mod neo4j_init; diff --git a/src/load_account_state.rs b/src/load_account_state.rs new file mode 100644 index 0000000..e9423e9 --- /dev/null +++ b/src/load_account_state.rs @@ -0,0 +1,32 @@ +use log::info; +use neo4rs::Graph; +use anyhow::{Context, Result}; +use crate::schema_account_state::WarehouseAccState; + + + +pub async fn impl_batch_snapshot_insert( + pool: &Graph, + batch_snapshots: &[WarehouseAccState], +) -> Result<()> { + + let list_str = WarehouseAccState::to_cypher_map(batch_snapshots); + let cypher_string = WarehouseAccState::cypher_batch_insert_str(&list_str); + + // Execute the query + let cypher_query = neo4rs::query(&cypher_string); + let mut res = pool + .execute(cypher_query) + .await + .context("execute query error")?; + + let row = res.next().await?.context("no row returned")?; + + let merged_snapshots: u64 = row + .get("merged_snapshots") + .context("no unique_accounts field")?; + + info!("merged snapshots: {}", merged_snapshots); + + Ok(()) +} diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs index e478384..8aa6045 100644 --- a/src/load_exchange_orders.rs +++ b/src/load_exchange_orders.rs @@ -5,7 +5,7 @@ use log::{error, info, warn}; use neo4rs::{query, Graph}; use crate::{ - exchange_orders::{read_orders_from_file, ExchangeOrder}, + schema_exchange_orders::{read_orders_from_file, ExchangeOrder}, queue, }; diff --git a/src/schema_account_state.rs b/src/schema_account_state.rs index feccd4e..050ca4f 100644 --- a/src/schema_account_state.rs +++ b/src/schema_account_state.rs @@ -6,7 +6,7 @@ pub struct WarehouseAccState { pub address: AccountAddress, pub time: WarehouseTime, pub sequence_num: u64, - pub balance: Option, + pub balance: u64, pub slow_wallet_locked: u64, pub slow_wallet_transferred: u64, } @@ -17,10 +17,9 @@ impl WarehouseAccState { address, sequence_num: 0, time: WarehouseTime::default(), - balance: None, + balance: 0, slow_wallet_locked: 0, slow_wallet_transferred: 0, - } } pub fn set_time(&mut self, timestamp: u64, version: u64, epoch: u64) { @@ -36,3 +35,52 @@ pub struct WarehouseTime { pub version: u64, pub epoch: u64, } + +impl WarehouseAccState { + /// creates one transaction record in the cypher query map format + /// Note original data was in an RFC rfc3339 with Z for UTC, Cypher seems to prefer with offsets +00000 + pub fn to_cypher_object_template(&self) -> String { + format!( + r#"{{address: {}, balance: {} }}"#, + self.address, + self.balance, + // self.order_type, + // self.amount, + // self.price, + // self.created_at.to_rfc3339(), + // self.created_at.timestamp_micros(), + // self.filled_at.to_rfc3339(), + // self.filled_at.timestamp_micros() + ) + } + + /// create a cypher query string for the map object + pub fn to_cypher_map(list: &[Self]) -> String { + let mut list_literal = "".to_owned(); + for el in list { + let s = el.to_cypher_object_template(); + list_literal.push_str(&s); + list_literal.push(','); + } + list_literal.pop(); // need to drop last comma "," + format!("[{}]", list_literal) + } + + pub fn cypher_batch_insert_str(list_str: &str) -> String { + format!( + r#" + WITH {list_str} AS tx_data + UNWIND tx_data AS tx + + MATCH (addr:Account {{address: tx.address}}) + + MERGE (snap:Snapshot {{address: tx.address, balance: tx.balance }}) + MERGE (addr)-[rel:State]->(snap) + + RETURN + COUNT(snap) AS merged_snapshots + +"# + ) + } +} diff --git a/src/exchange_orders.rs b/src/schema_exchange_orders.rs similarity index 100% rename from src/exchange_orders.rs rename to src/schema_exchange_orders.rs diff --git a/tests/test_extract_state.rs b/tests/test_extract_state.rs index 730f0a5..b92e125 100644 --- a/tests/test_extract_state.rs +++ b/tests/test_extract_state.rs @@ -11,6 +11,14 @@ async fn test_extract_v5_manifest() -> Result<()> { let s = extract_v5_snapshot(&manifest_file).await?; // NOTE: the parsing drops 1 blob, which is the 0x1 account, because it would not have the DiemAccount struct on it as a user address would have. assert!(s.len() == 17338); + let first = s.first().unwrap(); + + assert!(&first.address.to_hex_literal() == "0x407d4d486fdc4e796504135e545be77"); + assert!(first.balance == 100135989588); + assert!(first.slow_wallet_locked == 140001000000); + assert!(first.slow_wallet_transferred == 15999000000); + assert!(first.sequence_num == 7); + Ok(()) } diff --git a/tests/test_load_state.rs b/tests/test_load_state.rs new file mode 100644 index 0000000..3e0d6f9 --- /dev/null +++ b/tests/test_load_state.rs @@ -0,0 +1,30 @@ + +mod support; + +use libra_forensic_db::{ + extract_snapshot::extract_v5_snapshot, load_account_state::impl_batch_snapshot_insert, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes} +}; +use support::{fixtures::v5_state_manifest_fixtures_path, neo4j_testcontainer::start_neo4j_container}; + +#[tokio::test] +async fn test_snapshot_batch() -> anyhow::Result<()> { + libra_forensic_db::log_setup(); + let manifest_file = v5_state_manifest_fixtures_path().join("state.manifest"); + assert!(manifest_file.exists()); + let s = extract_v5_snapshot(&manifest_file).await?; + + + let c = start_neo4j_container(); + let port = c.get_host_port_ipv4(7687); + let graph = get_neo4j_localhost_pool(port) + .await + .expect("could not get neo4j connection pool"); + maybe_create_indexes(&graph) + .await + .expect("could start index"); + + impl_batch_snapshot_insert(&graph, &s).await?; + + + Ok(()) +} diff --git a/tests/test_e2e_tx_neo4j.rs b/tests/test_load_tx.rs similarity index 93% rename from tests/test_e2e_tx_neo4j.rs rename to tests/test_load_tx.rs index 2f2e4fe..681540b 100644 --- a/tests/test_e2e_tx_neo4j.rs +++ b/tests/test_load_tx.rs @@ -1,14 +1,15 @@ mod support; use anyhow::Result; use diem_crypto::HashValue; -use libra_forensic_db::cypher_templates::{write_batch_tx_string, write_batch_user_create}; -use libra_forensic_db::load::try_load_one_archive; -use libra_forensic_db::load_tx_cypher::tx_batch; -use libra_forensic_db::scan::scan_dir_archive; -use libra_forensic_db::schema_transaction::WarehouseTxMaster; + use libra_forensic_db::{ + cypher_templates::{write_batch_tx_string, write_batch_user_create}, extract_transactions::extract_current_transactions, + load::try_load_one_archive, + load_tx_cypher::tx_batch, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, + scan::scan_dir_archive, + schema_transaction::WarehouseTxMaster, }; use neo4rs::query; use support::neo4j_testcontainer::start_neo4j_container; @@ -37,9 +38,8 @@ async fn test_tx_batch() -> anyhow::Result<()> { assert!(res.created_accounts == 60); assert!(res.modified_accounts == 228); assert!(res.unchanged_accounts == 0); - assert!(res.created_tx == txs.len() as u64); // 705 - // CHECK - // get the sum of all transactions in db + assert!(res.created_tx == txs.len() as u64); + let cypher_query = query( "MATCH ()-[r:Tx]->() RETURN count(r) AS total_tx_count", diff --git a/tests/test_supporting_data.rs b/tests/test_supporting_data.rs index 894ddd6..58e4c2c 100644 --- a/tests/test_supporting_data.rs +++ b/tests/test_supporting_data.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use anyhow::Result; use libra_forensic_db::{ - exchange_orders::{read_orders_from_file, ExchangeOrder}, + schema_exchange_orders::{read_orders_from_file, ExchangeOrder}, load_exchange_orders, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}, };