Skip to content

Commit

Permalink
add simple task queue for loading
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 18, 2024
1 parent 7e6b832 commit 4c29675
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 29 deletions.
7 changes: 1 addition & 6 deletions warehouse/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
// pub mod age_init;
pub mod cypher_templates;
pub mod extract_snapshot;
pub mod extract_transactions;
// pub mod load_account;
// pub mod load_coin;
pub mod load;
pub mod load_supporting_data;
pub mod load_tx_cypher;
// pub mod migrate;
pub mod neo4j_init;
// pub mod query_balance;
// pub mod restaurant;
pub mod queue;
pub mod scan;
pub mod supporting_data;
pub mod table_structs;
Expand Down
2 changes: 1 addition & 1 deletion warehouse/src/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub async fn try_load_one_archive(man: &ManifestInfo, pool: &Graph) -> Result<Ba
crate::scan::BundleContent::StateSnapshot => todo!(),
crate::scan::BundleContent::Transaction => {
let (txs, _) = extract_current_transactions(&man.archive_dir).await?;
let batch_res = load_tx_cypher::tx_batch(&txs, pool, 100).await?;
let batch_res = load_tx_cypher::tx_batch(&txs, pool, 1000, &man.archive_id).await?;
all_results.increment(&batch_res);
// TODO: make debug log
// println!("transactions updated: {}, ignored: {}", merged, ignored);
Expand Down
28 changes: 26 additions & 2 deletions warehouse/src/load_supporting_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use anyhow::{Context, Result};
use log::info;
use neo4rs::{query, Graph};

use crate::supporting_data::{read_orders_from_file, SwapOrder};
use crate::{
queue,
supporting_data::{read_orders_from_file, SwapOrder},
};

pub async fn swap_batch(
txs: &[SwapOrder],
Expand All @@ -16,16 +19,37 @@ pub async fn swap_batch(
let mut merged_count = 0u64;
let mut ignored_count = 0u64;

let archive_id = "swap_orders";
info!("archive: {}", archive_id);

for (i, c) in chunks.iter().enumerate() {
if let Some(skip) = skip_to_batch {
if i < skip {
continue;
};
};

info!("batch #{}", i);

match queue::is_complete(pool, archive_id, i).await {
Ok(Some(true)) => {
info!("...skipping, already loaded.");
// skip this one
continue;
}
Ok(Some(false)) => {
// keep going
info!("...loading to db");
}
_ => {
info!("...not found in queue, adding to queue.");

// no task found in db, add to queue
queue::update_task(pool, archive_id, false, i).await?;
}
}

let (m, ig) = impl_batch_tx_insert(pool, c).await?;
info!("...success");
info!("merged {}", m);
info!("ignored {}", ig);

Expand Down
30 changes: 28 additions & 2 deletions warehouse/src/load_tx_cypher.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use anyhow::{Context, Result};
use log::info;
use neo4rs::{query, Graph};
use std::fmt::Display;

use crate::{cypher_templates::write_batch_tx_string, table_structs::WarehouseTxMaster};
use crate::{cypher_templates::write_batch_tx_string, queue, table_structs::WarehouseTxMaster};

/// response for the batch insert tx
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -51,13 +52,38 @@ pub async fn tx_batch(
txs: &[WarehouseTxMaster],
pool: &Graph,
batch_len: usize,
archive_id: &str,
) -> Result<BatchTxReturn> {
let chunks: Vec<&[WarehouseTxMaster]> = txs.chunks(batch_len).collect();
let mut all_results = BatchTxReturn::new();
info!("archive: {}", archive_id);

for (i, c) in chunks.into_iter().enumerate() {
info!("batch #{}", i);
// check if this is already completed, or should be inserted.
match queue::is_complete(pool, archive_id, i).await {
Ok(Some(true)) => {
info!("...skipping, already loaded.");
// skip this one
continue;
}
Ok(Some(false)) => {
// keep going
info!("...loading to db");
}
_ => {
info!("...not found in queue, adding to queue.");

// no task found in db, add to queue
queue::update_task(pool, archive_id, false, i).await?;
}
}

for c in chunks {
let batch = impl_batch_tx_insert(pool, c).await?;

all_results.increment(&batch);
queue::update_task(pool, archive_id, true, i).await?;
info!("...success");
}

Ok(all_results)
Expand Down
96 changes: 96 additions & 0 deletions warehouse/src/queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use anyhow::{bail, Context, Result};
use neo4rs::Graph;

pub async fn update_task(
pool: &Graph,
archive_id: &str,
completed: bool,
batch: usize,
) -> Result<String> {
let cypher_string = format!(
r#"MERGE (a:Queue {{ archive_id: "{}", batch: {} }})
SET a.completed = {}
RETURN a.archive_id AS archive_id"#,
archive_id,
batch,
completed.to_string().to_lowercase(),
);

let cypher_query = neo4rs::query(&cypher_string);

let mut res = pool
.execute(cypher_query)
.await
.context("execute query error")?;

let row = res.next().await?.context("no row returned")?;
let task_id: String = row.get("archive_id").context("no created_accounts field")?;
Ok(task_id)
}

pub async fn get_queued(pool: &Graph) -> Result<Vec<String>> {
let cypher_string = r#"
MATCH (a:Queue)
WHERE a.completed = false
RETURN DISTINCT a.archive_id
"#;

let cypher_query = neo4rs::query(cypher_string);

let mut res = pool
.execute(cypher_query)
.await
.context("execute query error")?;

let mut archive_ids: Vec<String> = vec![];

while let Some(row) = res.next().await? {
// Extract `archive_id` as a String
if let Ok(archive_name) = row.get::<String>("a.archive_id") {
archive_ids.push(archive_name);
}
}

Ok(archive_ids)
}

// Three options: Not found in DB, found and complete, found and incomplete
pub async fn is_complete(pool: &Graph, archive_id: &str, batch: usize) -> Result<Option<bool>> {
let cypher_string = format!(
r#"
MATCH (a:Queue {{ archive_id: "{}", batch: {} }})
RETURN DISTINCT a.completed
"#,
archive_id, batch
);

let cypher_query = neo4rs::query(&cypher_string);

let mut res = pool
.execute(cypher_query)
.await
.context("execute query error")?;

if let Some(row) = res.next().await? {
// Extract `archive_id` as a String
Ok(row.get::<bool>("a.completed").ok())
} else {
bail!("not found")
}
}

// clear queue
pub async fn clear(pool: &Graph) -> Result<()> {
let cypher_string = r#"
MATCH (a:Queue)
DELETE a
"#.to_string();

let cypher_query = neo4rs::query(&cypher_string);

let mut _res = pool
.execute(cypher_query)
.await
.context("execute query error")?;
Ok(())
}
6 changes: 5 additions & 1 deletion warehouse/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ pub struct ArchiveMap(pub BTreeMap<PathBuf, ManifestInfo>);
#[derive(Clone, Debug)]

pub struct ManifestInfo {
/// the enclosing directory of the .manifest file
/// the enclosing directory of the local .manifest file
pub archive_dir: PathBuf,
/// the name of the directory, as a unique archive identifier
pub archive_id: String,
/// what libra version were these files encoded with (v5 etc)
pub version: EncodingVersion,
/// contents of the manifest
Expand Down Expand Up @@ -75,8 +77,10 @@ pub fn scan_dir_archive(
Ok(path) => {
let dir = path.parent().context("no parent dir found")?.to_owned();
let contents = test_content(&path);
let archive_id = dir.file_name().unwrap().to_str().unwrap().to_owned();
let m = ManifestInfo {
archive_dir: dir.clone(),
archive_id,
version: test_version(&contents, &path),
contents,
processed: false,
Expand Down
1 change: 0 additions & 1 deletion warehouse/tests/support/neo4j_testcontainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub fn start_neo4j_container<'a>() -> Container<'a, GenericImage> {
// .with_env_var("NEO4J_PLUGINS".to_owned(), r#"["graph-data-science"]"#);
.with_wait_for(WaitFor::message_on_stdout("Started."));


let image = RunnableImage::from(container);
// need to wrap the docker cli in a once_cell so that the borrow does not cause issues when container is passed along
let container = CLI.run(image);
Expand Down
3 changes: 2 additions & 1 deletion warehouse/tests/test_e2e_tx_neo4j.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ async fn test_parse_archive_into_neo4j() -> anyhow::Result<()> {
.expect("could start index");

// load in batches
let res = tx_batch(&txs, &graph, 100).await?;
let archive_id = archive_path.file_name().unwrap().to_str().unwrap();
let res = tx_batch(&txs, &graph, 100, archive_id).await?;
assert!(res.created_accounts == 118);
assert!(res.modified_accounts == 0);
assert!(res.unchanged_accounts == 0);
Expand Down
44 changes: 44 additions & 0 deletions warehouse/tests/test_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
mod support;

use anyhow::Result;
use libra_warehouse::{
neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
queue,
scan::scan_dir_archive,
};

use support::{fixtures, neo4j_testcontainer::start_neo4j_container};

#[tokio::test]
async fn test_queue() -> Result<()> {
let c = start_neo4j_container();
let port = c.get_host_port_ipv4(7687);
let pool = get_neo4j_localhost_pool(port)
.await
.expect("could not get neo4j connection pool");
maybe_create_indexes(&pool).await?;

let start_here = fixtures::v7_tx_manifest_fixtures_path();

let s = scan_dir_archive(&start_here, None)?;
let (_, man_info) = s.0.first_key_value().unwrap();
let batch = 0usize;

let id = queue::update_task(&pool, &man_info.archive_id, false, batch).await?;
assert!(id == man_info.archive_id);

let list = queue::get_queued(&pool).await?;

assert!(*"transaction_38100001-.541f" == list[0]);

let c = queue::is_complete(&pool, "transaction_38100001-.541f", batch).await;
assert!(!c?.unwrap());

// Now we update the task, with ID and batch
let _id = queue::update_task(&pool, &man_info.archive_id, true, batch).await?;

let c = queue::is_complete(&pool, "transaction_38100001-.541f", batch).await;
assert!(c?.unwrap());

Ok(())
}
20 changes: 5 additions & 15 deletions warehouse/tests/test_supporting_data.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
mod support;
use log::error;

use support::neo4j_testcontainer::start_neo4j_container;

use std::path::PathBuf;

use anyhow::Result;
use libra_warehouse::{
load_supporting_data, log_setup,
load_supporting_data,
neo4j_init::{get_neo4j_localhost_pool, maybe_create_indexes},
supporting_data::{read_orders_from_file, SwapOrder},
};
Expand Down Expand Up @@ -65,28 +65,19 @@ async fn test_swap_batch_cypher() -> Result<()> {

#[tokio::test]
async fn e2e_swap_data() -> Result<()> {
log_setup();
error!("start container");

let start = std::time::Instant::now();

let c = start_neo4j_container();
let port = c.get_host_port_ipv4(7687);
let graph = get_neo4j_localhost_pool(port).await?;
maybe_create_indexes(&graph).await?;
dbg!(&start.elapsed());

let start = std::time::Instant::now();

let path = env!("CARGO_MANIFEST_DIR");
let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json");
let orders = read_orders_from_file(buf).unwrap();

assert!(orders.len() == 25450);
dbg!(&start.elapsed());

// load 100 orders
load_supporting_data::swap_batch(&orders[..1000], &graph, 1000).await?;
// load 1000 orders
load_supporting_data::swap_batch(&orders[..1000], &graph, 1000, None).await?;

// now check data was loaded
let mut result = graph
Expand All @@ -96,8 +87,7 @@ async fn e2e_swap_data() -> Result<()> {
// check accounts should have been inserted
while let Some(row) = result.next().await? {
let num: i64 = row.get("num").unwrap();
dbg!(&num);
// assert!(num == 3);
assert!(num == 850);
}

Ok(())
Expand Down

0 comments on commit 4c29675

Please sign in to comment.