Skip to content

Commit

Permalink
Merge branch 'main' into cumulative-tx-amount
Browse files Browse the repository at this point in the history
  • Loading branch information
Rosina Saint Pianissimo committed Jan 28, 2025
2 parents 75901a7 + df4d7cb commit 5877022
Show file tree
Hide file tree
Showing 20 changed files with 3,482 additions and 101 deletions.
2 changes: 1 addition & 1 deletion src/analytics/enrich_account_funding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
use crate::schema_exchange_orders::{ExchangeOrder, OrderType};

#[cfg(test)]
use crate::date_util::parse_date;
use crate::util::parse_date;

#[derive(Default, Debug, Clone, Deserialize, Serialize)]
pub struct AccountDataAlt {
Expand Down
8 changes: 0 additions & 8 deletions src/date_util.rs

This file was deleted.

42 changes: 22 additions & 20 deletions src/enrich_exchange_onboarding.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::util::de_address_from_any_string;
use anyhow::{Context, Result};
use diem_types::account_address::AccountAddress;
use log::info;
use neo4rs::Graph;
use serde::{Deserialize, Deserializer, Serialize};
use serde::{Deserialize, Serialize};
use std::path::Path;

// Exchange onboard json files are formatted like so:
Expand All @@ -11,33 +12,34 @@ use std::path::Path;
// [
// {
// "user_id": 189,
// "onboarding_addr": "01F3B9C815FEB654718DE5D53CD665699A2B80951B696939E2D9EC27D0126BAD"
// "onramp_address": "01F3B9C815FEB654718DE5D53CD665699A2B80951B696939E2D9EC27D0126BAD"
// },
// ...
// ]

#[derive(Debug, Serialize, Deserialize)]
pub struct ExchangeOnRamp {
#[serde(deserialize_with = "from_any_string")]
onboarding_addr: Option<AccountAddress>,
#[serde(deserialize_with = "de_address_from_any_string")]
onramp_address: Option<AccountAddress>,
// TODO: this should be string, since exchanges/bridges will have different identifiers
user_id: u64,
}

/// Serde `deserialize_with` helper: leniently reads an account address from a JSON string.
///
/// The string is lowercased, given a `0x` prefix when it does not already start with one,
/// and parsed with `AccountAddress::from_hex_literal`. A JSON `null` or a string that
/// does not parse as an address yields `Ok(None)` so callers can skip the record.
///
/// # Errors
/// Returns the deserializer's error when the value is neither a string nor `null`.
fn from_any_string<'de, D>(deserializer: D) -> Result<Option<AccountAddress>, D::Error>
where
    D: Deserializer<'de>,
{
    // Deserialize into an owned `String` rather than `&str`: a borrowed `&str` can only be
    // produced when the deserializer can lend the input verbatim, which fails for strings
    // containing escape sequences or for reader-backed deserializers.
    let raw = match Option::<String>::deserialize(deserializer)? {
        Some(text) => text,
        None => return Ok(None),
    };

    // TODO: do better hex decoding than this
    let lower = raw.to_ascii_lowercase();
    // Check the prefix position specifically; "0x" anywhere else is not a valid prefix.
    let literal = if lower.starts_with("0x") {
        lower
    } else {
        format!("0x{}", lower)
    };

    Ok(AccountAddress::from_hex_literal(&literal).ok())
}

// fn from_any_string<'de, D>(deserializer: D) -> Result<Option<AccountAddress>, D::Error>
// where
// D: Deserializer<'de>,
// {
// let s: &str = Deserialize::deserialize(deserializer)?;
// // do better hex decoding than this
// let mut lower = s.to_ascii_lowercase();
// if !lower.contains("0x") {
// lower = format!("0x{}", lower);
// }

// Ok(AccountAddress::from_hex_literal(&lower).ok())
// }

/// get exchange Onramp file
// TODO: boilerplate copy of enrich_whitepages
impl ExchangeOnRamp {
pub fn parse_json_file(path: &Path) -> Result<Vec<Self>> {
Expand All @@ -49,7 +51,7 @@ impl ExchangeOnRamp {
format!(
r#"{{user_id: {}, address: "{}" }}"#,
self.user_id,
self.onboarding_addr.as_ref().unwrap().to_hex_literal(),
self.onramp_address.as_ref().unwrap().to_hex_literal(),
)
}

Expand All @@ -58,7 +60,7 @@ impl ExchangeOnRamp {
let mut list_literal = "".to_owned();
for el in list {
// skip empty records
if el.onboarding_addr.is_none() {
if el.onramp_address.is_none() {
continue;
};
let s = el.to_cypher_object_template();
Expand Down
36 changes: 14 additions & 22 deletions src/enrich_whitepages.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,36 @@
use crate::util::de_address_from_any_string;
use anyhow::{Context, Result};
use diem_types::account_address::AccountAddress;
use log::info;
use log::{error, info};
use neo4rs::Graph;
use serde::{Deserialize, Deserializer, Serialize};
use serde::{Deserialize, Serialize};
use std::path::Path;

#[derive(Debug, Serialize, Deserialize)]
pub struct Whitepages {
#[serde(deserialize_with = "from_any_string")]
#[serde(deserialize_with = "de_address_from_any_string")]
address: Option<AccountAddress>,
owner: Option<String>,
address_note: Option<String>,
}

/// Serde `deserialize_with` helper: leniently reads an account address from a JSON string.
///
/// The string is lowercased, given a `0x` prefix when it does not already start with one,
/// and parsed with `AccountAddress::from_hex_literal`. A JSON `null` or a string that
/// does not parse as an address yields `Ok(None)` instead of failing the whole document.
///
/// # Errors
/// Returns the deserializer's error when the value is neither a string nor `null`.
fn from_any_string<'de, D>(deserializer: D) -> Result<Option<AccountAddress>, D::Error>
where
    D: Deserializer<'de>,
{
    // An owned `String` is used instead of `&str` because a borrowed `&str` cannot be
    // produced for strings with escape sequences or for reader-backed deserializers.
    let raw = match Option::<String>::deserialize(deserializer)? {
        Some(text) => text,
        None => return Ok(None),
    };

    // TODO: do better hex decoding than this
    let lower = raw.to_ascii_lowercase();
    // Only a leading "0x" counts as the hex-literal prefix.
    let literal = if lower.starts_with("0x") {
        lower
    } else {
        format!("0x{}", lower)
    };

    Ok(AccountAddress::from_hex_literal(&literal).ok())
}

impl Whitepages {
/// Reads the file at `path` into a string and deserializes it as a JSON list of records.
///
/// # Errors
/// Propagates the I/O error if the file cannot be read, or the `serde_json` error if the
/// contents do not deserialize into `Vec<Self>`.
pub fn parse_json_file(path: &Path) -> Result<Vec<Self>> {
    let json_text = std::fs::read_to_string(path)?;
    let records = serde_json::from_str(&json_text)?;
    Ok(records)
}

pub fn to_cypher_object_template(&self) -> String {
format!(
r#"{{owner: "{}", address: "{}" }}"#,
self.owner.as_ref().unwrap(),
self.address.as_ref().unwrap().to_hex_literal(),
// self.address_note,
)
if let Some(addr) = &self.address {
format!(
r#"{{owner: "{}", address: "{}"}}"#,
self.owner.as_ref().unwrap(),
addr.to_hex_literal(),
)
} else {
error!("missing address at {:#?}", &self);
"".to_string()
}
}

/// create a cypher query string for the map object
Expand Down
16 changes: 11 additions & 5 deletions src/extract_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::Result;
use diem_types::account_view::AccountView;
use libra_backwards_compatibility::version_five::{
balance_v5::BalanceResourceV5,
ol_tower_state::TowerStateResource,
ol_wallet::SlowWalletResourceV5,
state_snapshot_v5::{v5_accounts_from_manifest_path, v5_read_from_snapshot_manifest},
};
Expand All @@ -23,10 +24,11 @@ use crate::{
};

// uses libra-compatibility to parse the v5 manifest files, and decode v5 format bytecode into current version data structures (v6+);
pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<WarehouseAccState>> {
pub async fn extract_v5_snapshot(archive_path: &Path) -> Result<Vec<WarehouseAccState>> {
let v5_manifest_path = archive_path.join("state.manifest");
// NOTE: this is duplicated with next step.
let manifest_data = v5_read_from_snapshot_manifest(v5_manifest_path)?;
let account_blobs = v5_accounts_from_manifest_path(v5_manifest_path).await?;
let manifest_data = v5_read_from_snapshot_manifest(&v5_manifest_path)?;
let account_blobs = v5_accounts_from_manifest_path(&v5_manifest_path).await?;

// TODO: see below, massively inefficient
let time = WarehouseTime {
Expand Down Expand Up @@ -57,10 +59,14 @@ pub async fn extract_v5_snapshot(v5_manifest_path: &Path) -> Result<Vec<Warehous
s.balance = b.coin()
}
if let Ok(sw) = acc.get_resource::<SlowWalletResourceV5>() {
s.slow_wallet_locked = sw.unlocked;
s.slow_wallet_unlocked = sw.unlocked;
s.slow_wallet_transferred = sw.transferred;
}

if let Ok(tower) = acc.get_resource::<TowerStateResource>() {
s.miner_height = Some(tower.verified_tower_height);
}

warehouse_state.push(s);
}
Err(e) => {
Expand Down Expand Up @@ -111,7 +117,7 @@ pub async fn extract_current_snapshot(archive_path: &Path) -> Result<Vec<Warehou
}

if let Some(sw) = el.get_resource::<SlowWalletResource>()? {
s.slow_wallet_locked = sw.unlocked;
s.slow_wallet_unlocked = sw.unlocked;
s.slow_wallet_transferred = sw.transferred;
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub mod analytics;
pub mod batch_tx_type;
pub mod cypher_templates;
pub mod date_util;
pub mod decode_entry_function;
pub mod enrich_exchange_onboarding;
pub mod enrich_whitepages;
Expand All @@ -21,6 +20,7 @@ pub mod schema_account_state;
pub mod schema_exchange_orders;
pub mod schema_transaction;
pub mod unzip_temp;
pub mod util;
pub mod v5_rpc_to_raw;
pub mod warehouse_cli;

Expand Down
8 changes: 4 additions & 4 deletions src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ impl ManifestInfo {
self.version = FrameworkVersion::V7;
};

if v5_read_from_snapshot_manifest(&self.archive_dir).is_ok() {
if v5_read_from_snapshot_manifest(&self.archive_dir.join("state.manifest")).is_ok()
{
self.version = FrameworkVersion::V5;
}
}
Expand All @@ -58,7 +59,7 @@ impl ManifestInfo {
FrameworkVersion::Unknown
}
}
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq)]
pub enum FrameworkVersion {
#[default]
Unknown,
Expand All @@ -73,7 +74,7 @@ impl fmt::Display for FrameworkVersion {
}
}

#[derive(Clone, Debug, clap::ValueEnum)]
#[derive(Clone, Debug, clap::ValueEnum, PartialEq)]
pub enum BundleContent {
Unknown,
StateSnapshot,
Expand Down Expand Up @@ -116,7 +117,6 @@ pub fn scan_dir_archive(
.context("no parent dir found")?
.to_owned();
let contents = test_content(&manifest_path);

let archive_id = dir.file_name().unwrap().to_str().unwrap().to_owned();
let mut m = ManifestInfo {
archive_dir: dir.clone(),
Expand Down
50 changes: 29 additions & 21 deletions src/schema_account_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,23 @@ pub struct WarehouseAccState {
pub time: WarehouseTime,
pub sequence_num: u64,
pub balance: u64,
pub slow_wallet_locked: u64,
pub slow_wallet_unlocked: u64,
pub slow_wallet_transferred: u64,
pub donor_voice_acc: bool,
pub miner_height: Option<u64>,
}

impl Default for WarehouseAccState {
fn default() -> Self {
Self {
address: AccountAddress::ZERO,
time: Default::default(),
sequence_num: Default::default(),
balance: Default::default(),
slow_wallet_locked: Default::default(),
slow_wallet_transferred: Default::default(),
sequence_num: 0,
balance: 0,
slow_wallet_unlocked: 0,
slow_wallet_transferred: 0,
donor_voice_acc: false,
miner_height: None,
time: WarehouseTime::default(),
}
}
}
Expand All @@ -40,12 +42,7 @@ impl WarehouseAccState {
pub fn new(address: AccountAddress) -> Self {
Self {
address,
sequence_num: 0,
time: WarehouseTime::default(),
balance: 0,
slow_wallet_locked: 0,
slow_wallet_transferred: 0,
donor_voice_acc: false,
..Default::default()
}
}
pub fn set_time(&mut self, timestamp: u64, version: u64, epoch: u64) {
Expand All @@ -60,16 +57,17 @@ impl WarehouseAccState {
/// Note: the original data was in RFC 3339 format with Z for UTC; Cypher seems to prefer offsets such as +00000
pub fn to_cypher_object_template(&self) -> String {
format!(
r#"{{address: "{}", balance: {}, version: {}, epoch: {}, sequence_num: {}, slow_locked: {}, slow_transfer: {}, framework_version: "{}", donor_voice: {} }}"#,
r#"{{address: "{}", balance: {}, version: {}, epoch: {},sequence_num: {}, slow_unlocked: {}, slow_transfer: {}, framework_version: "{}", donor_voice: {}, miner_height: {}}}"#,
self.address.to_hex_literal(),
self.balance,
self.time.version,
self.time.epoch,
self.sequence_num,
self.slow_wallet_locked,
self.slow_wallet_unlocked,
self.slow_wallet_transferred,
self.time.framework_version,
self.donor_voice_acc,
self.miner_height.unwrap_or(0)
)
}

Expand All @@ -88,15 +86,25 @@ impl WarehouseAccState {
pub fn cypher_batch_insert_str(list_str: &str) -> String {
format!(
r#"
WITH {list_str} AS tx_data
UNWIND tx_data AS tx
WITH {list_str} AS tx_data
UNWIND tx_data AS tx
MERGE (addr:Account {{address: tx.address}})
MERGE (snap:Snapshot {{address: tx.address, balance: tx.balance, framework_version: tx.framework_version, epoch: tx.epoch, version: tx.version, sequence_num: tx.sequence_num, slow_locked: tx.slow_locked, slow_transfer: tx.slow_transfer, donor_voice: tx.donor_voice }})
MERGE (addr)-[rel:State {{version: tx.version}} ]->(snap)
MERGE (addr:Account {{address: tx.address}})
MERGE (snap:Snapshot {{
address: tx.address,
balance: tx.balance,
epoch: tx.epoch,
framework_version: tx.framework_version,
version: tx.version,
sequence_num: tx.sequence_num,
slow_unlocked: tx.slow_unlocked,
slow_transfer: tx.slow_transfer,
donor_voice: tx.donor_voice,
miner_height: coalesce(tx.miner_height, 0)
}})
MERGE (addr)-[rel:State {{version: tx.version}}]->(snap)
RETURN
COUNT(snap) AS merged_snapshots
RETURN COUNT(snap) AS merged_snapshots
"#
)
Expand Down
33 changes: 33 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use chrono::{DateTime, Utc};
use diem_types::account_address::AccountAddress;
use log::error;
use serde::{Deserialize, Deserializer};

/// Converts a calendar date written as "YYYY-MM-DD" into a `DateTime<Utc>` at midnight UTC.
///
/// # Panics
/// Panics with "Invalid date format; expected YYYY-MM-DD" when the input, once the
/// midnight time and `Z` suffix are appended, is not a valid RFC 3339 timestamp.
pub fn parse_date(date_str: &str) -> DateTime<Utc> {
    // Build a full RFC 3339 timestamp: the date, then midnight, then the UTC designator.
    let mut rfc3339 = String::from(date_str);
    rfc3339.push_str("T00:00:00Z");

    let parsed = DateTime::parse_from_rfc3339(&rfc3339);
    parsed
        .expect("Invalid date format; expected YYYY-MM-DD")
        .with_timezone(&Utc)
}

pub fn de_address_from_any_string<'de, D>(
deserializer: D,
) -> Result<Option<AccountAddress>, D::Error>
where
D: Deserializer<'de>,
{
let s: &str = Deserialize::deserialize(deserializer)?;
// do better hex decoding than this
let mut lower = s.to_ascii_lowercase();
if !lower.contains("0x") {
lower = format!("0x{}", lower);
}
match AccountAddress::from_hex_literal(&lower) {
Ok(addr) => Ok(Some(addr)),
Err(_) => {
error!("could not parse address: {}", &s);
Ok(None)
}
}
}
Loading

0 comments on commit 5877022

Please sign in to comment.