Skip to content

Commit

Permalink
Merge pull request #59 from bgpkit/search-improvement
Browse files Browse the repository at this point in the history
Search improvement
  • Loading branch information
digizeph authored Jun 28, 2024
2 parents a5e5a96 + 263c5f0 commit fab32cb
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 48 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ name = "monocle"
path = "src/bin/monocle.rs"

[dependencies]
bgpkit-broker = "0.7.0-beta.5"
bgpkit-parser = { version = "0.10.5", features = ["serde"] }
bgpkit-broker = "0.7.0"
bgpkit-parser = { version = "0.10.9", features = ["serde"] }
oneio = { version = "0.16.7", default-features = false, features = ["remote", "gz", "bz"] }

clap = { version = "4.1", features = ["derive"] }
itertools = "0.12"
itertools = "0.13.0"
rayon = "1.8"
tracing = "0.1"
tracing-subscriber = "0.3"
Expand All @@ -35,7 +35,7 @@ anyhow = "1.0"
tabled = "0.14"
config = { version = "0.13", features = ["toml"] }
dirs = "5"
rusqlite = { version = "0.30", features = ["bundled"] }
rusqlite = { version = "0.31", features = ["bundled"] }
ureq = { version = "2.9", features = ["json"] }
regex = "1.10"
rpki = { version = "0.16.1", features = ["repository"] }
Expand Down
53 changes: 31 additions & 22 deletions src/bin/monocle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(clippy::type_complexity)]
use std::io::Write;
use std::net::IpAddr;
use std::path::PathBuf;
Expand Down Expand Up @@ -363,16 +364,19 @@ enum RadarCommands {
},
}

fn elem_to_string(elem: &BgpElem, json: bool, pretty: bool) -> String {
fn elem_to_string(elem: &BgpElem, json: bool, pretty: bool, collector: &str) -> String {
if json {
let val = json!(elem);
let mut val = json!(elem);
val.as_object_mut()
.unwrap()
.insert("collector".to_string(), collector.into());
if pretty {
serde_json::to_string_pretty(&val).unwrap()
} else {
val.to_string()
}
} else {
elem.to_string()
format!("{}|{}", elem, collector)
}
}

Expand Down Expand Up @@ -402,8 +406,9 @@ fn main() {
return;
}

let file_path = file_path.to_str().unwrap();
let parser = parser_with_filters(
file_path.to_str().unwrap(),
file_path,
&filters.origin_asn,
&filters.prefix,
&filters.include_super,
Expand All @@ -419,7 +424,7 @@ fn main() {

let mut stdout = std::io::stdout();
for elem in parser {
let output_str = elem_to_string(&elem, json, pretty);
let output_str = elem_to_string(&elem, json, pretty, "");
if let Err(e) = writeln!(stdout, "{}", &output_str) {
if e.kind() != std::io::ErrorKind::BrokenPipe {
eprintln!("{e}");
Expand Down Expand Up @@ -503,18 +508,19 @@ fn main() {
return;
}

let (sender, receiver): (Sender<BgpElem>, Receiver<BgpElem>) = channel();
let (sender, receiver): (Sender<(BgpElem, String)>, Receiver<(BgpElem, String)>) =
channel();
// progress bar
let (pb_sender, pb_receiver): (Sender<u8>, Receiver<u8>) = channel();
let (pb_sender, pb_receiver): (Sender<u32>, Receiver<u32>) = channel();

// dedicated thread for handling output of results
let writer_thread = thread::spawn(move || match sqlite_db {
Some(db) => {
let mut msg_cache = vec![];
let mut msg_count = 0;
for elem in receiver {
for (elem, collector) in receiver {
msg_count += 1;
msg_cache.push(elem);
msg_cache.push((elem, collector));
if msg_cache.len() >= 100000 {
db.insert_elems(&msg_cache);
msg_cache.clear();
Expand All @@ -527,8 +533,8 @@ fn main() {
println!("processed {total_items} files, found {msg_count} messages, written into file {sqlite_path_str}");
}
None => {
for elem in receiver {
let output_str = elem_to_string(&elem, json, pretty);
for (elem, collector) in receiver {
let output_str = elem_to_string(&elem, json, pretty, collector.as_str());
println!("{output_str}");
}
}
Expand All @@ -541,24 +547,25 @@ fn main() {
}

let sty = indicatif::ProgressStyle::with_template(
"[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {eta}",
"[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {eta} left; {msg}",
)
.unwrap()
.progress_chars("##-");
let pb = indicatif::ProgressBar::new(total_items as u64);
pb.set_style(sty);
for _ in pb_receiver.iter() {
let mut total_count: u64 = 0;
for count in pb_receiver.iter() {
total_count += count as u64;
pb.set_message(format!("found {total_count} messages"));
pb.inc(1);
}
});

let urls = items
.iter()
.map(|x| x.url.to_string())
.collect::<Vec<String>>();

urls.into_par_iter()
.for_each_with((sender, pb_sender), |(s, pb_sender), url| {
items
.into_par_iter()
.for_each_with((sender, pb_sender), |(s, pb_sender), item| {
let url = item.url;
let collector = item.collector_id;
info!("start parsing {}", url.as_str());
let parser = parser_with_filters(
url.as_str(),
Expand All @@ -575,12 +582,14 @@ fn main() {
)
.unwrap();

let mut elems_count = 0;
for elem in parser {
s.send(elem).unwrap()
s.send((elem, collector.clone())).unwrap();
elems_count += 1;
}

if show_progress {
pb_sender.send(0).unwrap();
pb_sender.send(elems_count).unwrap();
}
info!("finished parsing {}", url.as_str());
});
Expand Down
31 changes: 9 additions & 22 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ impl MsgStore {
}

fn initialize_msgs_db(db: &mut MonocleDatabase, reset: bool) {
if reset {
db.conn.execute("drop table if exists elems", []).unwrap();
}
db.conn
.execute(
r#"
create table if not exists elems (
timestamp INTEGER,
elem_type TEXT,
collector TEXT,
peer_ip TEXT,
peer_asn INTEGER,
prefix TEXT,
Expand All @@ -63,10 +67,6 @@ impl MsgStore {
[],
)
.unwrap();

if reset {
db.conn.execute("delete from elems", []).unwrap();
}
}

#[inline(always)]
Expand All @@ -78,11 +78,11 @@ impl MsgStore {
}
}

pub fn insert_elems(&self, elems: &[BgpElem]) {
pub fn insert_elems(&self, elems: &[(BgpElem, String)]) {
for elems in elems.chunks(10000) {
let values = elems
.iter()
.map(|elem| {
.map(|(elem, collector)| {
let t = match elem.elem_type {
// bgpkit_parser::ElemType::ANNOUNCE => "A",
// bgpkit_parser::ElemType::WITHDRAW => "W",
Expand All @@ -91,9 +91,10 @@ impl MsgStore {
};
let origin_string = elem.origin_asns.as_ref().map(|asns| asns.first().unwrap());
format!(
"('{}','{}','{}','{}','{}', {},{},{},{},{},{},{},'{}',{},{})",
"('{}','{}','{}', '{}','{}','{}', {},{},{},{},{},{},{},'{}',{},{})",
elem.timestamp as u32,
t,
collector,
elem.peer_ip,
elem.peer_asn,
elem.prefix,
Expand All @@ -116,7 +117,7 @@ impl MsgStore {
.to_string();
let query = format!(
"INSERT INTO elems (\
timestamp, elem_type, peer_ip, peer_asn, prefix, next_hop, \
timestamp, elem_type, collector, peer_ip, peer_asn, prefix, next_hop, \
as_path, origin_asns, origin, local_pref, med, communities,\
atomic, aggr_asn, aggr_ip)\
VALUES {values};"
Expand All @@ -125,17 +126,3 @@ impl MsgStore {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use bgpkit_parser::BgpkitParser;

#[test]
fn test_insert() {
let store = MsgStore::new(&Some("test.sqlite3".to_string()), false);
let url = "https://spaces.bgpkit.org/parser/update-example.gz";
let elems: Vec<BgpElem> = BgpkitParser::new(url).unwrap().into_elem_iter().collect();
store.insert_elems(&elems);
}
}

0 comments on commit fab32cb

Please sign in to comment.