Skip to content

Commit

Permalink
allow ingest-all to select types of content to ingest e.g. transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 8, 2024
1 parent 348183a commit 67cb275
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 16 deletions.
2 changes: 1 addition & 1 deletion warehouse/src/restaurant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use sqlx::PgPool;
/// ingest all the archives sequentially.
/// not very good, and made for the lazy
pub async fn sushi_train(parent_dir: &Path, pool: &PgPool) -> Result<u64> {
let s = scan_dir_archive(parent_dir)?;
let s = scan_dir_archive(parent_dir, None)?;
let mut archives_processed = 0u64;
for (p, m) in s.0.iter() {
match m.contents {
Expand Down
22 changes: 17 additions & 5 deletions warehouse/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,33 @@ pub enum EncodingVersion {
V7,
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, clap::ValueEnum)]
/// The kinds of archive bundle content a manifest can describe.
///
/// Use [`BundleContent::filename`] to obtain the manifest file name
/// (or wildcard pattern) associated with a given kind.
pub enum BundleContent {
    Unknown,
    StateSnapshot,
    Transaction,
    EpochEnding,
}

impl BundleContent {
    /// Returns the manifest file name associated with this content kind.
    ///
    /// `Unknown` yields the wildcard pattern `*.manifest`; every other
    /// variant yields one fixed file name. The result is a freshly
    /// allocated `String`.
    pub fn filename(&self) -> String {
        // Pick the static name first, then allocate exactly once below.
        let manifest_name: &'static str = match *self {
            Self::Unknown => "*.manifest",
            Self::StateSnapshot => "state.manifest",
            Self::Transaction => "transaction.manifest",
            Self::EpochEnding => "epoch_ending.manifest",
        };
        String::from(manifest_name)
    }
}

/// Crawl a directory and find all .manifest files.
pub fn scan_dir_archive(parent_dir: &Path) -> Result<ArchiveMap> {
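/// Optionally restrict the search to the manifest file name of a single
/// kind of bundle content; `None` matches every `*.manifest` file.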
pub fn scan_dir_archive(parent_dir: &Path, content_opt: Option<BundleContent>) -> Result<ArchiveMap> {
let path = parent_dir.canonicalize()?;

let filename = content_opt.unwrap_or(BundleContent::Unknown).filename();
let pattern = format!(
"{}/**/*.manifest",
path.to_str().context("cannot parse starting dir")?
"{}/**/{}",
path.to_str().context("cannot parse starting dir")?,
filename,
);

let mut archive = BTreeMap::new();
Expand Down
13 changes: 8 additions & 5 deletions warehouse/src/warehouse_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::path::PathBuf;
use crate::{
load_entrypoint::{ingest_all, try_load_one_archive},
neo4j_init::{get_credentials_from_env, PASS_ENV, URI_ENV, USER_ENV},
scan::scan_dir_archive,
scan::{scan_dir_archive, BundleContent},
};

#[derive(Parser)]
Expand All @@ -33,6 +33,9 @@ pub enum Sub {
IngestAll {
#[clap(long, short('d'))]
start_path: PathBuf,
#[clap(long, short('c'))]
archive_content: Option<BundleContent>,

},
/// process and load a single archive
LoadOne {
Expand All @@ -49,13 +52,13 @@ pub enum Sub {
impl WarehouseCli {
pub async fn run(&self) -> anyhow::Result<()> {
match &self.command {
Sub::IngestAll { start_path } => {
let map = scan_dir_archive(start_path)?;
Sub::IngestAll { start_path, archive_content } => {
let map = scan_dir_archive(start_path, archive_content.to_owned())?;
let pool = try_db_connection_pool(self).await?;

ingest_all(&map, &pool).await?;
}
Sub::LoadOne { archive_dir } => match scan_dir_archive(archive_dir)?.0.get(archive_dir)
Sub::LoadOne { archive_dir } => match scan_dir_archive(archive_dir, None)?.0.get(archive_dir)
{
Some(man) => {
let pool = try_db_connection_pool(self).await?;
Expand All @@ -68,7 +71,7 @@ impl WarehouseCli {
));
}
},
Sub::Check { archive_dir } => match scan_dir_archive(archive_dir)?.0.get(archive_dir) {
Sub::Check { archive_dir } => match scan_dir_archive(archive_dir, None)?.0.get(archive_dir) {
Some(_) => todo!(),
None => {
bail!(format!(
Expand Down
2 changes: 1 addition & 1 deletion warehouse/tests/test_e2e_tx_neo4j.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn test_parse_archive_into_neo4j() -> anyhow::Result<()> {
#[tokio::test]
async fn test_load_entry_point_tx() -> anyhow::Result<()> {
let archive_path = support::fixtures::v6_tx_manifest_fixtures_path();
let archive = scan_dir_archive(&archive_path)?;
let archive = scan_dir_archive(&archive_path, None)?;
let (_, man) = archive.0.first_key_value().unwrap();

let c = start_neo4j_container();
Expand Down
8 changes: 4 additions & 4 deletions warehouse/tests/test_scan_dirs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ fn v7_fixtures_gzipped() -> PathBuf {
fn test_scan_dir_for_v5_manifests() -> Result<()> {
let start_here = v5_fixtures_path();

let s = scan_dir_archive(&start_here)?;
let s = scan_dir_archive(&start_here, None)?;

assert!(s.0.len() == 2);
Ok(())
Expand All @@ -35,7 +35,7 @@ fn test_scan_dir_for_v5_manifests() -> Result<()> {
fn test_scan_dir_for_v7_manifests() -> Result<()> {
let start_here = v7_fixtures_path();

let s = scan_dir_archive(&start_here)?;
let s = scan_dir_archive(&start_here, None)?;

let archives = s.0;
assert!(archives.len() == 3);
Expand All @@ -47,15 +47,15 @@ fn test_scan_dir_for_v7_manifests() -> Result<()> {
fn test_scan_dir_for_compressed_v7_manifests() -> Result<()> {
let start_here = v7_fixtures_gzipped();

let archives = scan_dir_archive(&start_here)?;
let archives = scan_dir_archive(&start_here, None)?;

// a normal scan should find no files.
assert!(archives.0.iter().len() == 0);

// This time the scan should find readable files
let unzipped_dir = make_temp_unzipped(&start_here, true)?;

let archives = scan_dir_archive(&unzipped_dir)?;
let archives = scan_dir_archive(&unzipped_dir, None)?;
assert!(archives.0.iter().len() > 0);

Ok(())
Expand Down

0 comments on commit 67cb275

Please sign in to comment.