Skip to content

Commit

Permalink
[cli] extract fix gz state snapshots (#16)
Browse files Browse the repository at this point in the history
* info prints

* faster clearing of v5 queue

* patch, check queue before decompress

* fix indexes

* patch merge, missing tx.coin

* clean

* patch v5 coin amount decimal scaling

* patch batch test

* refactor manifest info to check for content type

* clean

* clippy

* update libra-framework

* patch test

---------

Co-authored-by: Beauregard Von Staccato <[email protected]>
Co-authored-by: Sere MacHart <[email protected]>
Co-authored-by: Peregrine Leveret <[email protected]>
Co-authored-by: Algernon Accelerando <[email protected]>
Co-authored-by: Isa Leveret <[email protected]>
Co-authored-by: Nella Accelerando <[email protected]>
Co-authored-by: Bea LeLeveret <[email protected]>
Co-authored-by: Lucietta O'Crescendo <lucietta_o'[email protected]>
Co-authored-by: xyz <xyz>
Co-authored-by: Archibald O'Accelerando <archibald_o'[email protected]>
Co-authored-by: Vale McPresto <[email protected]>
Co-authored-by: Nella Beaver <[email protected]>
Co-authored-by: Isa Presto <[email protected]>
  • Loading branch information
13 people authored Jan 30, 2025
1 parent db8a217 commit 35b6785
Show file tree
Hide file tree
Showing 19 changed files with 140 additions and 225 deletions.
1 change: 0 additions & 1 deletion src/analytics/enrich_rms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,5 +245,4 @@ fn test_rms_pipeline() {
assert!((s3.rms_24hour > 57.0) && (s3.rms_24hour < 58.0));

process_shill(&mut swaps);
// dbg!(&swaps);
}
77 changes: 0 additions & 77 deletions src/analytics/offline_matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,78 +89,10 @@ pub async fn get_date_range_deposits_alt(
deposited,
};
top_deposits.push(d);
// dbg!(&d);
}
Ok(top_deposits)
}

// pub async fn get_date_range_deposits(
// pool: &Graph,
// top_n: u64,
// start: DateTime<Utc>,
// end: DateTime<Utc>,
// ) -> Result<Vec<Deposit>> {
// let mut top_deposits = vec![];

// let q = format!(
// // r#"
// // WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" AS olswap_deposit

// // // Step 1: Get the list of all depositors
// // MATCH (depositor:Account)-[tx:Tx]->(onboard:Account {{address: olswap_deposit}})
// // WITH COLLECT(DISTINCT depositor) AS all_depositors, olswap_deposit, tx

// // // Step 2: Match depositors and amounts within the date range

// // UNWIND all_depositors AS depositor

// // OPTIONAL MATCH (depositor)-[tx2:Tx]->(onboard:Account {{address: olswap_deposit}})
// // WHERE tx2.block_datetime >= datetime('{}') AND tx2.block_datetime <= datetime('{}')

// // WITH
// // depositor.address AS account,
// // COALESCE(SUM(tx2.V7_OlAccountTransfer_amount), 0)/1000000 AS deposit_amount
// // RETURN account, toFloat(deposit_amount) as deposited
// // ORDER BY deposited DESC

// // "#,
// r#"
// WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit
// MATCH
// (u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}})
// WHERE
// tx.`block_datetime` > datetime("{}")
// AND tx.`block_datetime` < datetime("{}")
// WITH
// DISTINCT(u),
// SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount
// ORDER BY totalTxAmount DESCENDING
// RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited

// "#,
// start.to_rfc3339(),
// end.to_rfc3339(),
// // top_n,
// );
// let cypher_query = neo4rs::query(&q);

// // Execute the query
// let mut result = pool.execute(cypher_query).await?;

// // Fetch the first row only
// while let Some(r) = result.next().await? {
// let account_str = r.get::<String>("account").unwrap_or("unknown".to_string());
// let deposited = r.get::<f64>("deposited").unwrap_or(0.0);
// let d = Deposit {
// account: account_str.parse().unwrap_or(AccountAddress::ZERO),
// deposited,
// };
// top_deposits.push(d);
// // dbg!(&d);
// }
// Ok(top_deposits)
// }

pub async fn get_exchange_users(
pool: &Graph,
top_n: u64,
Expand Down Expand Up @@ -194,7 +126,6 @@ pub async fn get_exchange_users(
let funded = r.get::<f64>("funded").unwrap_or(0.0);
let d = MinFunding { user_id, funded };
min_funding.push(d);
// dbg!(&d);
}
Ok(min_funding)
}
Expand Down Expand Up @@ -226,7 +157,6 @@ pub async fn get_exchange_users_only_outflows(pool: &Graph) -> Result<Vec<MinFun
funded: funded as f64,
};
min_funding.push(d);
// dbg!(&d);
}
Ok(min_funding)
}
Expand Down Expand Up @@ -263,7 +193,6 @@ pub async fn get_one_exchange_user(
let funded = r.get::<f64>("funded").unwrap_or(0.0);
let d = MinFunding { user_id, funded };
min_funding.push(d);
// dbg!(&d);
}
Ok(min_funding)
}
Expand Down Expand Up @@ -311,11 +240,6 @@ impl Matching {
.map(|el| el.user_id)
.collect();

// dbg!(&ids);
// let user_ledger = funded.iter().find(|el| {
// // check if we have already identified it
// self.definite.0.get(el.user_id).none()
// });
Ok((*ids.first().unwrap(), *ids.get(1).unwrap()))
}

Expand Down Expand Up @@ -460,7 +384,6 @@ impl Matching {

let mut eval: Vec<AccountAddress> = vec![];
deposits.iter().for_each(|el| {
// dbg!(&el);
if el.deposited >= user.funded &&
// must not already have been tagged impossible
!pending.impossible.contains(&el.account) &&
Expand Down
1 change: 0 additions & 1 deletion src/cypher_templates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ use serde_json::Value;
pub fn to_cypher_object<T: Serialize>(object: &T) -> Result<String> {
// Serialize the struct to a JSON value
let serialized_value = serde_json::to_value(object).expect("Failed to serialize");
// dbg!(&serialized_value);

let flattener = smooth_json::Flattener {
separator: "_",
Expand Down
1 change: 0 additions & 1 deletion src/enrich_exchange_onboarding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[ExchangeOnRamp]) ->
// cypher queries makes it annoying to do a single insert of users and
// txs
let cypher_string = ExchangeOnRamp::cypher_batch_link_owner(&list_str);
// dbg!(&cypher_string);

// Execute the query
let cypher_query = neo4rs::query(&cypher_string);
Expand Down
28 changes: 14 additions & 14 deletions src/extract_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,20 @@ pub async fn extract_current_transactions(

// first increment the block metadata. This assumes the vector is sequential.
if let Some(block) = tx.try_as_block_metadata() {
// check the epochs are incrementing or not
if epoch > block.epoch()
&& round > block.round()
&& timestamp > block.timestamp_usecs()
{
dbg!(
epoch,
block.epoch(),
round,
block.round(),
timestamp,
block.timestamp_usecs()
);
}
// // check the epochs are incrementing or not
// if epoch > block.epoch()
// && round > block.round()
// && timestamp > block.timestamp_usecs()
// {
// dbg!(
// epoch,
// block.epoch(),
// round,
// block.round(),
// timestamp,
// block.timestamp_usecs()
// );
// }

epoch = block.epoch();
round = block.round();
Expand Down
19 changes: 11 additions & 8 deletions src/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ pub async fn ingest_all(
let pending = queue::get_queued(pool).await?;
info!("pending archives: {}", pending.len());

// This manifest may be for a .gz file, we should handle here as well
for (_p, m) in archive_map.0.iter() {
info!("checking if we need to decompress");
let (new_unzip_path, temp) = unzip_temp::maybe_handle_gz(&m.archive_dir)?;
let mut better_man = ManifestInfo::new(&new_unzip_path);
better_man.set_info()?;

println!(
"\nProcessing: {:?} with archive: {}",
m.contents,
Expand All @@ -60,6 +66,7 @@ pub async fn ingest_all(
m.archive_dir.display()
);
}
drop(temp);
}

Ok(())
Expand All @@ -70,9 +77,6 @@ pub async fn try_load_one_archive(
pool: &Graph,
batch_size: usize,
) -> Result<BatchTxReturn> {
info!("checking if we need to decompress");
let (archive_path, temp) = unzip_temp::maybe_handle_gz(&man.archive_dir)?;

let mut all_results = BatchTxReturn::new();
match man.contents {
crate::scan::BundleContent::Unknown => todo!(),
Expand All @@ -82,24 +86,23 @@ pub async fn try_load_one_archive(
error!("no framework version detected");
bail!("could not load archive from manifest");
}
crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&archive_path).await?,
crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&man.archive_dir).await?,
crate::scan::FrameworkVersion::V6 => {
extract_current_snapshot(&archive_path).await?
extract_current_snapshot(&man.archive_dir).await?
}
crate::scan::FrameworkVersion::V7 => {
extract_current_snapshot(&archive_path).await?
extract_current_snapshot(&man.archive_dir).await?
}
};
snapshot_batch(&snaps, pool, batch_size, &man.archive_id).await?;
}
crate::scan::BundleContent::Transaction => {
let (txs, _) = extract_current_transactions(&archive_path, &man.version).await?;
let (txs, _) = extract_current_transactions(&man.archive_dir, &man.version).await?;
let batch_res =
load_tx_cypher::tx_batch(&txs, pool, batch_size, &man.archive_id).await?;
all_results.increment(&batch_res);
}
crate::scan::BundleContent::EpochEnding => todo!(),
}
drop(temp);
Ok(all_results)
}
2 changes: 0 additions & 2 deletions src/load_account_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ pub async fn snapshot_batch(

match impl_batch_snapshot_insert(pool, c).await {
Ok(batch) => {
// dbg!(&batch);
all_results.increment(&batch);
// dbg!(&all_results);
queue::update_task(pool, archive_id, true, i).await?;
info!("...success");
}
Expand Down
115 changes: 75 additions & 40 deletions src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,44 @@ pub struct ManifestInfo {
}

impl ManifestInfo {
pub fn new(archive_dir: &Path) -> Self {
let archive_id = archive_dir
.file_name()
.unwrap()
.to_str()
.unwrap()
.to_owned();
ManifestInfo {
archive_dir: archive_dir.to_path_buf(),
archive_id,
version: FrameworkVersion::Unknown,
contents: BundleContent::Unknown,
processed: false,
}
}

/// Populate the manifest metadata in two steps: first detect the bundle
/// content type from the manifest filename, then infer the framework
/// version from those contents.
///
/// NOTE: the order matters — `try_set_framework_version` reads
/// `self.contents`, so `set_contents` must run first.
pub fn set_info(&mut self) -> Result<()> {
self.set_contents()?;
self.try_set_framework_version();
Ok(())
}

/// Detect the type of content in this archive by globbing for a manifest
/// file inside `archive_dir`.
///
/// The trailing `*` in the pattern also matches gzipped manifests
/// (e.g. `state.manifest.gz`). If no manifest is found, `self.contents`
/// is left untouched (i.e. stays `Unknown`).
pub fn set_contents(&mut self) -> Result<()> {
    let dir = self
        .archive_dir
        .to_str()
        .context("cannot parse starting dir")?;
    // filenames may be in .gz format, so match any suffix after ".manifest"
    let pattern = format!("{}/*.manifest*", dir);

    // Only the first glob hit is used; errors from unreadable paths are
    // flattened away, matching the previous behavior.
    for man_file in glob(&pattern)?.flatten() {
        self.contents = BundleContent::new_from_man_file(&man_file);
        break;
    }
    Ok(())
}

pub fn try_set_framework_version(&mut self) -> FrameworkVersion {
match self.contents {
BundleContent::Unknown => return FrameworkVersion::Unknown,
Expand All @@ -41,9 +79,8 @@ impl ManifestInfo {
// first check if the v7 manifest will parse
if let Ok(_bak) = load_snapshot_manifest(&man_path) {
self.version = FrameworkVersion::V7;
};

if v5_read_from_snapshot_manifest(&self.archive_dir.join("state.manifest")).is_ok()
} else if v5_read_from_snapshot_manifest(&self.archive_dir.join("state.manifest"))
.is_ok()
{
self.version = FrameworkVersion::V5;
}
Expand Down Expand Up @@ -83,6 +120,19 @@ pub enum BundleContent {
EpochEnding,
}
impl BundleContent {
/// Classify a bundle by the name of its manifest file.
///
/// Matches by substring so both plain and suffixed names
/// (e.g. `transaction.manifest.gz`) are recognized. Returns
/// `BundleContent::Unknown` when no known manifest name matches.
///
/// Panics if `man_file` is not valid UTF-8 — same behavior as before.
pub fn new_from_man_file(man_file: &Path) -> Self {
    let name = man_file.to_str().expect("invalid path");
    if name.contains("transaction.manifest") {
        BundleContent::Transaction
    } else if name.contains("epoch_ending.manifest") {
        BundleContent::EpochEnding
    } else if name.contains("state.manifest") {
        BundleContent::StateSnapshot
    } else {
        BundleContent::Unknown
    }
}
pub fn filename(&self) -> String {
match self {
BundleContent::Unknown => "*.manifest".to_string(),
Expand Down Expand Up @@ -110,44 +160,29 @@ pub fn scan_dir_archive(

let mut archive = BTreeMap::new();

for entry in glob(&pattern)? {
match entry {
Ok(manifest_path) => {
let dir = manifest_path
.parent()
.context("no parent dir found")?
.to_owned();
let contents = test_content(&manifest_path);
let archive_id = dir.file_name().unwrap().to_str().unwrap().to_owned();
let mut m = ManifestInfo {
archive_dir: dir.clone(),
archive_id,
version: FrameworkVersion::Unknown,
contents,
processed: false,
};
m.try_set_framework_version();

archive.insert(manifest_path.clone(), m);
}
Err(e) => println!("{:?}", e),
}
for manifest_path in glob(&pattern)?.flatten() {
let archive_dir = manifest_path
.parent()
.expect("can't find manifest dir, weird");
let mut man = ManifestInfo::new(archive_dir);
man.set_info()?;
archive.insert(archive_dir.to_path_buf(), man);
}
Ok(ArchiveMap(archive))
}

/// find out the type of content in the manifest
fn test_content(manifest_path: &Path) -> BundleContent {
let s = manifest_path.to_str().expect("path invalid");
if s.contains("transaction.manifest") {
return BundleContent::Transaction;
};
if s.contains("epoch_ending.manifest") {
return BundleContent::EpochEnding;
};
if s.contains("state.manifest") {
return BundleContent::StateSnapshot;
};

BundleContent::Unknown
}
// /// find out the type of content in the manifest
// fn test_content(manifest_path: &Path) -> BundleContent {
// let s = manifest_path.to_str().expect("path invalid");
// if s.contains("transaction.manifest") {
// return BundleContent::Transaction;
// };
// if s.contains("epoch_ending.manifest") {
// return BundleContent::EpochEnding;
// };
// if s.contains("state.manifest") {
// return BundleContent::StateSnapshot;
// };

// BundleContent::Unknown
// }
Loading

0 comments on commit 35b6785

Please sign in to comment.