From d3d1eff78ce7cdb8c5a91d6b5c347670b849533c Mon Sep 17 00:00:00 2001 From: Alex Sickler Date: Fri, 29 Mar 2024 10:09:14 -0400 Subject: [PATCH 1/7] :art: move helper functions to a separate file and add pagination --- .../modules/dewrangle/helper_functions.py | 485 ++++++++++++++++++ d3b_dff_cli/modules/dewrangle/volume.py | 447 +--------------- 2 files changed, 497 insertions(+), 435 deletions(-) create mode 100644 d3b_dff_cli/modules/dewrangle/helper_functions.py diff --git a/d3b_dff_cli/modules/dewrangle/helper_functions.py b/d3b_dff_cli/modules/dewrangle/helper_functions.py new file mode 100644 index 0000000..e3a7d34 --- /dev/null +++ b/d3b_dff_cli/modules/dewrangle/helper_functions.py @@ -0,0 +1,485 @@ +"""Dewrangle helper functions""" + +import os +import sys +import traceback +import configparser +from gql import gql, Client +from gql.transport.aiohttp import AIOHTTPTransport +from datetime import datetime + + +def check_mutation_result(result): + """Check the result of a mutation and handle error(s)""" + + for my_key in result: + my_error = result[my_key]["errors"] + if my_error is not None: + raise RuntimeError( + "The following error occurred when running mutation:\n{}".format( + my_error + ) + ) + + return + + +def get_api_credential(): + """Get api token from credential file.""" + config = configparser.ConfigParser() + config.read(os.path.join(os.path.expanduser("~"), ".dewrangle", "credentials")) + return config["default"]["api_key"] + + +def get_cred_id(client, study_id, cred_name=None): + """Get credential id""" + + cred_id = None + + # get all credentials in study + credentials = get_study_credentials(client, study_id) + + cred_id = pick_external_id(cred_name, credentials, "credential") + + return cred_id + + +def create_gql_client(endpoint=None, api_key=None): + """Create GraphQL client connection""" + + # default endpoint + if endpoint is None: + endpoint = "https://dewrangle.com/api/graphql" + + if api_key: + req_header = {"X-Api-Key": api_key} + else: + req_header = {"X-Api-Key": get_api_credential()} + + transport = AIOHTTPTransport( + url=endpoint, + headers=req_header, + ) + client = Client(transport=transport, fetch_schema_from_transport=True) + + return client + + +def get_study_credentials(client, study_id): + """Get credential ids from a study.""" + + # query all studies and credentials the user has access to. + # in the future, this should be a simpler query to get study id from study name + credentials = {} + + # set up query to get all credentials in the study + query = gql( + """ + query Study_Query($id: ID!) { + study: node(id: $id) { + id + ... on Study { + credentials { + edges { + node { + id + name + key + } + } + } + } + } + } + """ + ) + + params = {"id": study_id} + + # run query + result = client.execute(query, variable_values=params) + + # loop through query results, find the study we're looking for and it's volumes + for study in result: + for cred_edge in result[study]["credentials"]["edges"]: + cred = cred_edge["node"] + cid = cred["id"] + name = cred["name"] + key = cred["key"] + credentials[cid] = {"name": name, "key": key} + + return credentials + + +def get_all_studies(client): + """Query all available studies, return study ids and names""" + + studies = {} + + # set up query to get all available studies + query = gql( + """ + query { + viewer { + organizationUsers { + edges { + node { + organization { + name + id + studies { + edges { + node { + name + id + globalId + } + } + } + } + } + } + } + } + } + """ + ) + + # run query + result = client.execute(query) + + for org_edge in result["viewer"]["organizationUsers"]["edges"]: + for study_edge in org_edge["node"]["organization"]["studies"]["edges"]: + study = study_edge["node"] + id = study["id"] + name = study["name"] + global_id = study["globalId"] + studies[id] = {"name": name, "global_id": global_id} + + return studies + + +def get_study_id(client, study_name): + """Query all available studies, return study id""" + + study_id = "" + study_ids = [] + + # get a dictionary of all study ids and names + studies = get_all_studies(client) + + # loop through query results, find the study we're looking for and it's volumes + for study in studies: + if study_name in [study, studies[study]["global_id"], studies[study]["name"]]: + study_ids.append(study) + + if len(study_ids) == 1: + study_id = study_ids[0] + elif len(study_ids) == 0: + raise ValueError("Study {} not found".format(study_name)) + else: + raise ValueError( + "Study {} found multiple times. Please delete or rename studies so there is only one {}".format( + study_name, study_name + ) + ) + + return study_id + + +def get_org_id_from_study(client, study_id): + """Query study id and get the id of the organization it's in""" + + org_id = "" + # set up query to get all available studies + query = gql( + """ + query Study_Query($id: ID!) { + study: node(id: $id) { + ... on Study { + organization { + id + } + } + } + } + """ + ) + + params = {"id": study_id} + + # run query + result = client.execute(query, params) + + org_id = result["study"]["organization"]["id"] + + return org_id + + +def get_billing_id(client, org_id, billing=None): + "Get billing group id. If a name is provided, check it exists. If not return org default." + + # first get a list of organizations and billing groups + billing_groups = get_billing_groups(client, org_id) + + billing_id = pick_external_id(billing, billing_groups, "billing_group") + + return billing_id + + +def get_study_volumes(client, study_id): + """Query study id, and return volumes in that study""" + study_volumes = {} + # set up query to get all available studies + query = gql( + """ + query Study_Query($id: ID!, $after: ID) { + study: node(id: $id) { + ... on Study { + volumes(first:100, after: $after) { + pageInfo { + hasNextPage + endCursor + } + edges { + cursor + node { + id + name + pathPrefix + } + } + } + } + } + } + """ + ) + + # set up initial parameter for query (just id) + params = {"id": study_id} + + # if there's still more results in the query, process the page of results and do it again + has_next_page = True + while has_next_page == True: + + # run query + result = client.execute(query, params) + + page_info = result["study"]["volumes"]["pageInfo"] + has_next_page = page_info["hasNextPage"] + after = page_info["endCursor"] + for volume_edge in result["study"]["volumes"]["edges"]: + volume = volume_edge["node"] + vid = volume["id"] + vname = volume["name"] + prefix = volume["pathPrefix"] + study_volumes[vid] = {"name": vname, "pathPrefix": prefix} + + # add the last cursor id to the query to get the next set of results + params["after"] = after + + return study_volumes + + +def process_volumes(study, volumes, **kwargs): + """Check if a volume is already loaded to a study. + Inputs: study id, dictionary of volumes in the study, optionally volume name or volume id. + Outputs: volume id""" + volume_id = kwargs.get("vid", None) + vname = kwargs.get("vname", None) + vpre = kwargs.get("prefix", None) + + if volume_id: + if volume_id not in volumes.keys(): + raise ValueError( + "Volume id not present in study. Ensure you are providing the whole volume id." + ) + else: + # see how many times the volume was added to the study + matching_volumes = [] + for vol in volumes: + if volumes[vol]["name"] == vname and volumes[vol]["pathPrefix"] == vpre: + matching_volumes.append(vol) + count = len(matching_volumes) + + if count == 0: + print("{} volume not found in {}".format(vname, study)) + elif count == 1: + volume_id = matching_volumes[0] + else: + print( + "==============================================================================================" + ) + print("Multiple volumes named {} found in {}".format(vname, study)) + print( + "Rerun this script using the '--vid' option with the volume id of the volume you want to delete" + ) + print("Matching volumes and ids are:") + for mvol in matching_volumes: + print("{}: {}".format(vname, mvol)) + print( + "==============================================================================================" + ) + + return volume_id + + +def get_volume_jobs(client, vid): + """Query volume for a list of jobs""" + jobs = {} + + query = gql( + """ + query Volume_Job_Query($id: ID!) { + volume: node(id: $id) { + id + ... on Volume { + jobs { + edges { + node { + id + operation + completedAt + createdAt + } + } + } + } + } + } + """ + ) + + params = {"id": vid} + + # run query + result = client.execute(query, variable_values=params) + + # format result + for vol in result: + for job in result[vol]["jobs"]: + for node in result[vol]["jobs"][job]: + id = node["node"]["id"] + # convert createdAt from string to datetime object + created = datetime.strptime( + node["node"]["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ" + ) + op = node["node"]["operation"] + comp = datetime.strptime( + node["node"]["completedAt"], "%Y-%m-%dT%H:%M:%S.%fZ" + ) + jobs[id] = {"operation": op, "createdAt": created, "completedAt": comp} + + return jobs + + +def list_and_hash_volume(client, volume_id, billing_id): + """Run Dewrangle list and hash volume mutation.""" + + # prepare mutation + mutation = gql( + """ + mutation VolumeListHashMutation($id: ID!, $input: VolumeListAndHashInput!) { + volumeListAndHash(id: $id, input: $input) { + errors { + ... on MutationError { + message + field + } + } + job { + id + } + } + } + """ + ) + + params = {"id": volume_id} + params["input"] = {"billingGroupId": billing_id} + + # run mutation + result = client.execute(mutation, variable_values=params) + + check_mutation_result(result) + + job_id = result["volumeListAndHash"]["job"]["id"] + + return job_id + + +def pick_external_id(name, externals, external_type): + """From a dictionary of either credential or billing group ids, pick the one to use.""" + + ext_id = None + + org = "other" + if external_type.lower() == "billing_group": + org = "organization" + elif external_type.lower() == "credential": + org = "study" + + message = "" + + if len(externals) == 1 and name is None: + ((ext_id, info),) = externals.items() + ext_id = list(externals.keys())[0] + elif name: + for ext in externals: + if name == externals[ext]["name"]: + ext_id = ext + if ext_id is None: + message = "{} {} not found in {}".format( + external_type.capitalize(), name, org + ) + elif len(externals) == 0: + message = "No credentials in study." + else: + message = "Multiple {} found in {} but none provided. Please run again and provide one of the following crdentials ids:{}{}".format( + external_type, org, "\n", externals + ) + + if ext_id is None: + raise ValueError(message) + + return ext_id + + +def get_billing_groups(client, org_id): + """Get available billing groups for an organization.""" + + billing_groups = {} + + # query all organizations, studies, and billing groups the user has access to. + # set up query to get all available studies + query = gql( + """ + query Org_Query($id: ID!) { + organization: node(id: $id) { + ... on Organization { + billingGroups { + edges { + node { + name + id + } + } + } + } + } + } + """ + ) + + params = {"id": org_id} + + # run query + result = client.execute(query, params) + + for bg in result["organization"]["billingGroups"]["edges"]: + name = bg["node"]["name"] + id = bg["node"]["id"] + billing_groups[id] = {"name": name} + + return billing_groups diff --git a/d3b_dff_cli/modules/dewrangle/volume.py b/d3b_dff_cli/modules/dewrangle/volume.py index dc74de0..016e61c 100644 --- a/d3b_dff_cli/modules/dewrangle/volume.py +++ b/d3b_dff_cli/modules/dewrangle/volume.py @@ -7,6 +7,7 @@ from gql import gql, Client from gql.transport.aiohttp import AIOHTTPTransport from datetime import datetime +from . import helper_functions as hf def parse_hash_args(args): @@ -23,356 +24,6 @@ def parse_hash_args(args): return (prefix, region, study, bucket, aws_cred, billing) -def check_mutation_result(result): - """Check the result of a mutation and handle error(s)""" - - for my_key in result: - my_error = result[my_key]["errors"] - if my_error is not None: - raise RuntimeError( - "The following error occurred when running mutation:\n{}".format( - my_error - ) - ) - - return - - -def get_api_credential(): - """Get api token from credential file.""" - config = configparser.ConfigParser() - config.read(os.path.join(os.path.expanduser("~"), ".dewrangle", "credentials")) - return config["default"]["api_key"] - - -def get_cred_id(client, study_id, cred_name=None): - """Get credential id""" - - cred_id = None - - # get all credentials in study - credentials = get_study_credentials(client, study_id) - - cred_id = pick_external_id(cred_name, credentials, "credential") - - return cred_id - - -def create_gql_client(endpoint=None, api_key=None): - """Create GraphQL client connection""" - - # default endpoint - if endpoint is None: - endpoint = "https://dewrangle.com/api/graphql" - - if api_key: - req_header = {"X-Api-Key": api_key} - else: - req_header = {"X-Api-Key": get_api_credential()} - - transport = AIOHTTPTransport( - url=endpoint, - headers=req_header, - ) - client = Client(transport=transport, fetch_schema_from_transport=True) - - return client - - -def get_study_credentials(client, study_id): - """Get credential ids from a study.""" - - # query all studies and credentials the user has access to. - # in the future, this should be a simpler query to get study id from study name - credentials = {} - - # set up query to get all credentials in the study - query = gql( - """ - query Study_Query($id: ID!) { - study: node(id: $id) { - id - ... on Study { - credentials { - edges { - node { - id - name - key - } - } - } - } - } - } - """ - ) - - params = {"id": study_id} - - # run query - result = client.execute(query, variable_values=params) - - # loop through query results, find the study we're looking for and it's volumes - for study in result: - for cred_edge in result[study]["credentials"]["edges"]: - cred = cred_edge["node"] - cid = cred["id"] - name = cred["name"] - key = cred["key"] - credentials[cid] = {"name": name, "key": key} - - return credentials - - -def get_all_studies(client): - """Query all available studies, return study ids and names""" - - studies = {} - - # set up query to get all available studies - query = gql( - """ - query { - viewer { - organizationUsers { - edges { - node { - organization { - name - id - studies { - edges { - node { - name - id - globalId - } - } - } - } - } - } - } - } - } - """ - ) - - # run query - result = client.execute(query) - - for org_edge in result["viewer"]["organizationUsers"]["edges"]: - for study_edge in org_edge["node"]["organization"]["studies"]["edges"]: - study = study_edge["node"] - id = study["id"] - name = study["name"] - global_id = study["globalId"] - studies[id] = {"name": name, "global_id": global_id} - - return studies - - -def get_study_id(client, study_name): - """Query all available studies, return study id""" - - study_id = "" - study_ids = [] - - # get a dictionary of all study ids and names - studies = get_all_studies(client) - - # loop through query results, find the study we're looking for and it's volumes - for study in studies: - if study_name in [study, studies[study]["global_id"], studies[study]["name"]]: - study_ids.append(study) - - if len(study_ids) == 1: - study_id = study_ids[0] - elif len(study_ids) == 0: - raise ValueError("Study {} not found".format(study_name)) - else: - raise ValueError( - "Study {} found multiple times. Please delete or rename studies so there is only one {}".format( - study_name, study_name - ) - ) - - return study_id - - -def get_org_id_from_study(client, study_id): - """Query study id and get the id of the organization it's in""" - - org_id = "" - # set up query to get all available studies - query = gql( - """ - query Study_Query($id: ID!) { - study: node(id: $id) { - ... on Study { - organization { - id - } - } - } - } - """ - ) - - params = {"id": study_id} - - # run query - result = client.execute(query, params) - - org_id = result["study"]["organization"]["id"] - - return org_id - - -def get_billing_id(client, org_id, billing=None): - "Get billing group id. If a name is provided, check it exists. If not return org default." - - # first get a list of organizations and billing groups - billing_groups = get_billing_groups(client, org_id) - - billing_id = pick_external_id(billing, billing_groups, "billing_group") - - return billing_id - - -def get_study_volumes(client, study_id): - """Query study id, and return volumes in that study""" - study_volumes = {} - # set up query to get all available studies - query = gql( - """ - query Study_Query($id: ID!) { - study: node(id: $id) { - ... on Study { - volumes { - edges { - node { - id - name - pathPrefix - } - } - } - } - } - } - """ - ) - - params = {"id": study_id} - - # run query - result = client.execute(query, params) - - for study in result: - for volume_edge in result["study"]["volumes"]["edges"]: - volume = volume_edge["node"] - vid = volume["id"] - vname = volume["name"] - prefix = volume["pathPrefix"] - study_volumes[vid] = {"name": vname, "pathPrefix": prefix} - - return study_volumes - - -def process_volumes(study, volumes, **kwargs): - """Check if a volume is already loaded to a study. - Inputs: study id, dictionary of volumes in the study, optionally volume name or volume id. - Outputs: volume id""" - volume_id = kwargs.get("vid", None) - vname = kwargs.get("vname", None) - vpre = kwargs.get("prefix", None) - - if volume_id: - if volume_id not in volumes.keys(): - raise ValueError( - "Volume id not present in study. Ensure you are providing the whole volume id." - ) - else: - # see how many times the volume was added to the study - matching_volumes = [] - for vol in volumes: - if volumes[vol]["name"] == vname and volumes[vol]["pathPrefix"] == vpre: - matching_volumes.append(vol) - count = len(matching_volumes) - - if count == 0: - print("{} volume not found in {}".format(vname, study)) - elif count == 1: - volume_id = matching_volumes[0] - else: - print( - "==============================================================================================" - ) - print("Multiple volumes named {} found in {}".format(vname, study)) - print( - "Rerun this script using the '--vid' option with the volume id of the volume you want to delete" - ) - print("Matching volumes and ids are:") - for mvol in matching_volumes: - print("{}: {}".format(vname, mvol)) - print( - "==============================================================================================" - ) - - return volume_id - - -def get_volume_jobs(client, vid): - """Query volume for a list of jobs""" - jobs = {} - - query = gql( - """ - query Volume_Job_Query($id: ID!) { - volume: node(id: $id) { - id - ... on Volume { - jobs { - edges { - node { - id - operation - completedAt - createdAt - } - } - } - } - } - } - """ - ) - - params = {"id": vid} - - # run query - result = client.execute(query, variable_values=params) - - # format result - for vol in result: - for job in result[vol]["jobs"]: - for node in result[vol]["jobs"][job]: - id = node["node"]["id"] - # convert createdAt from string to datetime object - created = datetime.strptime( - node["node"]["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ" - ) - op = node["node"]["operation"] - comp = datetime.strptime( - node["node"]["completedAt"], "%Y-%m-%dT%H:%M:%S.%fZ" - ) - jobs[id] = {"operation": op, "createdAt": created, "completedAt": comp} - - return jobs - - def add_volume(client, study_id, prefix, region, bucket, aws_cred): """Run Dewrangle create volume mutation.""" @@ -412,7 +63,7 @@ def add_volume(client, study_id, prefix, region, bucket, aws_cred): # run mutation result = client.execute(mutation, variable_values=params) - check_mutation_result(result) + hf.check_mutation_result(result) volume_id = result["volumeCreate"]["volume"]["id"] @@ -447,89 +98,13 @@ def list_and_hash_volume(client, volume_id, billing_id): # run mutation result = client.execute(mutation, variable_values=params) - check_mutation_result(result) + hf.check_mutation_result(result) job_id = result["volumeListAndHash"]["job"]["id"] return job_id -def pick_external_id(name, externals, external_type): - """From a dictionary of either credential or billing group ids, pick the one to use.""" - - ext_id = None - - org = "other" - if external_type.lower() == "billing_group": - org = "organization" - elif external_type.lower() == "credential": - org = "study" - - message = "" - - if len(externals) == 1 and name is None: - ((ext_id, info),) = externals.items() - ext_id = list(externals.keys())[0] - elif name: - for ext in externals: - if name == externals[ext]["name"]: - ext_id = ext - if ext_id is None: - message = "{} {} not found in {}".format( - external_type.capitalize(), name, org - ) - elif len(externals) == 0: - message = "No credentials in study." - else: - message = "Multiple {} found in {} but none provided. Please run again and provide one of the following crdentials ids:{}{}".format( - external_type, org, "\n", externals - ) - - if ext_id is None: - raise ValueError(message) - - return ext_id - - -def get_billing_groups(client, org_id): - """Get available billing groups for an organization.""" - - billing_groups = {} - - # query all organizations, studies, and billing groups the user has access to. - # set up query to get all available studies - query = gql( - """ - query Org_Query($id: ID!) { - organization: node(id: $id) { - ... on Organization { - billingGroups { - edges { - node { - name - id - } - } - } - } - } - } - """ - ) - - params = {"id": org_id} - - # run query - result = client.execute(query, params) - - for bg in result["organization"]["billingGroups"]["edges"]: - name = bg["node"]["name"] - id = bg["node"]["id"] - billing_groups[id] = {"name": name} - - return billing_groups - - def load_and_hash_volume( bucket_name, study_name, @@ -545,25 +120,27 @@ def load_and_hash_volume( Output: job id of parent job created when volume is hashed. """ - client = create_gql_client(api_key=token) + client = hf.create_gql_client(api_key=token) job_id = None try: # get study and org ids - study_id = get_study_id(client, study_name) - org_id = get_org_id_from_study(client, study_id) + study_id = hf.get_study_id(client, study_name) + org_id = hf.get_org_id_from_study(client, study_id) # get billing group id - billing_group_id = get_billing_id(client, org_id, billing) + billing_group_id = hf.get_billing_id(client, org_id, billing) # check if volume loaded to study - study_volumes = get_study_volumes(client, study_id) - volume_id = process_volumes(study_id, study_volumes, vname=bucket_name, prefix=prefix) + study_volumes = hf.get_study_volumes(client, study_id) + volume_id = hf.process_volumes( + study_id, study_volumes, vname=bucket_name, prefix=prefix + ) if volume_id is None: # need to load, get credential - aws_cred_id = get_cred_id(client, study_id, aws_cred) + aws_cred_id = hf.get_cred_id(client, study_id, aws_cred) # load it volume_id = add_volume( From 20c1d9c24ac8252cb47a8d1f971644bba9a83227 Mon Sep 17 00:00:00 2001 From: Alex Sickler Date: Fri, 29 Mar 2024 10:55:55 -0400 Subject: [PATCH 2/7] :sparkles: add list_jobs and download subcommands to cli --- d3b_dff_cli/cli.py | 55 +++++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/d3b_dff_cli/cli.py b/d3b_dff_cli/cli.py index 43fa831..ff31e03 100644 --- a/d3b_dff_cli/cli.py +++ b/d3b_dff_cli/cli.py @@ -5,50 +5,47 @@ from .modules.validation.check_readgroup import main as check_readgroup from .modules.validation.check_url import main as check_url from .modules.dewrangle.volume import main as hash_volume +from .modules.dewrangle.list_jobs import main as list_jobs +from .modules.dewrangle.download_job import main as download_job -def add_hash_arguments(my_parser): +def add_dewrangle_arguments(my_parser): """ - Create parser for volume hash subcommand. + Add standard arguments for Dewrangle subcommands. Input: - my_parser: argparse parser being added to Output: - original parser with added arguments """ - hash_parser = my_parser.add_parser( - "hash", help="Hash volume in Dewrangle" - ) - hash_parser.add_argument( + my_parser.add_argument( "-prefix", help="Optional, Path prefix. Default: None", default=None, required=False, ) - hash_parser.add_argument( + my_parser.add_argument( "-region", help="Optional, Bucket AWS region code. Default: us-east-1", default="us-east-1", required=False, ) - hash_parser.add_argument( + my_parser.add_argument( "-billing", help="Optional, billing group name. When not provided, use default billing group for organization", default=None, required=False, ) - hash_parser.add_argument( + my_parser.add_argument( "-credential", help="Dewrangle AWS credential name. Default, try to find available credential.", required=False, ) - hash_parser.add_argument( + my_parser.add_argument( "-study", help="Study name, global id, or study id", required=True ) - hash_parser.add_argument("-bucket", help="Bucket name", required=True) - hash_parser.set_defaults(func=hash_volume) - - return hash_parser + my_parser.add_argument("-bucket", help="Bucket name", required=True) + return my_parser def main(): @@ -104,14 +101,37 @@ def main(): parser_url.add_argument("urls", nargs="+", help="One or more URLs to validate") parser_url.set_defaults(func=check_url) - # Volume Command + # Dewrangle commands + # hash: load a bucket to Dewrangle and hash it + # list_jobs: list jobs run on a bucket + # download: download the results of a job dewrangle_parser = subparsers.add_parser("dewrangle", help="Dewrangle commands") dewrangle_subparsers = dewrangle_parser.add_subparsers( title="Dewrangle Subcommands", dest="dewrangle_command" ) - # volume hash subcommand - hash_parser = add_hash_arguments(dewrangle_subparsers) + # hash subcommand + hash_parser = dewrangle_subparsers.add_parser("hash", help="Hash volume in Dewrangle") + hash_parser = add_dewrangle_arguments(hash_parser) + hash_parser.set_defaults(func=hash_volume) + + # list_jobs subcommand + list_parser = dewrangle_subparsers.add_parser( + "list_jobs", help="List volume jobs in Dewrangle" + ) + list_parser = add_dewrangle_arguments(list_parser) + list_parser.set_defaults(func=list_jobs) + + # download subcommand + dl_parser = dewrangle_subparsers.add_parser( + "download", help="Download job results from Dewrangle" + ) + dl_parser.add_argument( + "-jobid", + help="Dewrangle jobid", + required=True, + ) + dl_parser.set_defaults(func=download_job) args = parser.parse_args() @@ -140,6 +160,5 @@ def main(): sys.exit(2) - if __name__ == "__main__": main() From d59260cbc703a6e8b219c6260dae042f42904509 Mon Sep 17 00:00:00 2001 From: Alex Sickler Date: Fri, 29 Mar 2024 13:58:52 -0400 Subject: [PATCH 3/7] :sparkles: add list_jobs subcommand --- .../modules/dewrangle/helper_functions.py | 158 ++++++++++++++++++ d3b_dff_cli/modules/dewrangle/list_jobs.py | 75 +++++++++ 2 files changed, 233 insertions(+) create mode 100644 d3b_dff_cli/modules/dewrangle/list_jobs.py diff --git a/d3b_dff_cli/modules/dewrangle/helper_functions.py b/d3b_dff_cli/modules/dewrangle/helper_functions.py index e3a7d34..477fbde 100644 --- a/d3b_dff_cli/modules/dewrangle/helper_functions.py +++ b/d3b_dff_cli/modules/dewrangle/helper_functions.py @@ -483,3 +483,161 @@ def get_billing_groups(client, org_id): billing_groups[id] = {"name": name} return billing_groups + + +def get_volume_jobs(client, vid): + """Query volume for a list of jobs""" + jobs = {} + + query = gql( + """ + query Volume_Job_Query($id: ID!) { + volume: node(id: $id) { + id + ... on Volume { + jobs { + edges { + node { + id + operation + completedAt + createdAt + } + } + } + } + } + } + """ + ) + + params = {"id": vid} + + # run query + result = client.execute(query, variable_values=params) + + # format result + for vol in result: + for job in result[vol]["jobs"]: + for node in result[vol]["jobs"][job]: + id = node["node"]["id"] + # convert createdAt from string to datetime object + created = datetime.strptime( + node["node"]["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ" + ) + op = node["node"]["operation"] + comp = datetime.strptime( + node["node"]["completedAt"], "%Y-%m-%dT%H:%M:%S.%fZ" + ) + jobs[id] = {"operation": op, "createdAt": created, "completedAt": comp} + + return jobs + + +def get_most_recent_job(client, vid, job_type): + """Query volume and get most recent job""" + jid = None + recent_date = None + + jobs = get_volume_jobs(client, vid) + + if job_type.upper() in ["HASH", "VOLUME_HASH"]: + job_type = "VOLUME_HASH" + elif job_type.upper() in ["LIST", "VOLUME_LIST"]: + job_type = "VOLUME_LIST" + else: + raise ValueError("Unsupported job type: {}".format(job_type)) + + for job in jobs: + if jobs[job]["operation"] == job_type: + # check if date is most recent + if recent_date is None or jobs[job]["createdAt"] > recent_date: + recent_date = jobs[job]["createdAt"] + jid = job + + if jid is None: + raise ValueError( + "no job(s) matching job type: {} found in volume".format(job_type) + ) + + return jid + + +def get_job_info(jobid, client=None): + """Query job info with job id""" + + if client is None: + client = create_gql_client() + + query = gql( + """ + query Job_Query($id: ID!) { + job: node(id: $id) { + id + ... on Job { + operation + createdAt + completedAt + errors { + edges { + node { + message + id + } + } + billingGroup { + name + } + cost { + cents + } + parentJob { + id + operation + createdAt + completedAt + errors { + edges { + node { + message + id + } + } + billingGroup { + name + } + cost { + cents + } + } + children { + id + operation + createdAt + completedAt + errors { + edges { + node { + message + id + } + } + billingGroup { + name + } + cost { + cents + } + } + } + } + } + """ + ) + + params = {"id": jobid} + + # run query + result = client.execute(query, variable_values=params) + + return result diff --git a/d3b_dff_cli/modules/dewrangle/list_jobs.py b/d3b_dff_cli/modules/dewrangle/list_jobs.py new file mode 100644 index 0000000..dfc681c --- /dev/null +++ b/d3b_dff_cli/modules/dewrangle/list_jobs.py @@ -0,0 +1,75 @@ +"""List jobs run on a bucket using the Dewrangle GraphQL API.""" + +from . import helper_functions as hf + +def list_volume_jobs( + bucket_name, + study_name, + prefix=None, + token=None, +): + """ + Wrapper function that checks if a volume is loaded, and hashes it. + Inputs: AWS bucket name, study name, aws region, and optional volume prefix. + Output: job id of parent job created when volume is hashed. + """ + + client = hf.create_gql_client(api_key=token) + + # get study and org ids + study_id = hf.get_study_id(client, study_name) + org_id = hf.get_org_id_from_study(client, study_id) + + # check if volume loaded to study + study_volumes = hf.get_study_volumes(client, study_id) + volume_id = hf.process_volumes(study_id, study_volumes, vname=bucket_name, prefix=prefix) + + print(volume_id) + + jobs = hf.get_volume_jobs(client, volume_id) + + # print all jobs + print( + "========================================================================================" + ) + print("All jobs in volume:") + print("JobID|createdAt|completedAt|Job_Type") + for job in jobs: + print( + "{} | {} | {} | {}".format( + job, + jobs[job]["createdAt"], + jobs[job]["completedAt"], + jobs[job]["operation"], + ) + ) + + print( + "========================================================================================" + ) + + # get most recent job and print id + print( + "Most recent hash job id: {}".format( + hf.get_most_recent_job(client, volume_id, "hash") + ) + ) + print( + "Most recent list job id: {}".format( + hf.get_most_recent_job(client, volume_id, "list") + ) + ) + + print("Done!") + + return + + +def main(args): + """Main function.""" + + list_volume_jobs( + args.bucket, + args.study, + args.prefix, + ) From 15c37bba3c469adc5b1a381f2fa59df1b6083084 Mon Sep 17 00:00:00 2001 From: Alex Sickler Date: Mon, 1 Apr 2024 14:22:10 -0400 Subject: [PATCH 4/7] :sparkle: added output file option to download subcommand --- d3b_dff_cli/cli.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/d3b_dff_cli/cli.py b/d3b_dff_cli/cli.py index ff31e03..f4225ce 100644 --- a/d3b_dff_cli/cli.py +++ b/d3b_dff_cli/cli.py @@ -6,7 +6,7 @@ from .modules.validation.check_url import main as check_url from .modules.dewrangle.volume import main as hash_volume from .modules.dewrangle.list_jobs import main as list_jobs -from .modules.dewrangle.download_job import main as download_job +from .modules.dewrangle.download_job import main as download_dewrangle_job def add_dewrangle_arguments(my_parser): @@ -131,7 +131,12 @@ def main(): help="Dewrangle jobid", required=True, ) - dl_parser.set_defaults(func=download_job) + dl_parser.add_argument( + "-outfile", + help="Output file name", + required=True, + ) + dl_parser.set_defaults(func=download_dewrangle_job) args = parser.parse_args() From d39399b06c8027e68405bbec3c0d344583f89163 Mon Sep 17 00:00:00 2001 From: Alex Sickler Date: Mon, 1 Apr 2024 14:22:37 -0400 Subject: [PATCH 5/7] :fire: remove unused code --- d3b_dff_cli/modules/dewrangle/list_jobs.py | 1 - 1 file changed, 1 deletion(-) diff --git a/d3b_dff_cli/modules/dewrangle/list_jobs.py b/d3b_dff_cli/modules/dewrangle/list_jobs.py index dfc681c..784f75e 100644 --- a/d3b_dff_cli/modules/dewrangle/list_jobs.py +++ b/d3b_dff_cli/modules/dewrangle/list_jobs.py @@ -18,7 +18,6 @@ def list_volume_jobs( # get study and org ids study_id = hf.get_study_id(client, study_name) - org_id = hf.get_org_id_from_study(client, study_id) # check if volume loaded to study study_volumes = hf.get_study_volumes(client, study_id) From 95fd4036f0f057fe55c66b04b848b89c40957267 Mon Sep 17 00:00:00 2001 From: Alex Sickler Date: Mon, 1 Apr 2024 14:24:01 -0400 Subject: [PATCH 6/7] :sparkles: add code to download job results --- .../modules/dewrangle/helper_functions.py | 141 +++++++++--------- 1 file changed, 74 insertions(+), 67 deletions(-) diff --git a/d3b_dff_cli/modules/dewrangle/helper_functions.py b/d3b_dff_cli/modules/dewrangle/helper_functions.py index 477fbde..230a44f 100644 --- a/d3b_dff_cli/modules/dewrangle/helper_functions.py +++ b/d3b_dff_cli/modules/dewrangle/helper_functions.py @@ -4,6 +4,8 @@ import sys import traceback import configparser +import requests +import pandas as pd from gql import gql, Client from gql.transport.aiohttp import AIOHTTPTransport from datetime import datetime @@ -65,6 +67,20 @@ def create_gql_client(endpoint=None, api_key=None): return client +def create_rest_creds(endpoint=None, api_key=None): + """Create Rest connection""" + + # default endpoint + if endpoint is None: + endpoint = "https://dewrangle.com/api/rest/jobs/" + + if api_key: + req_header = {"X-Api-Key": api_key} + else: + req_header = {"X-Api-Key": get_api_credential()} + return endpoint, req_header + + def get_study_credentials(client, study_id): """Get credential ids from a study.""" @@ -485,55 +501,6 @@ def get_billing_groups(client, org_id): return billing_groups -def get_volume_jobs(client, vid): - """Query volume for a list of jobs""" - jobs = {} - - query = gql( - """ - query Volume_Job_Query($id: ID!) { - volume: node(id: $id) { - id - ... on Volume { - jobs { - edges { - node { - id - operation - completedAt - createdAt - } - } - } - } - } - } - """ - ) - - params = {"id": vid} - - # run query - result = client.execute(query, variable_values=params) - - # format result - for vol in result: - for job in result[vol]["jobs"]: - for node in result[vol]["jobs"][job]: - id = node["node"]["id"] - # convert createdAt from string to datetime object - created = datetime.strptime( - node["node"]["createdAt"], "%Y-%m-%dT%H:%M:%S.%fZ" - ) - op = node["node"]["operation"] - comp = datetime.strptime( - node["node"]["completedAt"], "%Y-%m-%dT%H:%M:%S.%fZ" - ) - jobs[id] = {"operation": op, "createdAt": created, "completedAt": comp} - - return jobs - - def get_most_recent_job(client, vid, job_type): """Query volume and get most recent job""" jid = None @@ -566,9 +533,6 @@ def get_most_recent_job(client, vid, job_type): def get_job_info(jobid, client=None): """Query job info with job id""" - if client is None: - client = create_gql_client() - query = gql( """ query Job_Query($id: ID!) { @@ -585,11 +549,6 @@ def get_job_info(jobid, client=None): id } } - billingGroup { - name - } - cost { - cents } parentJob { id @@ -603,11 +562,6 @@ def get_job_info(jobid, client=None): id } } - billingGroup { - name - } - cost { - cents } } children { @@ -622,11 +576,6 @@ def get_job_info(jobid, client=None): id } } - billingGroup { - name - } - cost { - cents } } } @@ -641,3 +590,61 @@ def get_job_info(jobid, client=None): result = client.execute(query, variable_values=params) return result + + +def request_to_df(url, **kwargs): + """Call api and return response as a pandas dataframe.""" + my_data = [] + with requests.get(url, **kwargs) as response: + # check if the request was successful + if response.status_code == 200: + for line in response.iter_lines(): + my_data.append(line.decode().split(",")) + else: + print(f"Failed to fetch the CSV. Status code: {response.status_code}") + + my_cols = my_data.pop(0) + df = pd.DataFrame(my_data, columns=my_cols) + return df + + +def download_job_result(jobid, client=None, api_key=None): + """Check if a job is complete, download results if it is. + If the job is a list and hash job, only download the hash result.""" + + endpoint, req_header = create_rest_creds(api_key=api_key) + + job_status = None + + job_result = None + + job_info = get_job_info(jobid, client) + + # check if it's done + if ( + job_info["job"]["completedAt"] != "" + and job_info["job"]["completedAt"] is not None + ): + job_status = "Complete" + + else: + job_status = "Incomplete" + + if job_status == "Complete": + job_type = job_info["job"]["operation"] + # we can only download results for hash or list jobs so check that the job is one of those + if job_type in ["VOLUME_LIST", "VOLUME_HASH", "VOLUME_LIST_AND_HASH"]: + # if the job is a parent job, find the hash job to get it's result + if ( + job_type == "VOLUME_LIST_AND_HASH" + and len(job_info["job"]["children"]) != 0 + ): + for child_job in job_info["job"]["children"]: + if child_job["operation"] == "VOLUME_HASH": + jobid = child_job["id"] + url = endpoint + jobid + "/result" + job_result = request_to_df(url, headers=req_header, stream=True) + else: + print("Job type {} does not have results to download".format(job_type)) + + return job_status, job_result From da7b8519e2158ab0f3ee3afb1a95022336618197 Mon Sep 17 00:00:00 2001 From: Alex Sickler Date: Mon, 1 Apr 2024 14:24:21 -0400 Subject: [PATCH 7/7] :sparkles: add download job result subcommand --- d3b_dff_cli/modules/dewrangle/download_job.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 d3b_dff_cli/modules/dewrangle/download_job.py diff --git a/d3b_dff_cli/modules/dewrangle/download_job.py b/d3b_dff_cli/modules/dewrangle/download_job.py new file mode 100644 index 0000000..68ccf5c --- /dev/null +++ b/d3b_dff_cli/modules/dewrangle/download_job.py @@ -0,0 +1,24 @@ +"""Download job results from Dewrangle.""" + +from . import helper_functions as hf + +def download_job(jobid, token=None): + """ + Function to download results from Dewrangle + Input: Dewrangle job id + Output: object with job resuls + """ + + client = hf.create_gql_client(api_key=token) + + return hf.download_job_result(jobid, client=client, api_key=token) + + +def main(args): + """Main function.""" + + status, job_df = download_job(args.jobid) + if status == "Complete": + job_df.to_csv(args.outfile) + else: + print("Job incomplete, please check again later.")