Skip to content

Commit

Permalink
load snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 22, 2024
1 parent 7350865 commit e7d283b
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/extract_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<Warehous
}

if let Some(b) = acc.get_resource::<BalanceResourceV5>().ok() {
s.balance = Some(b.coin())
s.balance = b.coin()
}
if let Some(sw) = acc.get_resource::<SlowWalletResourceV5>().ok() {
s.slow_wallet_locked = sw.unlocked;
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
pub mod cypher_templates;
pub mod enrich_exchange_onboarding;
pub mod enrich_whitepages;
pub mod exchange_orders;
pub mod schema_exchange_orders;
pub mod extract_snapshot;
pub mod extract_transactions;
pub mod load;
pub mod load_account_state;
pub mod load_exchange_orders;
pub mod load_tx_cypher;
pub mod neo4j_init;
Expand Down
32 changes: 32 additions & 0 deletions src/load_account_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use log::info;
use neo4rs::Graph;
use anyhow::{Context, Result};
use crate::schema_account_state::WarehouseAccState;



pub async fn impl_batch_snapshot_insert(
pool: &Graph,
batch_snapshots: &[WarehouseAccState],
) -> Result<()> {

let list_str = WarehouseAccState::to_cypher_map(batch_snapshots);
let cypher_string = WarehouseAccState::cypher_batch_insert_str(&list_str);

// Execute the query
let cypher_query = neo4rs::query(&cypher_string);
let mut res = pool
.execute(cypher_query)
.await
.context("execute query error")?;

let row = res.next().await?.context("no row returned")?;

let merged_snapshots: u64 = row
.get("merged_snapshots")
.context("no unique_accounts field")?;

info!("merged snapshots: {}", merged_snapshots);

Ok(())
}
2 changes: 1 addition & 1 deletion src/load_exchange_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use log::{error, info, warn};
use neo4rs::{query, Graph};

use crate::{
exchange_orders::{read_orders_from_file, ExchangeOrder},
schema_exchange_orders::{read_orders_from_file, ExchangeOrder},
queue,
};

Expand Down
54 changes: 51 additions & 3 deletions src/schema_account_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub struct WarehouseAccState {
pub address: AccountAddress,
pub time: WarehouseTime,
pub sequence_num: u64,
pub balance: Option<u64>,
pub balance: u64,
pub slow_wallet_locked: u64,
pub slow_wallet_transferred: u64,
}
Expand All @@ -17,10 +17,9 @@ impl WarehouseAccState {
address,
sequence_num: 0,
time: WarehouseTime::default(),
balance: None,
balance: 0,
slow_wallet_locked: 0,
slow_wallet_transferred: 0,

}
}
pub fn set_time(&mut self, timestamp: u64, version: u64, epoch: u64) {
Expand All @@ -36,3 +35,52 @@ pub struct WarehouseTime {
pub version: u64,
pub epoch: u64,
}

impl WarehouseAccState {
/// creates one transaction record in the cypher query map format
/// Note original data was in an RFC rfc3339 with Z for UTC, Cypher seems to prefer with offsets +00000
pub fn to_cypher_object_template(&self) -> String {
format!(
r#"{{address: {}, balance: {} }}"#,
self.address,
self.balance,
// self.order_type,
// self.amount,
// self.price,
// self.created_at.to_rfc3339(),
// self.created_at.timestamp_micros(),
// self.filled_at.to_rfc3339(),
// self.filled_at.timestamp_micros()
)
}

/// create a cypher query string for the map object
pub fn to_cypher_map(list: &[Self]) -> String {
let mut list_literal = "".to_owned();
for el in list {
let s = el.to_cypher_object_template();
list_literal.push_str(&s);
list_literal.push(',');
}
list_literal.pop(); // need to drop last comma ","
format!("[{}]", list_literal)
}

pub fn cypher_batch_insert_str(list_str: &str) -> String {
format!(
r#"
WITH {list_str} AS tx_data
UNWIND tx_data AS tx
MATCH (addr:Account {{address: tx.address}})
MERGE (snap:Snapshot {{address: tx.address, balance: tx.balance }})
MERGE (addr)-[rel:State]->(snap)
RETURN
COUNT(snap) AS merged_snapshots
"#
)
}
}
File renamed without changes.
8 changes: 8 additions & 0 deletions tests/test_extract_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ async fn test_extract_v5_manifest() -> Result<()> {
let s = extract_v5_snapshot(&manifest_file).await?;
// NOTE: the parsing drops 1 blob, which is the 0x1 account, because it would not have the DiemAccount struct on it as a user address would have.
assert!(s.len() == 17338);
let first = s.first().unwrap();

assert!(&first.address.to_hex_literal() == "0x407d4d486fdc4e796504135e545be77");
assert!(first.balance == 100135989588);
assert!(first.slow_wallet_locked == 140001000000);
assert!(first.slow_wallet_transferred == 15999000000);
assert!(first.sequence_num == 7);

Ok(())
}

Expand Down
30 changes: 30 additions & 0 deletions tests/test_load_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@

mod support;

use libra_forensic_db::{
extract_snapshot::extract_v5_snapshot, load_account_state::impl_batch_snapshot_insert, neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes}
};
use support::{fixtures::v5_state_manifest_fixtures_path, neo4j_testcontainer::start_neo4j_container};

#[tokio::test]
async fn test_snapshot_batch() -> anyhow::Result<()> {
libra_forensic_db::log_setup();
let manifest_file = v5_state_manifest_fixtures_path().join("state.manifest");
assert!(manifest_file.exists());
let s = extract_v5_snapshot(&manifest_file).await?;


let c = start_neo4j_container();
let port = c.get_host_port_ipv4(7687);
let graph = get_neo4j_localhost_pool(port)
.await
.expect("could not get neo4j connection pool");
maybe_create_indexes(&graph)
.await
.expect("could start index");

impl_batch_snapshot_insert(&graph, &s).await?;


Ok(())
}
16 changes: 8 additions & 8 deletions tests/test_e2e_tx_neo4j.rs → tests/test_load_tx.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
mod support;
use anyhow::Result;
use diem_crypto::HashValue;
use libra_forensic_db::cypher_templates::{write_batch_tx_string, write_batch_user_create};
use libra_forensic_db::load::try_load_one_archive;
use libra_forensic_db::load_tx_cypher::tx_batch;
use libra_forensic_db::scan::scan_dir_archive;
use libra_forensic_db::schema_transaction::WarehouseTxMaster;

use libra_forensic_db::{
cypher_templates::{write_batch_tx_string, write_batch_user_create},
extract_transactions::extract_current_transactions,
load::try_load_one_archive,
load_tx_cypher::tx_batch,
neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
scan::scan_dir_archive,
schema_transaction::WarehouseTxMaster,
};
use neo4rs::query;
use support::neo4j_testcontainer::start_neo4j_container;
Expand Down Expand Up @@ -37,9 +38,8 @@ async fn test_tx_batch() -> anyhow::Result<()> {
assert!(res.created_accounts == 60);
assert!(res.modified_accounts == 228);
assert!(res.unchanged_accounts == 0);
assert!(res.created_tx == txs.len() as u64); // 705
// CHECK
// get the sum of all transactions in db
assert!(res.created_tx == txs.len() as u64);

let cypher_query = query(
"MATCH ()-[r:Tx]->()
RETURN count(r) AS total_tx_count",
Expand Down
2 changes: 1 addition & 1 deletion tests/test_supporting_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::path::PathBuf;

use anyhow::Result;
use libra_forensic_db::{
exchange_orders::{read_orders_from_file, ExchangeOrder},
schema_exchange_orders::{read_orders_from_file, ExchangeOrder},
load_exchange_orders,
neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
};
Expand Down

0 comments on commit e7d283b

Please sign in to comment.