Skip to content

Commit

Permalink
unzip archives on the fly, and patch known manifest errors in archive
Browse files Browse the repository at this point in the history
  • Loading branch information
Beauregard MacDiminuendo committed Jan 16, 2025
1 parent 82e23ca commit 1793079
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 72 deletions.
3 changes: 0 additions & 3 deletions src/analytics/enrich_account_funding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ use crate::schema_exchange_orders::{ExchangeOrder, OrderType};
#[cfg(test)]
use crate::date_util::parse_date;

#[cfg(test)]
use crate::date_util::parse_date;

#[derive(Default, Debug, Clone, Deserialize, Serialize)]
pub struct AccountDataAlt {
pub current_balance: f64,
Expand Down
51 changes: 0 additions & 51 deletions src/extract_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,63 +6,12 @@ use diem_crypto::HashValue;
use diem_types::account_config::{NewBlockEvent, WithdrawEvent};
use diem_types::contract_event::ContractEvent;
use diem_types::{account_config::DepositEvent, transaction::SignedTransaction};
use glob::glob;
use libra_storage::read_tx_chunk::{load_chunk, load_tx_chunk_manifest};
use libra_types::move_resource::coin_register_event::CoinRegisterEvent;
use log::{error, info, warn};
use serde_json::json;
use std::path::Path;

// The manifest file might have been written as .gz, when it should not be.
// TODO: Deprecate when archive sources are fixed (currently some epochs in V7 broken for epochs in Jan 2025)
fn maybe_fix_manifest(archive_path: &Path) -> Result<()> {
    // Find every .manifest file anywhere under the archive directory.
    let pattern = format!("{}/**/*.manifest", archive_path.display());
    for f in glob(&pattern)? {
        if let Some(f) = f.ok() {
            // NOTE(review): `manifest_file` is not defined in this scope (the
            // glob binding above is `f`) — this does not compile as written.
            let manifest = load_tx_chunk_manifest(&manifest_file)?;
            // NOTE(review): `map` is lazy and its result is discarded, so this
            // rewrite never executes; `manifest` is also not bound `mut`, and
            // `*e.proof.trim_end_matches(".gz")` assigns an unsized `str`.
            manifest.chunks.iter_mut().map(|e| {
                if e.proof.contains(".gz") {
                    e.proof = *e.proof.trim_end_matches(".gz")
                }
            });
            let literal = serde_json::to_string(&manifest)?;
            // NOTE(review): the `io::Result` from `fs::write` is silently
            // dropped — a failed rewrite would go unnoticed.
            std::fs::write(manifest_file, literal.as_bytes());
            warn!(
                "rewriting .manifest file to remove .gz paths, {}",
                archive_path.display()
            )
        }
    }
    Ok(())
}

/// If we are using this tool with .gz files, we will unzip on the fly
/// If the user prefers to not do on the fly, then they need to update
/// their workflow to `gunzip -r` before starting this.
pub fn maybe_handle_gz(archive_path: &Path) -> Result<(PathBuf, Option<TempPath>)> {
    // maybe stuff isn't unzipped yet
    let pattern = format!("{}/*.*.gz", archive_path.display());
    // NOTE(review): glob's `Paths` iterator has no `is_empty` method, and `f`
    // below is never defined in this scope — this does not compile as written.
    if !glob(&pattern)?.is_empty() {
        info!("Decompressing a temp folder. If you do not want to decompress files on the fly (which are not saved), then you workflow to do a `gunzip -r` before starting this.");
        let (p, tp) = make_temp_unzipped(f, false);
        // NOTE(review): the `Result` returned here is dropped without `?`.
        maybe_fix_manifest(archive_path);
        return Ok((p, Some(tp)));
    }
    // maybe the user unzipped the files

    let pattern = format!("{}/**/*.proof", archive_path.display());
    assert!(
        !glob(&pattern)?.is_empty(),
        "doesn't seem to be a decompressed archived"
    );
    // check if manifest file incorrectly has the .gz handle fix that.
    // try to load it
    // NOTE(review): `manifest_file` is undefined here, and the loaded
    // `manifest` value is never used.
    let manifest = load_tx_chunk_manifest(&manifest_file)?;
    maybe_fix_manifest(archive_path);

    // NOTE(review): `archive_path` is `&Path` but the signature returns an
    // owned `PathBuf` — this needs `.to_path_buf()`.
    Ok((archive_path, None))
}

pub async fn extract_current_transactions(
archive_path: &Path,
) -> Result<(Vec<WarehouseTxMaster>, Vec<WarehouseEvent>)> {
Expand Down
4 changes: 2 additions & 2 deletions src/json_rescue_v5_extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use diem_temppath::TempPath;
use diem_types::account_address::AccountAddress;
use log::{error, trace, warn};
use std::path::{Path, PathBuf};

/// The canonical transaction archives for V5 were kept in a different format than in v6 and v7.
/// As of Nov 2024, there's a project to recover the V5 transaction archives to be in the same bytecode flat file format as v6 and v7.
/// Until then, we must parse the json files.
pub fn extract_v5_json_rescue(
one_json_file: &Path,
) -> Result<(Vec<WarehouseTxMaster>, Vec<WarehouseEvent>, Vec<String>)> {
Expand All @@ -45,7 +45,7 @@ pub fn extract_v5_json_rescue(
wtxs.sender = cast_legacy_account(sender)?;

// must cast from V5 Hashvalue buffer layout
wtxs.tx_hash = HashValue::from_slice(&t.hash.to_vec())?;
wtxs.tx_hash = HashValue::from_slice(t.hash.to_vec())?;

wtxs.function = make_function_name(script);
trace!("function: {}", &wtxs.function);
Expand Down
13 changes: 9 additions & 4 deletions src/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
load_tx_cypher,
queue::{self, clear_queue, push_queue_from_archive_map},
scan::{ArchiveMap, ManifestInfo},
unzip_temp,
};

use anyhow::{bail, Context, Result};
Expand Down Expand Up @@ -69,6 +70,9 @@ pub async fn try_load_one_archive(
pool: &Graph,
batch_size: usize,
) -> Result<BatchTxReturn> {
info!("checking if we need to decompress");
let (archive_path, temp) = unzip_temp::maybe_handle_gz(&man.archive_dir)?;

let mut all_results = BatchTxReturn::new();
match man.contents {
crate::scan::BundleContent::Unknown => todo!(),
Expand All @@ -78,23 +82,24 @@ pub async fn try_load_one_archive(
error!("no framework version detected");
bail!("could not load archive from manifest");
}
crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&man.archive_dir).await?,
crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&archive_path).await?,
crate::scan::FrameworkVersion::V6 => {
extract_current_snapshot(&man.archive_dir).await?
extract_current_snapshot(&archive_path).await?
}
crate::scan::FrameworkVersion::V7 => {
extract_current_snapshot(&man.archive_dir).await?
extract_current_snapshot(&archive_path).await?
}
};
snapshot_batch(&snaps, pool, batch_size, &man.archive_id).await?;
}
crate::scan::BundleContent::Transaction => {
let (txs, _) = extract_current_transactions(&man.archive_dir).await?;
let (txs, _) = extract_current_transactions(&archive_path).await?;
let batch_res =
load_tx_cypher::tx_batch(&txs, pool, batch_size, &man.archive_id).await?;
all_results.increment(&batch_res);
}
crate::scan::BundleContent::EpochEnding => todo!(),
}
drop(temp);
Ok(all_results)
}
4 changes: 1 addition & 3 deletions src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,14 @@ pub fn scan_dir_archive(
content_opt: Option<BundleContent>,
) -> Result<ArchiveMap> {
let path = parent_dir.canonicalize()?;
// filenames may be in .gz format
let filename = content_opt.unwrap_or(BundleContent::Unknown).filename();
dbg!(&filename);
let pattern = format!(
"{}/**/{}",
path.to_str().context("cannot parse starting dir")?,
filename,
);

dbg!(&pattern);

let mut archive = BTreeMap::new();

for entry in glob(&pattern)? {
Expand Down
61 changes: 57 additions & 4 deletions src/unzip_temp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use anyhow::{Context, Result};
use diem_temppath::TempPath;
use flate2::read::GzDecoder;
use glob::glob;
use libra_storage::read_tx_chunk::load_tx_chunk_manifest;
use log::{info, warn};
use std::{
fs::File,
io::copy,
Expand All @@ -14,7 +16,10 @@ use tar::Archive;
// take a single archive file, and get the temp location of the unzipped file
// NOTE: you must return the TempPath to the caller so otherwise when it
// drops out of scope the files will be deleted, this is intentional.
pub fn make_temp_unzipped(archive_file: &Path, tar_opt: bool) -> Result<(PathBuf, TempPath)> {
pub fn test_helper_temp_unzipped(
archive_file: &Path,
tar_opt: bool,
) -> Result<(PathBuf, TempPath)> {
let temp_dir = TempPath::new();
temp_dir.create_as_dir()?;

Expand Down Expand Up @@ -78,8 +83,9 @@ pub fn decompress_tar_archive(src_path: &Path, dst_dir: &Path) -> Result<()> {
}

/// Unzip all .gz files into the same directory
/// Warning: this will take up a lot of disk space, should not be used in production
pub fn decompress_all_gz(parent_dir: &Path) -> Result<()> {
/// Warning: this will take up a lot of disk space, should not be used in production for all files. Use for on the fly decompression.
/// NOTE: Not for tarballs
pub fn decompress_all_gz(parent_dir: &Path, dst_dir: &Path) -> Result<()> {
let path = parent_dir.canonicalize()?;

let pattern = format!(
Expand All @@ -90,7 +96,7 @@ pub fn decompress_all_gz(parent_dir: &Path) -> Result<()> {
for entry in glob(&pattern)? {
match entry {
Ok(src_path) => {
let _ = decompress_file(&src_path, src_path.parent().unwrap(), false);
let _ = decompress_file(&src_path, dst_dir, false);
}
Err(e) => {
println!("{:?}", e);
Expand All @@ -99,3 +105,50 @@ pub fn decompress_all_gz(parent_dir: &Path) -> Result<()> {
}
Ok(())
}

// The manifest file might have been written as .gz, when it should not be.
// Strips a trailing ".gz" from each chunk's proof path and rewrites the
// manifest in place, so downstream loaders find the decompressed proof files.
// TODO: Deprecate when archive sources are fixed (currently some epochs in V7 broken for epochs in Jan 2025)
fn maybe_fix_manifest(archive_path: &Path) -> Result<()> {
    let pattern = format!("{}/**/*.manifest", archive_path.display());
    for manifest_path in glob(&pattern)?.flatten() {
        let mut manifest = load_tx_chunk_manifest(&manifest_path)?;
        let mut changed = false;
        manifest.chunks.iter_mut().for_each(|e| {
            // `strip_suffix` only matches a trailing ".gz"; the previous
            // `contains(".gz")` guard also fired on a mid-string ".gz",
            // where `trim_end_matches` would then change nothing.
            if let Some(stripped) = e.proof.strip_suffix(".gz") {
                e.proof = stripped.to_string();
                changed = true;
            }
        });
        // Only rewrite (and warn) when a path was actually fixed, instead of
        // unconditionally rewriting every manifest found.
        if changed {
            let literal = serde_json::to_string(&manifest)?;
            std::fs::write(&manifest_path, literal.as_bytes())?;
            warn!(
                "rewriting .manifest file to remove .gz paths, {}",
                archive_path.display()
            );
        }
    }
    Ok(())
}

/// If we are using this tool with .gz files, we will unzip on the fly
/// If the user prefers to not do on the fly, then they need to update
/// their workflow to `gunzip -r` before starting this.
pub fn maybe_handle_gz(archive_path: &Path) -> Result<(PathBuf, Option<TempPath>)> {
// maybe stuff isn't unzipped yet
let pattern = format!("{}/*.*.gz", archive_path.display());
if glob(&pattern)?.count() > 0 {
info!("Decompressing a temp folder. If you do not want to decompress files on the fly (which are not saved), then you workflow to do a `gunzip -r` before starting this.");
let temp_dir = TempPath::new();
temp_dir.create_as_dir()?;
decompress_all_gz(archive_path, temp_dir.path())?;
maybe_fix_manifest(archive_path)?;
return Ok((temp_dir.path().to_path_buf(), Some(temp_dir)));
}
// maybe the user unzipped the files

let pattern = format!("{}/*.chunk", archive_path.display());
assert!(
glob(&pattern)?.count() > 0,
"are you sure you decompressed everything here?"
);
maybe_fix_manifest(archive_path)?;

Ok((archive_path.to_path_buf(), None))
}
5 changes: 3 additions & 2 deletions src/warehouse_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ pub struct WarehouseCli {
#[clap(long, short('r'))]
/// URI of graphDB e.g. neo4j+s://localhost:port
db_uri: Option<String>,
#[clap(long, short('u'))]

#[clap(long, short('u'))]
/// username of db
db_username: Option<String>,
#[clap(long, short('p'))]
/// db password
db_password: Option<String>,

#[clap(long, short('q'))]
/// force clear queue
clear_queue: bool,
Expand Down Expand Up @@ -125,8 +126,8 @@ pub enum AnalyticsSub {
/// slow search producing likely candidates at each day
/// requires top n # for length of initial list to scan
replay_balances: Option<u64>,
#[clap(long)]

#[clap(long)]
/// get perfect deposit matches on dump cases, requires tolerance value of 1.0 or more
match_simple_dumps: Option<f64>,
#[clap(long)]
Expand Down
4 changes: 2 additions & 2 deletions tests/test_scan_dirs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod support;

use anyhow::Result;
use libra_forensic_db::{scan::scan_dir_archive, unzip_temp::make_temp_unzipped};
use libra_forensic_db::{scan::scan_dir_archive, unzip_temp::test_helper_temp_unzipped};
use support::fixtures;

#[test]
Expand Down Expand Up @@ -40,7 +40,7 @@ fn test_scan_dir_for_compressed_v7_manifests() -> Result<()> {
assert!(archives.0.iter().len() == 0);

// This time the scan should find readable files
let (_, unzipped_dir) = make_temp_unzipped(&start_here, false)?;
let (_, unzipped_dir) = test_helper_temp_unzipped(&start_here, false)?;

let archives = scan_dir_archive(unzipped_dir.path(), None)?;
assert!(archives.0.iter().len() > 0);
Expand Down
3 changes: 2 additions & 1 deletion tests/test_unzip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use libra_forensic_db::unzip_temp;
#[test]
fn test_unzip() {
let archive_path = support::fixtures::v7_tx_manifest_fixtures_path();
let (_, temp_unzipped_dir) = unzip_temp::make_temp_unzipped(&archive_path, false).unwrap();
let (_, temp_unzipped_dir) =
unzip_temp::test_helper_temp_unzipped(&archive_path, false).unwrap();

assert!(temp_unzipped_dir.path().exists());
assert!(temp_unzipped_dir
Expand Down

0 comments on commit 1793079

Please sign in to comment.