diff --git a/src/json_rescue_v5_load.rs b/src/json_rescue_v5_load.rs
index 998cec7..11b7943 100644
--- a/src/json_rescue_v5_load.rs
+++ b/src/json_rescue_v5_load.rs
@@ -6,7 +6,7 @@ use crate::{
     queue::{self},
 };
 use anyhow::Result;
-use log::{error, info, warn};
+use log::{error, info, trace, warn};
 use neo4rs::Graph;
 use std::sync::Arc;
 use std::{path::Path, thread::available_parallelism};
@@ -32,7 +32,7 @@ pub async fn single_thread_decompress_extract(tgz_file: &Path, pool: &Graph) ->
         let archive_id = j.file_name().unwrap().to_str().unwrap();
         let complete = queue::are_all_completed(pool, archive_id).await?;
         if complete {
-            info!("skip parsing, this file was loaded successfully");
+            trace!("skip parsing, this file was loaded successfully");
             continue;
         }
 
@@ -61,7 +61,7 @@ pub async fn rip_concurrent_limited(
     start_dir: &Path,
     pool: &Graph,
     threads: Option<usize>,
-) -> Result<()> {
+) -> Result<u64> {
     let threads = threads.unwrap_or(available_parallelism().unwrap().get());
     info!("concurrent threads used: {}", threads);
 
@@ -90,8 +90,10 @@ pub async fn rip_concurrent_limited(
 
+    let mut total = 0;
     for (i, result) in results.into_iter().enumerate() {
         match result {
-            Ok(Ok(_)) => {
+            Ok(Ok(n)) => {
                 info!("Task {} completed successfully.", i);
+                total += n;
             }
             Ok(Err(e)) => {
                 error!("Task {} failed: {:?}", i, e);
@@ -102,5 +104,5 @@
         }
     }
 
-    Ok(())
+    Ok(total)
 }
diff --git a/src/load_tx_cypher.rs b/src/load_tx_cypher.rs
index f039f14..1906381 100644
--- a/src/load_tx_cypher.rs
+++ b/src/load_tx_cypher.rs
@@ -16,9 +16,15 @@ pub async fn tx_batch(
     batch_size: usize,
     archive_id: &str,
 ) -> Result<BatchTxReturn> {
+    info!("archive: {}", archive_id);
+
+    if txs.is_empty() {
+        // mark as complete so we don't retry
+        queue::update_task(pool, archive_id, true, 0).await?;
+    }
+
     let chunks: Vec<&[WarehouseTxMaster]> = txs.chunks(batch_size).collect();
     let mut all_results = BatchTxReturn::new();
-    info!("archive: {}", archive_id);
 
     for (i, c) in chunks.into_iter().enumerate() {
         info!("batch #{}", i);
diff --git a/tests/test_json_rescue_v5_load.rs b/tests/test_json_rescue_v5_load.rs
index b506734..cf42244 100644
--- a/tests/test_json_rescue_v5_load.rs
+++ b/tests/test_json_rescue_v5_load.rs
@@ -45,9 +45,32 @@ async fn test_load_entrypoint() -> anyhow::Result<()> {
 
     let path = fixtures::v5_json_tx_path();
 
-    json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?;
-    // dbg!(&tx_count);
-    // assert!(tx_count == 13);
+    let tx_count = json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?;
+    assert!(tx_count == 13);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_load_queue() -> anyhow::Result<()> {
+    libra_forensic_db::log_setup();
+
+    let c = start_neo4j_container();
+    let port = c.get_host_port_ipv4(7687);
+    let pool = get_neo4j_localhost_pool(port)
+        .await
+        .expect("could not get neo4j connection pool");
+    maybe_create_indexes(&pool)
+        .await
+        .expect("could start index");
+
+    let path = fixtures::v5_json_tx_path();
+
+    let tx_count = json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?;
+    assert!(tx_count == 13);
+
+    let tx_count = json_rescue_v5_load::rip_concurrent_limited(&path, &pool, None).await?;
+    assert!(tx_count == 0);
 
     Ok(())
 }