more work on hett and lots of pipeline fixes
jamesamcl committed Jul 26, 2024
1 parent c4e4976 commit 484d549
Showing 21 changed files with 862 additions and 90 deletions.
2 changes: 1 addition & 1 deletion 01_ingest/hett_pesticides_appril.py
@@ -21,7 +21,7 @@
 df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)

 for obj in df.to_dict(orient='records'):
-    obj = {k: v for k, v in obj.items() if pd.notna(v)}
+    obj = {re.sub(r'[^\w\s:]', '',k): v for k, v in obj.items() if pd.notna(v)}

     if 'PESTS' in obj:
         obj['PESTS'] = list(map(lambda p: p.strip(), obj['PESTS'].split(',')))
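A note on the change shared by this and the two ingest scripts below (an illustrative sketch, not part of the commit): re.sub(r'[^\w\s:]', '', k) strips every character from each column key except word characters, whitespace, and colons, so punctuation that could break downstream property names is dropped. A minimal standalone demonstration, using hypothetical column headers:

    import re

    # Hypothetical spreadsheet headers; only \w, \s and ':' survive the sub
    raw_keys = ["PESTS", "Max. Rate (kg/ha)", "grebi:name", "Authorised?"]
    cleaned = [re.sub(r'[^\w\s:]', '', k) for k in raw_keys]
    print(cleaned)  # ['PESTS', 'Max Rate kgha', 'grebi:name', 'Authorised']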
2 changes: 1 addition & 1 deletion 01_ingest/hett_pesticides_eu.py
@@ -24,7 +24,7 @@
 df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)

 for obj in df.to_dict(orient='records'):
-    obj = {k: v for k, v in obj.items() if pd.notna(v)}
+    obj = {re.sub(r'[^\w\s:]', '',k): v for k, v in obj.items() if pd.notna(v)}

     if 'Authorised' in obj:
         obj['Authorised'] = list(map(lambda p: p.strip(), obj['Authorised'].split(',')))
6 changes: 5 additions & 1 deletion 01_ingest/hett_pesticides_gb.py
@@ -17,6 +17,10 @@
 df.rename(columns={col: 'Category' for col in df.columns if col.startswith('Category')}, inplace=True)

 df['id'] = df['Substance Name']
+
+# remove any rows with empty id (in this case substance name)
+df = df[df['id'].notna() & df['id'].str.strip().ne('')]
+
 df.rename(columns={col: 'grebi:name' for col in df.columns if col == 'Substance Name'}, inplace=True)

 df['grebi:type'] = 'hett:AgroSubstance'
@@ -25,7 +29,7 @@
 df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)

 for obj in df.to_dict(orient='records'):
-    obj = {k: v for k, v in obj.items() if pd.notna(v)}
+    obj = { re.sub(r'[^\w\s:]', '',k): v for k, v in obj.items() if pd.notna(v)}

     if 'Category' in obj:
         obj['Category'] = obj['Category'].split(',')
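The new guard in hett_pesticides_gb.py drops rows whose Substance Name (which becomes the id) is missing or whitespace-only before any records are emitted. The same idiom on made-up data, as a sketch:

    import pandas as pd

    df = pd.DataFrame({'id': ['glyphosate', None, '   ', 'copper oxide']})
    # Keep only rows with a non-null, non-blank id
    df = df[df['id'].notna() & df['id'].str.strip().ne('')]
    print(df['id'].tolist())  # ['glyphosate', 'copper oxide']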
2 changes: 1 addition & 1 deletion 02_equivalences/grebi_assign_ids/src/main.rs
@@ -18,7 +18,7 @@ use grebi_shared::find_strings;
 struct Args {

     #[arg(long)]
-    add_prefix: String, // used to prepend the subgraph name like hra_kg:
+    add_prefix: String, // used to prepend the subgraph name like hra_kg:g:

     #[arg(long)]
     groups_txt: String,
10 changes: 6 additions & 4 deletions 03_merge/grebi_merge/src/main.rs
@@ -36,6 +36,7 @@ struct Args {
     _files: Vec<String>,
 }

+#[derive(Debug)]
 struct BufferedLine {
     input_index:usize,
     line:Vec<u8>
@@ -105,7 +106,10 @@ fn main() -> std::io::Result<()> {

         cur_lines.make_contiguous()
             .sort_by(|a, b| {
-                return get_id(&a.line).cmp(&get_id(&b.line)); });
+                return a.line.cmp(&b.line); });
+
+        //eprintln!("cur_lines: {:?}", cur_lines.iter().map(|line| String::from_utf8(line.line.clone()).unwrap()).collect::<Vec<_>>() );
+        //eprintln!("cur_lines values: {:?}", cur_lines);

         loop {

@@ -138,9 +142,7 @@ fn main() -> std::io::Result<()> {
                     break;
                 }
             } else {
-                let new_id = get_id(&line_buf);
-
-                match cur_lines.binary_search_by(|probe| { return get_id(&probe.line).cmp(&new_id); }) {
+                match cur_lines.binary_search_by(|probe| { return probe.line.cmp(&line_buf); }) {
                     Ok(pos) => cur_lines.insert(pos, BufferedLine { input_index, line: line_buf }),
                     Err(pos) => cur_lines.insert(pos, BufferedLine { input_index, line: line_buf })
                 }
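The grebi_merge change replaces id-only comparison with whole-line comparison when ordering buffered lines, so records that share an id still sort deterministically by their full serialized bytes. A rough Python approximation of that k-way merge ordering (illustrative only; the real code keeps a sorted VecDeque and inserts via binary search):

    import heapq

    # Two hypothetical pre-sorted JSONL inputs
    a = ['{"id":"a","x":1}', '{"id":"c","x":1}']
    b = ['{"id":"a","x":2}', '{"id":"b","x":2}']

    # heapq.merge compares whole lines, so equal ids tie-break on content
    for line in heapq.merge(a, b):
        print(line)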
17 changes: 16 additions & 1 deletion 04_index/grebi_index/src/main.rs
@@ -37,6 +37,9 @@ struct Args {

     #[arg(long)]
     out_names_txt: String,
+
+    #[arg(long)]
+    out_ids_txt: String
 }

 fn main() {
@@ -51,10 +54,12 @@ fn main() {
     let mut entity_props_to_count:HashMap<Vec<u8>,i64> = HashMap::new();
     let mut edge_props_to_count:HashMap<Vec<u8>,i64> = HashMap::new();
     let mut all_names:BTreeSet<Vec<u8>> = BTreeSet::new();
+    let mut all_ids:BTreeSet<Vec<u8>> = BTreeSet::new();

     let mut summary_writer = BufWriter::new(File::create(&args.out_summary_json_path).unwrap());
     let mut metadata_writer = BufWriter::new(File::create(&args.out_metadata_jsonl_path).unwrap());
     let mut names_writer = BufWriter::new(File::create(&args.out_names_txt).unwrap());
+    let mut ids_writer = BufWriter::new(File::create(&args.out_ids_txt).unwrap());

     let mut line:Vec<u8> = Vec::new();
     let mut n:i64 = 0;
@@ -169,12 +174,17 @@ fn main() {
                     if reified_u.value_kind == JsonTokenType::StartString {
                         all_names.insert(reified_u.value[1..reified_u.value.len()-1].to_vec());
                     }
+                } else if prop_key.eq(b"id") {
+                    if reified_u.value_kind == JsonTokenType::StartString {
+                        all_ids.insert(reified_u.value[1..reified_u.value.len()-1].to_vec());
+                    }
                 }

             }
         } else if val.kind == JsonTokenType::StartString {
             if prop_key.eq(b"grebi:name") || prop.key.eq(b"grebi:synonym") {
                 all_names.insert(val.value[1..val.value.len()-1].to_vec());
+            } else if prop_key.eq(b"id") {
+                all_ids.insert(val.value[1..val.value.len()-1].to_vec());
             }
         }
     }
@@ -215,6 +225,11 @@ fn main() {
         names_writer.write_all(&name).unwrap();
         names_writer.write_all(b"\n").unwrap();
     }
+
+    for id in all_ids {
+        ids_writer.write_all(&id).unwrap();
+        ids_writer.write_all(b"\n").unwrap();
+    }

     eprintln!("Building metadata took {} seconds", start_time3.elapsed().as_secs());

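grebi_index gains an out_ids_txt output: alongside the existing names file, every string-valued "id" it encounters is collected into a BTreeSet (hence deduplicated and sorted) and written one per line. Roughly this behaviour, sketched in Python over hypothetical records:

    records = [
        {"id": ["chebi:16236", "CHEBI:16236"], "grebi:name": "ethanol"},
        {"id": "mondo:0005148", "grebi:name": "type 2 diabetes mellitus"},
    ]

    all_ids = set()
    for rec in records:
        ids = rec.get("id", [])
        for i in (ids if isinstance(ids, list) else [ids]):
            all_ids.add(i)

    with open("ids.txt", "w") as f:
        for i in sorted(all_ids):  # a BTreeSet iterates in sorted order
            f.write(i + "\n")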
@@ -1,5 +1,5 @@
 [package]
-name = "grebi_make_csv"
+name = "grebi_make_neo_csv"
 version = "0.1.0"
 edition = "2021"

@@ -38,9 +38,6 @@ struct Args {
     #[arg(long)]
     out_edges_csv_path: String,

-    #[arg(long)]
-    out_id_nodes_csv_path: String,
-
     #[arg(long)]
     out_id_edges_csv_path: String,
 }
@@ -67,7 +64,6 @@ fn main() -> std::io::Result<()> {
     }


-
     let mut nodes_reader = BufReader::new(File::open(args.in_nodes_jsonl).unwrap());
     let mut edges_reader = BufReader::new(File::open(args.in_edges_jsonl).unwrap());

@@ -83,12 +79,6 @@ fn main() -> std::io::Result<()> {
         &edges_file
     );

-    let mut id_nodes_file = File::create(args.out_id_nodes_csv_path).unwrap();
-    let mut id_nodes_writer =
-        BufWriter::with_capacity(1024*1024*32,
-            &id_nodes_file
-        );
-
     let mut id_edges_file = File::create(args.out_id_edges_csv_path).unwrap();
     let mut id_edges_writer =
         BufWriter::with_capacity(1024*1024*32,
@@ -115,7 +105,6 @@ fn main() -> std::io::Result<()> {
     edges_writer.write_all("\n".as_bytes()).unwrap();


-    id_nodes_writer.write_all("id:ID,:LABEL\n".as_bytes()).unwrap();
     id_edges_writer.write_all(":START_ID,:TYPE,:END_ID\n".as_bytes()).unwrap();


Expand All @@ -134,7 +123,7 @@ fn main() -> std::io::Result<()> {

let sliced = SlicedEntity::from_json(&line);

write_node(&line, &sliced, &all_entity_props, &mut nodes_writer, &mut id_nodes_writer, &mut id_edges_writer);
write_node(&line, &sliced, &all_entity_props, &mut nodes_writer, &mut id_edges_writer);

n_nodes = n_nodes + 1;
if n_nodes % 1000000 == 0 {
@@ -176,7 +165,7 @@ fn main() -> std::io::Result<()> {
     Ok(())
 }

-fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&HashSet<String>, nodes_writer:&mut BufWriter<&File>, id_nodes_writer:&mut BufWriter<&File>, id_edges_writer:&mut BufWriter<&File>) {
+fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&HashSet<String>, nodes_writer:&mut BufWriter<&File>, id_edges_writer:&mut BufWriter<&File>) {

     let refs:Map<String,Value> = serde_json::from_slice(entity._refs.unwrap()).unwrap();

@@ -225,7 +214,7 @@ fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&HashSet<Stri
             }
             if row_prop.key == "id".as_bytes() {
                 for val in row_prop.values.iter() {
-                    write_id_row(val, id_nodes_writer, id_edges_writer, &entity.id);
+                    write_id_row(val, id_edges_writer, &entity.id);
                 }
             }
             if header_prop.as_bytes() == row_prop.key {
@@ -351,7 +340,7 @@ fn parse_json_and_write(buf:&[u8], refs:&Map<String,Value>, writer:&mut BufWrite
     }
 }

-fn write_id_row(val:&SlicedPropertyValue, id_nodes_writer:&mut BufWriter<&File>, id_edges_writer:&mut BufWriter<&File>, grebi_node_id:&[u8]) {
+fn write_id_row(val:&SlicedPropertyValue, id_edges_writer:&mut BufWriter<&File>, grebi_node_id:&[u8]) {

     let actual_id = {
         if val.kind == JsonTokenType::StartObject {
@@ -366,20 +355,12 @@ fn write_id_row(val:&SlicedPropertyValue, id_nodes_writer:&mut BufWriter<&File>,
         }
     };

-    id_nodes_writer.write_all(b"\"").unwrap();
-    write_escaped_value(actual_id, id_nodes_writer);
-    id_nodes_writer.write_all(b"\",\"").unwrap();
-    write_escaped_value(b"Id", id_nodes_writer);
-    id_nodes_writer.write_all(b"\"\n").unwrap();
-
-
-    id_nodes_writer.write_all(b"\"").unwrap();
     id_edges_writer.write_all(b"\"").unwrap();
     write_escaped_value(grebi_node_id, id_edges_writer);
-    id_nodes_writer.write_all(b"\",\"").unwrap();
-    write_escaped_value(b"id", id_nodes_writer);
-    id_nodes_writer.write_all(b"\",\"").unwrap();
-    write_escaped_value(actual_id, id_nodes_writer);
-    id_nodes_writer.write_all(b"\"\n").unwrap();

+    id_edges_writer.write_all(b"\",\"").unwrap();
+    write_escaped_value(b"id", id_edges_writer);
+    id_edges_writer.write_all(b"\",\"").unwrap();
+    write_escaped_value(actual_id, id_edges_writer);
+    id_edges_writer.write_all(b"\"\n").unwrap();
 }
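With the id-nodes CSV removed, each id value now produces only a single edge row from the entity node; the separate "Id" node rows and their id:ID,:LABEL header are no longer written by this tool (presumably id nodes are produced elsewhere, perhaps from the indexer's new ids.txt, though this diff alone doesn't show that). A minimal sketch of the surviving edge-file shape, with invented values (the real writer escapes and writes bytes by hand rather than using the csv module):

    import csv, sys

    w = csv.writer(sys.stdout)
    w.writerow([":START_ID", ":TYPE", ":END_ID"])          # Neo4j bulk-import header
    w.writerow(["hett:some_node", "id", "cas:1071-83-6"])  # one row per id value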
