Skip to content

Commit

Permalink
queue files with no records
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 28, 2024
1 parent 400b066 commit 28bf9d5
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 9 deletions.
11 changes: 6 additions & 5 deletions src/json_rescue_v5_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
queue::{self},
};
use anyhow::Result;
use log::{error, info, warn};
use log::{error, info, trace, warn};
use neo4rs::Graph;
use std::sync::Arc;
use std::{path::Path, thread::available_parallelism};
Expand All @@ -32,7 +32,7 @@ pub async fn single_thread_decompress_extract(tgz_file: &Path, pool: &Graph) ->
let archive_id = j.file_name().unwrap().to_str().unwrap();
let complete = queue::are_all_completed(pool, archive_id).await?;
if complete {
info!("skip parsing, this file was loaded successfully");
trace!("skip parsing, this file was loaded successfully");
continue;
}

Expand Down Expand Up @@ -61,7 +61,7 @@ pub async fn rip_concurrent_limited(
start_dir: &Path,
pool: &Graph,
threads: Option<usize>,
) -> Result<()> {
) -> Result<u64> {
let threads = threads.unwrap_or(available_parallelism().unwrap().get());
info!("concurrent threads used: {}", threads);

Expand Down Expand Up @@ -90,8 +90,9 @@ pub async fn rip_concurrent_limited(

for (i, result) in results.into_iter().enumerate() {
match result {
Ok(Ok(_)) => {
Ok(Ok(n)) => {
info!("Task {} completed successfully.", i);
return Ok(n);
}
Ok(Err(e)) => {
error!("Task {} failed: {:?}", i, e);
Expand All @@ -102,5 +103,5 @@ pub async fn rip_concurrent_limited(
}
}

Ok(())
Ok(0)
}
8 changes: 7 additions & 1 deletion src/load_tx_cypher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ pub async fn tx_batch(
batch_size: usize,
archive_id: &str,
) -> Result<BatchTxReturn> {
info!("archive: {}", archive_id);

if txs.is_empty() {
// mark as complete so we don't retry
queue::update_task(pool, archive_id, true, 0).await?;
}

let chunks: Vec<&[WarehouseTxMaster]> = txs.chunks(batch_size).collect();
let mut all_results = BatchTxReturn::new();
info!("archive: {}", archive_id);

for (i, c) in chunks.into_iter().enumerate() {
info!("batch #{}", i);
Expand Down
29 changes: 26 additions & 3 deletions tests/test_json_rescue_v5_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,32 @@ async fn test_load_entrypoint() -> anyhow::Result<()> {

let path = fixtures::v5_json_tx_path();

json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?;
// dbg!(&tx_count);
// assert!(tx_count == 13);
let tx_count = json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?;
assert!(tx_count == 13);

Ok(())
}

#[tokio::test]
/// Verifies that the load queue makes `rip_concurrent_limited` idempotent:
/// a first pass over the fixture archives loads every transaction, and a
/// second pass over the same path loads nothing because each archive was
/// marked complete in the queue.
async fn test_load_queue() -> anyhow::Result<()> {
    libra_forensic_db::log_setup();

    // Spin up an ephemeral Neo4j instance and connect to its bolt port.
    let c = start_neo4j_container();
    let port = c.get_host_port_ipv4(7687);
    let pool = get_neo4j_localhost_pool(port)
        .await
        .expect("could not get neo4j connection pool");
    maybe_create_indexes(&pool)
        .await
        .expect("could not create indexes");

    let path = fixtures::v5_json_tx_path();

    // First pass: nothing is queued as complete yet, so all 13 fixture
    // transactions should be loaded.
    let tx_count = json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?;
    assert_eq!(tx_count, 13);

    // Second pass: every archive is now marked complete in the queue, so
    // the loader should skip all files and report zero transactions.
    let tx_count = json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?;
    assert_eq!(tx_count, 0);

    Ok(())
}
Expand Down

0 comments on commit 28bf9d5

Please sign in to comment.