From b31094742aeb293507e828e5a589e36b4507386f Mon Sep 17 00:00:00 2001 From: James McLaughlin Date: Wed, 25 Dec 2024 19:14:02 +0000 Subject: [PATCH] materialised queries in nextflow --- dataload/08_run_queries/run_queries.dockerpy | 2 ++ dataload/08_run_queries/run_queries.py | 6 ++--- .../docker_envs/Dockerfile.neo4j_with_extras | 2 +- dataload/nextflow/codon_nextflow.config | 10 ++++++- dataload/nextflow/load_subgraph.nf | 26 +++++++++---------- 5 files changed, 28 insertions(+), 18 deletions(-) diff --git a/dataload/08_run_queries/run_queries.dockerpy b/dataload/08_run_queries/run_queries.dockerpy index a49e875..03985b5 100644 --- a/dataload/08_run_queries/run_queries.dockerpy +++ b/dataload/08_run_queries/run_queries.dockerpy @@ -6,6 +6,8 @@ from pandas import DataFrame import json from timeit import default_timer as timer +os.system('echo "dbms.security.auth_enabled=false" >> /var/lib/neo4j/conf/neo4j.conf') + os.system("neo4j start") os.system("sleep 20") diff --git a/dataload/08_run_queries/run_queries.py b/dataload/08_run_queries/run_queries.py index 758f66a..93e664f 100644 --- a/dataload/08_run_queries/run_queries.py +++ b/dataload/08_run_queries/run_queries.py @@ -35,13 +35,13 @@ def main(): '--bind ' + shlex.quote(neo_data_path) + ':/data', '--bind ' + shlex.quote(neo_logs_path) + ':/logs', '--bind ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '08_run_queries/run_queries.dockerpy')) + ':/run_queries.py', - '--bind ' + os.path.abspath(os.environ['QUERY_YAMLS_PATH']) + ':/materialised_queries', + '--bind ' + os.path.abspath(os.environ['GREBI_QUERY_YAMLS_PATH']) + ':/materialised_queries', '--bind ' + os.path.abspath(args.out_sqlites_path) + ':/out', '--writable-tmpfs', '--network=none', '--env NEO4J_AUTH=none', 'docker://ghcr.io/ebispot/grebi_neo4j_with_extras:5.18.0', - 'python3 /run_queries.dockerpy' + 'python3 /run_queries.py' ]) else: cmd = ' '.join([ @@ -50,7 +50,7 @@ def main(): '-v ' + shlex.quote(neo_data_path) + ':/data', '-v ' + shlex.quote(neo_logs_path) + ':/logs', '-v ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '08_run_queries/run_queries.dockerpy')) + ':/run_queries.py', - '-v ' + os.path.abspath(os.environ['QUERY_YAMLS_PATH']) + ':/materialised_queries', + '-v ' + os.path.abspath(os.environ['GREBI_QUERY_YAMLS_PATH']) + ':/materialised_queries', '-v ' + os.path.abspath(args.out_sqlites_path) + ':/out', '-e NEO4J_AUTH=none', 'ghcr.io/ebispot/grebi_neo4j_with_extras:5.18.0', diff --git a/dataload/docker_envs/Dockerfile.neo4j_with_extras b/dataload/docker_envs/Dockerfile.neo4j_with_extras index 8f6a6a1..f56907f 100644 --- a/dataload/docker_envs/Dockerfile.neo4j_with_extras +++ b/dataload/docker_envs/Dockerfile.neo4j_with_extras @@ -2,5 +2,5 @@ FROM neo4j:5.18.0 RUN apt-get update && apt-get install -y python python3-pip sqlite3 rsync -RUN pip install pandas py2neo +RUN pip install pandas py2neo pyyaml diff --git a/dataload/nextflow/codon_nextflow.config b/dataload/nextflow/codon_nextflow.config index 4ee99da..480e15d 100644 --- a/dataload/nextflow/codon_nextflow.config +++ b/dataload/nextflow/codon_nextflow.config @@ -7,7 +7,7 @@ process { } process { - withName: materialise { + withName: link { memory = 96.GB } } @@ -89,5 +89,13 @@ process { } } +process { + withName: run_materialised_queries { + cpus = 8 + memory = 1500.GB + } +} + + diff --git a/dataload/nextflow/load_subgraph.nf b/dataload/nextflow/load_subgraph.nf index 32e1631..c6ade6c 100644 --- a/dataload/nextflow/load_subgraph.nf +++ b/dataload/nextflow/load_subgraph.nf @@ -11,6 +11,7 @@ params.subgraph = "$GREBI_SUBGRAPH" params.timestamp = "$GREBI_TIMESTAMP" params.is_ebi = "$GREBI_IS_EBI" params.solr_mem = "140g" +params.neo_tmp_path = "/dev/shm" workflow { @@ -29,13 +30,13 @@ workflow { indexed = index(merged.collect()) - materialise(merged.flatten(), indexed.metadata_jsonl, indexed.summary_json, Channel.value(config.exclude_edges + config.identifier_props), Channel.value(config.exclude_self_referential_edges + config.identifier_props), groups_txt) - merge_summary_jsons(indexed.summary_json.collect() + materialise.out.mat_summary.collect()) + link(merged.flatten(), indexed.metadata_jsonl, indexed.summary_json, Channel.value(config.exclude_edges + config.identifier_props), Channel.value(config.exclude_self_referential_edges + config.identifier_props), groups_txt) + merge_summary_jsons(indexed.summary_json.collect() + link.out.mat_summary.collect()) - compressed_blobs = create_compressed_blobs(materialise.out.nodes.mix(materialise.out.edges)) + compressed_blobs = create_compressed_blobs(link.out.nodes.mix(link.out.edges)) sqlite = create_sqlite(compressed_blobs.collect()) - neo_input_dir = prepare_neo(indexed.summary_json, materialise.out.nodes, materialise.out.edges) + neo_input_dir = prepare_neo(indexed.summary_json, link.out.nodes, link.out.edges) ids_csv = create_neo_ids_csv(indexed.ids_txt) neo_db = create_neo( @@ -47,7 +48,7 @@ workflow { mat_queries_sqlites = run_materialised_queries(neo_db) - solr_inputs = prepare_solr(materialise.out.nodes, materialise.out.edges) + solr_inputs = prepare_solr(link.out.nodes, link.out.edges) solr_nodes_core = create_solr_nodes_core(prepare_solr.out.nodes.collect(), indexed.names_txt) solr_edges_core = create_solr_edges_core(prepare_solr.out.edges.collect(), indexed.names_txt) solr_autocomplete_core = create_solr_autocomplete_core(indexed.names_txt) @@ -227,7 +228,7 @@ process index { """ } -process materialise { +process link { cache "lenient" memory "4 GB" time "8h" @@ -244,8 +245,8 @@ process materialise { path(groups_txt) output: - path("materialised_nodes_${task.index}.jsonl"), emit: nodes - path("materialised_edges_${task.index}.jsonl"), emit: edges + path("linked_nodes_${task.index}.jsonl"), emit: nodes + path("linked_edges_${task.index}.jsonl"), emit: edges path("mat_summary_${task.index}.json"), emit: mat_summary script: @@ -257,11 +258,11 @@ process materialise { --in-metadata-jsonl ${metadata_jsonl} \ --in-summary-json ${index_summary_json} \ --groups-txt ${groups_txt} \ - --out-edges-jsonl materialised_edges_${task.index}.jsonl \ + --out-edges-jsonl linked_edges_${task.index}.jsonl \ --out-summary-json mat_summary_${task.index}.json \ --exclude ${exclude.iterator().join(",")} \ --exclude-self-referential ${exclude_self_referential.iterator().join(",")} \ - > materialised_nodes_${task.index}.jsonl + > linked_nodes_${task.index}.jsonl """ } @@ -440,7 +441,6 @@ process run_materialised_queries { memory "8 GB" time "8h" cpus "8" - neo_tmp_path "/dev/shm" publishDir "${params.tmp}/${params.config}/${params.subgraph}", overwrite: true @@ -454,9 +454,9 @@ process run_materialised_queries { """ #!/usr/bin/env bash set -Eeuo pipefail - cp -r ${neo_db}/* ${task.neo_tmp_path} + cp -r ${neo_db}/* ${params.neo_tmp_path} PYTHONUNBUFFERED=true python3 ${params.home}/08_run_queries/run_queries.py \ - --in-db-path ${task.neo_tmp_path} \ + --in-db-path ${params.neo_tmp_path} \ --out-sqlites-path materialised_queries """ }