diff --git a/CITATION.cff b/CITATION.cff index 4db6f7c..e04c860 100644 --- a/CITATION.cff +++ b/CITATION.cff @@ -1,8 +1,8 @@ cff-version: "1.1.0" message: "If you use this software, please cite it using these metadata." title: ElasticBLAST -version: "0.2.6" -date-released: 2022-06-08 +version: "0.2.7" +date-released: 2022-08-11 license: "NCBI Public Domain" repository-code: "https://github.com/ncbi/elastic-blast/" authors: diff --git a/bin/aws-get-auto-scaling-events.sh b/bin/aws-get-auto-scaling-events.sh new file mode 100755 index 0000000..5c3fa65 --- /dev/null +++ b/bin/aws-get-auto-scaling-events.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# aws-get-auto-scaling-events.sh: Get autoscaling events for ElasticBLAST's +# AWS Batch compute environment +# +# Author: Greg Boratyn (borayng@ncbi.nlm.nih.gov) +# Created: Fri Aug 12 17:28:20 EDT 2022 + +# The script assumes that elastic-blast.log file exists +logfile=${1:-elastic-blast.log} +COMP_ENV_NAME=$(grep ComputeEnvName $logfile | tr '/' '\t' | cut -f 2 | tail -n 1) +if [ ! -z "${COMP_ENV_NAME}" ] ; then + AUTO_SCALE_GRP_NAME=$(aws autoscaling describe-auto-scaling-groups --output json | jq -Mr '.AutoScalingGroups[].AutoScalingGroupName' | grep $COMP_ENV_NAME) + if [ $? -eq 0 ] ; then + aws autoscaling describe-scaling-activities --auto-scaling-group-name $AUTO_SCALE_GRP_NAME + else + echo "Failed to find an AWS auto scaling group for the AWS Batch Compute environment $COMP_ENV_NAME" + exit 1 + fi +else + echo "Failed to find an AWS Batch Compute environment in $logfile" + exit 1 +fi diff --git a/bin/blast-tuner.py b/bin/blast-tuner.py index daa45a2..bbddc0b 100755 --- a/bin/blast-tuner.py +++ b/bin/blast-tuner.py @@ -75,7 +75,7 @@ def main(): if args.db is not None: try: db_metadata = get_db_metadata(args.db, sp.get_db_mol_type(args.program), - db_source) + db_source, gcp_prj=args.gcp_project) except FileNotFoundError: raise UserReportError(returncode=BLASTDB_ERROR, message=f'Metadata for BLAST database "{args.db}" was not found or database molecular type is not the same as required by BLAST program: "{args.program}"') @@ -169,6 +169,7 @@ def create_arg_parser(): help='Number of residues or bases in query sequecnes') optional.add_argument("--db-source", type=str, help="Where NCBI-provided databases are downloaded from, default: AWS", choices=['AWS', 'GCP', 'NCBI']) optional.add_argument("--region", type=str, help=f'Cloud Service Provider region. Defaults: {ELB_DFLT_AWS_REGION} for AWS; {ELB_DFLT_GCP_REGION} for GCP') + optional.add_argument("--gcp-project", type=str, help=f'GCP project, required if --db-source or --csp-target is GCP') optional.add_argument("--options", type=str, help='BLAST options', default='') optional.add_argument("--db-mem-limit-factor", type=float, help='This number times database bytes-to-cache will produce memory limit for a BLAST search. 
(default: 0.0: for AWS, 1.1 for GCP)') diff --git a/docker-blast/Dockerfile-build-from-local-sources b/docker-blast/Dockerfile-build-from-local-sources index 86b67dd..90b4be6 100644 --- a/docker-blast/Dockerfile-build-from-local-sources +++ b/docker-blast/Dockerfile-build-from-local-sources @@ -52,6 +52,7 @@ RUN apt-get -y -m update && \ RUN mkdir -p /blast/bin /blast/lib COPY --from=blast /blast/bin /blast/bin +COPY update_blastdb.pl /blast/bin COPY --from=blast /blast/lib /blast/lib COPY --from=blast /root/edirect /root/edirect COPY splitq_download_db_search /blast/bin/ diff --git a/docker-blast/Makefile b/docker-blast/Makefile index aaa558c..020f994 100644 --- a/docker-blast/Makefile +++ b/docker-blast/Makefile @@ -30,7 +30,7 @@ GCP_IMG?=gcr.io/ncbi-sandbox-blast/${IMG} AWS_SERVER?=public.ecr.aws/i6v3i0i9 AWS_IMG?=${AWS_SERVER}/elasticblast-elb AWS_REGION?=us-east-1 -VERSION?=1.1.0 +VERSION?=1.1.1 ifeq (, $(shell which vmtouch 2>/dev/null)) NOVMTOUCH?=--no-vmtouch diff --git a/docker-blast/splitq_download_db_search b/docker-blast/splitq_download_db_search index ca6dc9a..6817c2c 100755 --- a/docker-blast/splitq_download_db_search +++ b/docker-blast/splitq_download_db_search @@ -249,15 +249,15 @@ def _download_database(args, is_user, db_done): verbose = ' --verbose --verbose --verbose --verbose --verbose --verbose' if args.verbose else '' creds = ' --no-sign-request' if args.no_creds else '' nprocs_to_download_db = min(MAX_PROCS_TO_DOWNLOAD_DB, int(os.cpu_count()/args.num_threads)) + p = safe_exec(f"time update_blastdb.pl taxdb --decompress --source ncbi {verbose} --num_threads {nprocs_to_download_db}") + print(p.stdout.decode(), end='') + print(p.stderr.decode(), end='') if is_user: - p = safe_exec(f"time update_blastdb.pl taxdb --source {args.source}{verbose} --num_threads {nprocs_to_download_db}") - print(p.stdout.decode(), end='') - print(p.stderr.decode(), end='') p = safe_exec(f"time aws s3 cp --only-show-errors{creds} {os.path.join(args.db_path,'')} . --recursive --exclude * --include {args.db}.* --include taxdb.*") print(p.stdout.decode(), end='') print(p.stderr.decode(), end='') else: - p = safe_exec(f"time update_blastdb.pl {args.db} --source {args.source}{verbose} --num_threads {nprocs_to_download_db}") + p = safe_exec(f"time update_blastdb.pl {args.db} --decompress --source {args.source}{verbose} --num_threads {nprocs_to_download_db}") print(p.stdout.decode(), end='') print(p.stderr.decode(), end='') print('End database download') diff --git a/docker-blast/update_blastdb.pl b/docker-blast/update_blastdb.pl new file mode 100755 index 0000000..84d4844 --- /dev/null +++ b/docker-blast/update_blastdb.pl @@ -0,0 +1,904 @@ +#!/usr/bin/env perl +# $Id: update_blastdb.pl 653871 2022-08-04 18:06:43Z camacho $ +# =========================================================================== +# +# PUBLIC DOMAIN NOTICE +# National Center for Biotechnology Information +# +# This software/database is a "United States Government Work" under the +# terms of the United States Copyright Act. It was written as part of +# the author's official duties as a United States Government employee and +# thus cannot be copyrighted. This software/database is freely available +# to the public for use. The National Library of Medicine and the U.S. +# Government have not placed any restriction on its use or reproduction. +# +# Although all reasonable efforts have been taken to ensure the accuracy +# and reliability of the software and data, the NLM and the U.S. 
+# Government do not and cannot warrant the performance or results that +# may be obtained by using this software or data. The NLM and the U.S. +# Government disclaim all warranties, express or implied, including +# warranties of performance, merchantability or fitness for any particular +# purpose. +# +# Please cite the author in any work or product based on this material. +# +# =========================================================================== +# +# Author: Christiam Camacho +# +# File Description: +# Script to download the pre-formatted BLAST databases. +# +# =========================================================================== + +use strict; +use warnings; +use Net::FTP; +use Getopt::Long; +use Pod::Usage; +use File::stat; +use Digest::MD5; +use Archive::Tar; +use File::Temp; +use JSON::PP; + +use constant NCBI_FTP => "ftp.ncbi.nlm.nih.gov"; +use constant BLAST_DB_DIR => "/blast/db"; +use constant USER => "anonymous"; +use constant PASSWORD => "anonymous"; +use constant DEBUG => 0; +use constant MAX_DOWNLOAD_ATTEMPTS => 3; +use constant EXIT_FAILURE => 1; +use constant LEGACY_EXIT_FAILURE => 2; + +use constant AWS_URL => "http://s3.amazonaws.com"; +use constant AMI_URL => "http://169.254.169.254/latest/meta-data/local-hostname"; +use constant AWS_BUCKET => "ncbi-blast-databases"; + +use constant GCP_URL => "http://metadata.google.internal/computeMetadata/v1/instance/id"; +use constant GCP_BUCKET => "gs://blast-db"; + +# TODO: deprecate this in the next release 2.14.x +#use constant BLASTDB_MANIFEST => "blastdb-manifest.json"; +use constant BLASTDB_MANIFEST_VERSION => "1.0"; + +use constant BLASTDB_METADATA => "blastdb-metadata-1-1.json"; +use constant BLASTDB_METADATA_VERSION => "1.1"; + +# Process command line options +my $opt_verbose = 1; +my $opt_quiet = 0; +my $opt_force_download = 0; +my $opt_help = 0; +my $opt_passive = 1; +my $opt_blastdb_ver = undef; +my $opt_timeout = 120; +my $opt_showall = undef; +my $opt_show_version = 0; +my $opt_decompress = 0; +my $opt_source; +my $opt_legacy_exit_code = 0; +my $opt_nt = &get_num_cores(); +my $result = GetOptions("verbose+" => \$opt_verbose, + "quiet" => \$opt_quiet, + "force" => \$opt_force_download, + "passive:s" => \$opt_passive, + "timeout=i" => \$opt_timeout, + "showall:s" => \$opt_showall, + "version" => \$opt_show_version, + "blastdb_version:i" => \$opt_blastdb_ver, + "decompress" => \$opt_decompress, + "source=s" => \$opt_source, + "num_threads=i" => \$opt_nt, + "legacy_exit_code" => \$opt_legacy_exit_code, + "help" => \$opt_help); +$opt_verbose = 0 if $opt_quiet; +die "Failed to parse command line options\n" unless $result; +pod2usage({-exitval => 0, -verbose => 2}) if $opt_help; +if (length($opt_passive) and ($opt_passive !~ /1|no/i)) { + pod2usage({-exitval => 1, -verbose => 0, + -msg => "Invalid value for passive option: '$opt_passive'"}); +} +pod2usage({-exitval => 0, -verbose => 2}) unless (scalar @ARGV or + defined($opt_showall) or + $opt_show_version); +if (defined $opt_blastdb_ver) { + pod2usage({-exitval => 1, -verbose => 0, + -msg => "Invalid BLAST database version: $opt_blastdb_ver. 
Supported values: 4, 5"}) + unless ($opt_blastdb_ver == 4 or $opt_blastdb_ver == 5); +} +pod2usage({-exitval => 1, -verbose => 0, -msg => "Invalid number of threads"}) + if ($opt_nt <= 0); +if (length($opt_passive) and $opt_passive =~ /n|no/i) { + $opt_passive = 0; +} else { + $opt_passive = 1; +} +my $exit_code = 0; +$|++; + +if ($opt_show_version) { + my $revision = '$Revision: 653871 $'; + $revision =~ s/\$Revision: | \$//g; + print "$0 version $revision\n"; + exit($exit_code); +} +my $curl = &get_curl_path(); +my $gsutil = &get_gsutil_path(); +my $gcloud = &get_gcloud_path(); + +my $location = "NCBI"; +# If provided, the source takes precedence over any attempts to determine the closest location +if (defined($opt_source)) { + if ($opt_source =~ /^ncbi/i) { + $location = "NCBI"; + } elsif ($opt_source =~ /^gc/i) { + $location = "GCP"; + } elsif ($opt_source =~ /^aws/i) { + $location = "AWS"; + } +} else { + # Try to auto-detect whether we're on the cloud + if (defined($curl)) { + my $tmpfile = File::Temp->new(); + my $gcp_cmd = "$curl --connect-timeout 3 --retry 3 --retry-max-time 30 -sfo $tmpfile -H 'Metadata-Flavor: Google' " . GCP_URL; + my $aws_cmd = "$curl --connect-timeout 3 --retry 3 --retry-max-time 30 -sfo /dev/null " . AMI_URL; + print "$gcp_cmd\n" if DEBUG; + if (system($gcp_cmd) == 0) { + # status not always reliable. Check that curl output is all digits. + my $tmpfile_content = do { local $/; <$tmpfile>}; + print "curl output $tmpfile_content\n" if DEBUG; + $location = "GCP" if ($tmpfile_content =~ m/^(\d+)$/); + } elsif (DEBUG) { + # Consult https://ec.haxx.se/usingcurl/usingcurl-returns + print "curl to GCP metadata server returned ", $?>>8, "\n"; + } + + print "$aws_cmd\n" if DEBUG; + if (system($aws_cmd) == 0) { + $location = "AWS"; + } elsif (DEBUG) { + # Consult https://ec.haxx.se/usingcurl/usingcurl-returns + print "curl to AWS metadata server returned ", $?>>8, "\n"; + } + print "Location is $location\n" if DEBUG; + } +} +if ($location =~ /aws/i and not defined $curl) { + print "Error: $0 depends on curl to fetch data from cloud storage, please install this utility to access this data source.\n"; + exit(EXIT_FAILURE); +} +if ($location =~ /gcp/i and (not defined $gsutil or not defined $gcloud)) { + print "Error: $0 depends on gsutil and gcloud to fetch data from cloud storage, please install these utilities to access this data source.\n"; + exit(EXIT_FAILURE); +} +my $gcp_prj = ($location =~ /gcp/i) ? 
&get_gcp_project() : undef; +if ($location =~ /gcp/i and not defined $gcp_prj) { + print "Error: $0 depends on gcloud being configured to fetch data from cloud storage, please configure it per the instructions in https://cloud.google.com/sdk/docs/initializing .\n"; + exit(EXIT_FAILURE); +} + +my $ftp; + +sub validate_metadata_file +{ + my $json = shift; + my $url = shift; + my $metadata = decode_json($json); + if (ref($metadata) eq 'HASH') { + unless (exists($$metadata{version}) and ($$metadata{version} eq BLASTDB_MANIFEST_VERSION)) { + print STDERR "ERROR: Invalid version in manifest file $url, please report to blast-help\@ncbi.nlm.nih.gov\n"; + exit(EXIT_FAILURE); + } + } elsif (not ref($metadata) eq 'ARRAY') { + print STDERR "ERROR: Invalid metadata format in $url, please report to blast-help\@ncbi.nlm.nih.gov\n"; + exit(EXIT_FAILURE); + } +} + +sub showall_from_metadata_file +{ + my $json = shift; + my $url = shift; + &validate_metadata_file($json, $url); + my $metadata = decode_json($json); + my $print_header = 1; + foreach my $db (sort keys %$metadata) { + next if ($db =~ /^version$/); + if ($opt_showall =~ /tsv/i) { + printf("%s\t%s\t%9.4f\t%s\n", $db, $$metadata{$db}{description}, + $$metadata{$db}{size}, $$metadata{$db}{last_updated}); + } elsif ($opt_showall =~ /pretty/i) { + if ($print_header) { + printf("%-60s %-120s %-11s %15s\n", "BLASTDB", + "DESCRIPTION", "SIZE (GB)", "LAST_UPDATED"); + $print_header = 0; + } + printf("%-60s %-120s %9.4f %15s\n", $db, $$metadata{$db}{description}, + $$metadata{$db}{size}, $$metadata{$db}{last_updated}); + } else { + print "$db\n"; + } + } +} + +# Display metadata from version 1.1 of BLASTDB metadata files +sub showall_from_metadata_file_1_1 +{ + my $json = shift; + my $url = shift; + &validate_metadata_file($json, $url); + my $metadata = decode_json($json); + my $print_header = 1; + foreach my $db (sort @$metadata) { + next if ($$db{version} ne BLASTDB_METADATA_VERSION); + my $gb_total = sprintf("%.4f", $$db{'bytes-total'} * 1e-9); + my $last_updated = $$db{'last-updated'}; + if ($opt_showall =~ /tsv/i) { + printf("%s\t%s\t%9.4f\t%s\n", $$db{dbname}, $$db{description}, + $gb_total, $last_updated); + } elsif ($opt_showall =~ /pretty/i) { + if ($print_header) { + printf("%-60s %-120s %-11s %15s\n", "BLASTDB", + "DESCRIPTION", "SIZE (GB)", "LAST_UPDATED"); + $print_header = 0; + } + $last_updated = $$db{'last-updated'} =~ s/T.*//r; + printf("%-60s %-120s %9.4f %15s\n", $$db{dbname}, $$db{description}, + $gb_total, $last_updated); + } else { + print "$$db{dbname}\n"; + } + } +} + +if ($location ne "NCBI") { + die "Only BLASTDB version 5 is supported at GCP and AWS\n" if (defined $opt_blastdb_ver and $opt_blastdb_ver != 5); + my $latest_dir = &get_latest_dir($location); + my ($json, $url) = &get_blastdb_metadata($location, $latest_dir); + unless (length($json)) { + print STDERR "ERROR: Missing manifest file $url, please report to blast-help\@ncbi.nlm.nih.gov\n"; + exit(EXIT_FAILURE); + } + print "Connected to $location\n" if $opt_verbose; + print "Metadata source $url\n" if ($opt_verbose > 3); + &validate_metadata_file($json, $url); + my $metadata = decode_json($json); + if (defined($opt_showall)) { + &showall_from_metadata_file_1_1($json, $url); + } else { + &ensure_available_disk_space($json, $url); + my @files2download; + for my $requested_db (@ARGV) { + my $found = 0; + foreach my $dbm (sort @$metadata) { + if ($$dbm{'dbname'} eq $requested_db) { + push @files2download, @{$$dbm{files}}; + $found = 1; + last; + } + } + if (not $found) { + 
print STDERR "ERROR: $requested_db does not exist in $location ($latest_dir)\n"; + my $exit_code = ($opt_legacy_exit_code == 1 ? LEGACY_EXIT_FAILURE : EXIT_FAILURE); + exit $exit_code; + } + } + if (@files2download) { + my $awscli = &get_awscli_path(); + my $cmd; + my $fh = File::Temp->new(); + if ($location eq "GCP") { + $cmd = "$gsutil -u $gcp_prj "; + if ($opt_nt > 1) { + $cmd .= "-m -q "; + $cmd .= "-o 'GSUtil:parallel_thread_count=1' -o 'GSUtil:parallel_process_count=$opt_nt' "; + $cmd .= "cp "; + } else { + $cmd .= "-q cp "; + } + $cmd .= join(" ", @files2download) . " ."; + } else { + if (defined ($awscli)) { + # https://registry.opendata.aws/ncbi-blast-databases/#usageexamples + my $aws_cmd = "$awscli s3 cp --no-sign-request "; + $aws_cmd .= "--only-show-errors " unless $opt_verbose >= 3; + print $fh join("\n", @files2download); + $cmd = "/usr/bin/xargs -P $opt_nt -n 1 -I{}"; + $cmd .= " -t" if $opt_verbose > 3; + $cmd .= " $aws_cmd {} ."; + $cmd .= " <$fh " ; + } else { # fall back to curl for AWS only + my $url = AWS_URL; + s,s3://,$url/, foreach (@files2download); + if ($opt_nt > 1 and -f "/usr/bin/xargs") { + print $fh join("\n", @files2download); + $cmd = "/usr/bin/xargs -P $opt_nt -n 1"; + $cmd .= " -t" if $opt_verbose > 3; + $cmd .= " $curl -sSOR"; + $cmd .= " <$fh " ; + } else { + $cmd = "$curl -sSR"; + $cmd .= " -O $_" foreach (@files2download); + } + } + } + print "$cmd\n" if $opt_verbose > 3; + system($cmd); + } + } + +} else { + # Connect and download files + $ftp = &connect_to_ftp(); + my ($json, $url) = &get_blastdb_metadata($location, '', $ftp); + unless (length($json)) { + print STDERR "ERROR: Missing manifest file $url, please report to blast-help\@ncbi.nlm.nih.gov\n"; + exit(EXIT_FAILURE); + } + print "Metadata source $url\n" if ($opt_verbose > 3); + + if (defined $opt_showall) { + &showall_from_metadata_file_1_1($json, $url) + } else { + &ensure_available_disk_space($json, $url); + my @files = sort(&get_files_to_download()); + my @files2decompress; + $exit_code = &download(\@files, \@files2decompress); + if ($exit_code == 1) { + foreach (@files2decompress) { + $exit_code = &decompress($_); + last if ($exit_code != 1); + } + } + unless ($opt_legacy_exit_code) { + $exit_code = ($exit_code == 1 ? 0 : $exit_code); + } + } + $ftp->quit(); +} + +exit($exit_code); + +# Connects to NCBI ftp server +sub connect_to_ftp +{ + my %ftp_opts; + $ftp_opts{'Passive'} = 1 if $opt_passive; + $ftp_opts{'Timeout'} = $opt_timeout if ($opt_timeout >= 0); + $ftp_opts{'Debug'} = 1 if ($opt_verbose > 1); + my $ftp = Net::FTP->new(NCBI_FTP, %ftp_opts) + or die "Failed to connect to " . NCBI_FTP . ": $!\n"; + $ftp->login(USER, PASSWORD) + or die "Failed to login to " . NCBI_FTP . ": $!\n"; + my $ftp_path = BLAST_DB_DIR; + $ftp_path .= "/v$opt_blastdb_ver" if (defined $opt_blastdb_ver); + $ftp->cwd($ftp_path); + $ftp->binary(); + if ($opt_verbose) { + if (defined $opt_blastdb_ver) { + print "Connected to $location; downloading BLASTDBv$opt_blastdb_ver\n"; + } else { + print "Connected to $location\n"; + } + } + return $ftp; +} + +# Gets the list of available databases on NCBI FTP site +sub get_available_databases +{ + my @blast_db_files = $ftp->ls(); + my @retval = (); + + foreach (@blast_db_files) { + next unless (/\.tar\.gz$/); + push @retval, &extract_db_name($_); + } + my %seen = (); + return grep { ! 
$seen{$_} ++ } @retval; +} + +# This function exits the program if not enough disk space is available for the +# selected BLASTDBs +sub ensure_available_disk_space +{ + my $json = shift; + my $url = shift; + my $space_needed = 0; + my $space_available = &get_available_disk_space; + print "Available disk space in bytes: $space_available\n" if ($opt_verbose > 3); + return unless $space_available; + + for my $requested_db (@ARGV) { + my $x = &get_database_size_from_metadata_1_1($requested_db, $json, $url); + $space_needed += $x if ($x > 0); + } + print "Needed disk space in bytes: $space_needed\n" if ($opt_verbose > 3); + if ($space_needed > $space_available) { + my $msg = "ERROR: Need $space_needed bytes and only "; + $msg .= "$space_available bytes are available\n"; + print STDERR $msg; + my $exit_code = ($opt_legacy_exit_code == 1 ? LEGACY_EXIT_FAILURE : EXIT_FAILURE); + exit $exit_code; + } +} + +# Returns the available disk space in bytes of the current working directory +# Not supported in windows +sub get_available_disk_space +{ + my $retval = 0; + return $retval if ($^O =~ /mswin/i); + + my $BLK_SIZE = 512; + my $cmd = "df -P --block-size $BLK_SIZE ."; + $cmd = "df -P -b ." if ($^O =~ /darwin/i); + foreach (`$cmd 2>/dev/null`) { + chomp; + next if (/^Filesystem/); + my @F = split; + $retval = $F[3] * $BLK_SIZE if (scalar(@F) == 6); + } + print STDERR "WARNING: unable to compute available disk space\n" unless $retval; + return $retval; +} + +sub get_database_size_from_metadata_1_1 +{ + my $db = shift; + my $json = shift; + my $url = shift; + my $retval = -1; + &validate_metadata_file($json, $url); + my $metadata = decode_json($json); + foreach my $dbm (sort @$metadata) { + if ($$dbm{'dbname'} eq $db) { + $retval = $$dbm{'bytes-total'}; + last; + } + } + print STDERR "Warning: No BLASTDB metadata for $db\n" if ($retval == -1); + return $retval; +} + +# Obtains the list of files to download +sub get_files_to_download +{ + my @blast_db_files = $ftp->ls(); + my @retval = (); + + if ($opt_verbose > 2) { + print "Found the following files on ftp site:\n"; + print "$_\n" for (@blast_db_files); + } + + for my $requested_db (@ARGV) { + for my $file (@blast_db_files) { + next unless ($file =~ /\.tar\.gz$/); + if ($file =~ /^$requested_db\..*/) { + push @retval, $file; + } + } + } + + if ($opt_verbose) { + for my $requested_db (@ARGV) { + unless (grep(/$requested_db/, @retval)) { + print STDERR "$requested_db not found, skipping.\n" + } + } + } + + return @retval; +} + +# Download the requested files only if their checksum files are missing or if +# these (or the archives) are newer in the FTP site. Returns 0 if no files were +# downloaded, 1 if at least one file was downloaded (so that this can be the +# application's exit code) +sub download($$) +{ + my @requested_dbs = @ARGV; + my @files2download = @{$_[0]}; + my $files2decompress = $_[1]; + my $retval = 0; + + for my $file (@files2download) { + + my $attempts = 0; # Download attempts for this file + if ($opt_verbose and &is_multivolume_db($file) and $file =~ /\.00\./) { + my $db_name = &extract_db_name($file); + my $nvol = &get_num_volumes($db_name, @files2download); + print "Downloading $db_name (" . $nvol . " volumes) ...\n" unless ($opt_quiet); + } + + # We preserve the checksum files as evidence of the downloaded archive + my $checksum_file = "$file.md5"; + my $new_download = (-e $checksum_file ? 
0 : 1); + my $update_available = ($new_download or + ((stat($checksum_file))->mtime < $ftp->mdtm($checksum_file))); + if (-e $file and (stat($file)->mtime < $ftp->mdtm($file))) { + $update_available = 1; + } + +download_file: + if ($opt_force_download or $new_download or $update_available) { + print "Downloading $file..." if $opt_verbose; + $ftp->get($file); + unless ($ftp->get($checksum_file)) { + print STDERR "Failed to download $checksum_file!\n"; + return EXIT_FAILURE; + } + my $rmt_digest = &read_md5_file($checksum_file); + my $lcl_digest = &compute_md5_checksum($file); + print "\nRMT $file Digest $rmt_digest" if (DEBUG); + print "\nLCL $file Digest $lcl_digest\n" if (DEBUG); + if ($lcl_digest ne $rmt_digest) { + unlink $file, $checksum_file; + if (++$attempts >= MAX_DOWNLOAD_ATTEMPTS) { + print STDERR "too many failures, aborting download!\n"; + return EXIT_FAILURE; + } else { + print "corrupt download, trying again.\n"; + goto download_file; + } + } + push @$files2decompress, $file if ($opt_decompress); + print " [OK]\n" if $opt_verbose; + $retval = 1 if ($retval == 0); + } else { + if ($opt_decompress and -f $file) { + push @$files2decompress, $file; + $retval = 1; + } else { + my $msg = ($opt_decompress + ? "The contents of $file are up to date in your system." + : "$file is up to date."); + print "$msg\n" if $opt_verbose; + } + } + } + return $retval; +} + +# Try to decompress using /bin/tar as Archive::Tar is known to be slower (as +# it's pure perl) +sub _decompress_impl($) +{ + my $file = shift; + if ($^O eq "cygwin") { + local $ENV{PATH} = "/bin:/usr/bin"; + my $cmd = "tar -zxf $file 2>/dev/null"; + return 1 unless (system($cmd)); + } + unless ($^O =~ /mswin/i) { + local $ENV{PATH} = "/bin:/usr/bin"; + my $cmd = "gzip -cd $file 2>/dev/null | tar xf - 2>/dev/null"; + print "$cmd\n" if $opt_verbose > 3; + return 1 unless (system($cmd)); + } + return Archive::Tar->extract_archive($file, 1); +} + +# Decompresses the file passed as its argument +# Returns 1 on success, and 2 on failure, printing an error to STDERR +sub decompress($) +{ + my $file = shift; + print "Decompressing $file ..." 
unless ($opt_quiet); + my $succeeded = &_decompress_impl($file); + unless ($succeeded) { + my $msg = "Failed to decompress $file ($Archive::Tar::error), "; + $msg .= "please do so manually."; + print STDERR "$msg\n"; + return EXIT_FAILURE; + } + print "rm $file\n" if $opt_verbose > 3; + unlink $file; # Clean up archive, but preserve the checksum file + print " [OK]\n" unless ($opt_quiet); + return 1; +} + +sub compute_md5_checksum($) +{ + my $file = shift; + my $digest = "N/A"; + if (open(DOWNLOADED_FILE, $file)) { + binmode(DOWNLOADED_FILE); + $digest = Digest::MD5->new->addfile(*DOWNLOADED_FILE)->hexdigest; + close(DOWNLOADED_FILE); + } + return $digest; +} + +sub read_md5_file($) +{ + my $md5file = shift; + open(IN, $md5file); + $_ = <IN>; + close(IN); + my @retval = split; + return $retval[0]; +} + +# Determine if a given pre-formatted BLAST database file is part of a +# multi-volume database +sub is_multivolume_db +{ + my $file = shift; + return 1 if ($file =~ /\.\d{2,3}\.tar\.gz$/); + return 0; +} + +# Extracts the database name from the pre-formatted BLAST database archive file +# name +sub extract_db_name +{ + my $file = shift; + my $retval = ""; + if (&is_multivolume_db($file)) { + $retval = $1 if ($file =~ m/(.*)\.\d{2,3}\.tar\.gz$/); + } else { + $retval = $1 if ($file =~ m/(.*)\.tar\.gz$/); + } + return $retval; +} + +# Returns the number of volumes for a BLAST database given the file name of a +# pre-formatted BLAST database and the list of all databases to download +sub get_num_volumes +{ + my $db = shift; + my $retval = 0; + foreach (@_) { + if (/$db/) { + if (/.*\.(\d{2,3})\.tar\.gz$/) { + $retval = int($1) if (int($1) > $retval); + } + } + } + return $retval + 1; +} + +# Retrieves the name of the 'subdirectory' where the latest BLASTDBs reside +sub get_latest_dir +{ + my $source = shift; + my ($retval, $url, $cmd); + if ($source eq "AWS") { + $url = AWS_URL . "/" . AWS_BUCKET . "/latest-dir"; + $cmd = "$curl -s $url"; + } else { + $url = GCP_BUCKET . "/latest-dir"; + $cmd = "$gsutil -u $gcp_prj cat $url"; + } + print "$cmd\n" if DEBUG; + chomp($retval = `$cmd`); + unless (length($retval)) { + print STDERR "ERROR: Missing file $url, please try again or report to blast-help\@ncbi.nlm.nih.gov\n"; + exit(EXIT_FAILURE); + } + print "$source latest-dir: '$retval'\n" if DEBUG; + return $retval; +} + +# Fetches the JSON text containing the BLASTDB metadata +sub get_blastdb_metadata +{ + my $source = shift; + my $latest_dir = shift; + my $ftp = shift; + my ($url, $cmd); + my $retval; + + if ($source eq "AWS") { + $url = AWS_URL . "/" . AWS_BUCKET . "/$latest_dir/" . BLASTDB_METADATA; + $cmd = "curl -sf $url"; + } elsif ($source eq "GCP") { + $url = GCP_BUCKET . "/$latest_dir/" . BLASTDB_METADATA; + $cmd = "$gsutil -u $gcp_prj cat $url"; + } else { + $url = 'ftp://' . NCBI_FTP . "/blast/db/" . BLASTDB_METADATA; + $cmd = "curl -sf $url"; + } + if (defined $ftp) { + my $tmpfh = File::Temp->new(); + print "Downloading " . BLASTDB_METADATA .
" to $tmpfh\n" if ($opt_verbose > 3); + $ftp->get(BLASTDB_METADATA, $tmpfh); + $tmpfh->seek(0, 0); + $retval = do { + local $/ = undef; + <$tmpfh>; + }; + } else { + print "$cmd\n" if DEBUG; + chomp($retval = `$cmd`); + } + return ($retval, $url); +} + +# Returns the path to the gsutil utility or undef if it is not found +sub get_gsutil_path +{ + return undef if ($^O =~ /mswin/i); + foreach (qw(/google/google-cloud-sdk/bin /usr/local/bin /usr/bin /snap/bin)) { + my $path = "$_/gsutil"; + return $path if (-f $path); + } + chomp(my $retval = `which gsutil`); + return $retval if (-f $retval); + return undef; +} + +sub get_gcloud_path +{ + return undef if ($^O =~ /mswin/i); + foreach (qw(/google/google-cloud-sdk/bin /usr/local/bin /usr/bin /snap/bin)) { + my $path = "$_/gcloud"; + return $path if (-f $path); + } + chomp(my $retval = `which gcloud`); + return $retval if (-f $retval); + return undef; +} + +sub get_gcp_project +{ + return undef if ($^O =~ /mswin/i); + my $gcloud = &get_gcloud_path(); + chomp(my $retval = `$gcloud config get-value project`); + return $retval; +} + +# Returns the path to the aws CLI utility or undef if it is not found +sub get_awscli_path +{ + return undef if ($^O =~ /mswin/i); + foreach (qw(/usr/local/bin /usr/bin)) { + my $path = "$_/aws"; + return $path if (-f $path); + } + chomp(my $retval = `which aws`); + return $retval if (-f $retval); + return undef; +} + +# Returns the number of cores, or 1 if unknown +sub get_num_cores +{ + my $retval = 1; + if ($^O =~ /linux/i) { + open my $fh, "/proc/cpuinfo" or return $retval; + $retval = scalar(map /^processor/, <$fh>); + close($fh); + } elsif ($^O =~ /darwin/i) { + chomp($retval = `/usr/sbin/sysctl -n hw.ncpu`); + } + return $retval; +} + +# Returns the path to the curl utility, or undef if it is not found +sub get_curl_path +{ + foreach (qw(/usr/local/bin /usr/bin)) { + my $path = "$_/curl"; + return $path if (-f $path); + } + if ($^O =~ /mswin/i) { + my $retval = `where curl`; + if (defined $retval) { + chomp($retval); + return $retval if (-f $retval); + } + } + return undef; +} + +__END__ + +=head1 NAME + +B - Download pre-formatted BLAST databases + +=head1 SYNOPSIS + +update_blastdb.pl [options] blastdb ... + +=head1 OPTIONS + +=over 2 + +=item B<--source> + +Location to download BLAST databases from (default: auto-detect closest location). +Supported values: ncbi, aws, or gcp. + +=item B<--decompress> + +Downloads, decompresses the archives in the current working directory, and +deletes the downloaded archive to save disk space, while preserving the +archive checksum files (default: false). + +=item B<--showall> + +Show all available pre-formatted BLAST databases (default: false). The output +of this option lists the database names which should be used when +requesting downloads or updates using this script. + +It accepts the optional arguments: 'tsv' and 'pretty' to produce tab-separated values +and a human-readable format respectively. These parameters elicit the display of +additional metadata if this is available to the program. +This metadata is displayed in columnar format; the columns represent: + +name, description, size in gigabytes, date of last update (YYYY-MM-DD format). + +=item B<--blastdb_version> + +Specify which BLAST database version to download (default: 5). +Supported values: 4, 5 + +=item B<--passive> + +Use passive FTP, useful when behind a firewall or working in the cloud (default: true). 
+To disable passive FTP, configure this option as follows: --passive no + +=item B<--timeout> + +Timeout on connection to NCBI (default: 120 seconds). + +=item B<--force> + +Force download even if there is a archive already on local directory (default: +false). + +=item B<--verbose> + +Increment verbosity level (default: 1). Repeat this option multiple times to +increase the verbosity level (maximum 2). + +=item B<--quiet> + +Produce no output (default: false). Overrides the B<--verbose> option. + +=item B<--version> + +Prints this script's version. Overrides all other options. + +=item B<--num_threads> + +Sets the number of threads to utilize to perform downloads in parallel when data comes from the cloud. +Defaults to use all available CPUs on the machine (Linux and macos only). + +=item B<--legacy_exit_code> + +Enables exit codes from prior to version 581818, BLAST+ 2.10.0 release, for +downloads from NCBI only. This option is meant to be used by legacy applications that rely +on this exit codes: +0 for successful operations that result in no downloads, 1 for successful +downloads, and 2 for errors. + +=back + +=head1 DESCRIPTION + +This script will download the pre-formatted BLAST databases requested in the +command line from the NCBI ftp site. + +=head1 EXIT CODES + +This script returns 0 on successful operations and non-zero on errors. + +=head1 DEPENDENCIES + +This script depends on curl for retrieval from cloud service providers. + +=head1 BUGS + +Please report them to <blast-help@ncbi.nlm.nih.gov> + +=head1 COPYRIGHT + +See PUBLIC DOMAIN NOTICE included at the top of this script. + +=cut diff --git a/docker-qs/Dockerfile-build-from-local-sources b/docker-qs/Dockerfile-build-from-local-sources new file mode 100644 index 0000000..82d552a --- /dev/null +++ b/docker-qs/Dockerfile-build-from-local-sources @@ -0,0 +1,52 @@ +# PUBLIC DOMAIN NOTICE +# National Center for Biotechnology Information +# +# This software is a "United States Government Work" under the +# terms of the United States Copyright Act. It was written as part of +# the authors' official duties as United States Government employees and +# thus cannot be copyrighted. This software is freely available +# to the public for use. The National Library of Medicine and the U.S. +# Government have not placed any restriction on its use or reproduction. +# +# Although all reasonable efforts have been taken to ensure the accuracy +# and reliability of the software and data, the NLM and the U.S. +# Government do not and cannot warrant the performance or results that +# may be obtained by using this software or data. The NLM and the U.S. +# Government disclaim all warranties, express or implied, including +# warranties of performance, merchantability or fitness for any particular +# purpose. +# +# Please cite NCBI in any work or product based on this material. + +FROM alpine:3.14 +ARG version + +LABEL Description="NCBI ElasticBLAST Cloud Job Submission Module" +LABEL Version=${version} +LABEL Vendor="NCBI/NLM/NIH" +LABEL Maintainer=camacho@ncbi.nlm.nih.gov + +COPY requirements.txt .
+RUN sed -i '/elastic-blast/d' requirements.txt + +COPY run.sh /usr/bin/ + +RUN chmod +x /usr/bin/run.sh && \ + apk -U upgrade && \ + apk add --no-cache bash python3 py3-pip py3-wheel && \ + pip3 install --no-cache-dir --upgrade pip && \ + pip3 install --no-cache-dir -r requirements.txt && \ + mkdir -p /var/elastic-blast && \ + rm -rf /var/cache/apk/* requirements.txt + +COPY src/ /var/elastic-blast/src/ +COPY bin/ /var/elastic-blast/bin/ +COPY requirements/ /var/elastic-blast/requirements/ +COPY setup.py /var/elastic-blast/setup.py +COPY setup.cfg_cloud /var/elastic-blast/setup.cfg + +WORKDIR /var/elastic-blast + +RUN pip3 install . -r requirements/base.txt + +CMD ["run.sh", "-h"] diff --git a/docker-qs/Makefile b/docker-qs/Makefile index 8253001..7e6e1a2 100644 --- a/docker-qs/Makefile +++ b/docker-qs/Makefile @@ -28,7 +28,7 @@ SHELL=/bin/bash .PHONY: all pre-check check clean build publish gcp-build gcp-check gcp-clean IMG?=ncbi/elasticblast-query-split -VERSION?=0.1.2 +VERSION?=0.1.3 GCP_PROJECT?=$(shell gcloud config get-value project 2>/dev/null) GCP_TEST_BUCKET?=gs://elasticblast-test/query-split-run-test AWS_REGION?=us-east-1 @@ -79,6 +79,15 @@ gcp-list-tagless-images: aws-build: gcloud builds submit --config=awscloudbuild.yaml --substitutions=TAG_NAME="${VERSION}",_IMG="${AWS_IMG}",_SERVER="${AWS_SERVER}",_AWS_ECR_PASSWD="`aws ecr-public get-login-password --region ${AWS_REGION}`" . +# Use this target to build an image from your local sources as opposed to those on PyPI.org +.PHONY: aws-build-from-local-sources +aws-build-from-local-sources: + rsync -a ../setup.py ../setup.cfg_cloud ../src ../bin ../requirements ${PWD}/ + sed -i~ -e '/^value = $${VERSION}/d;' setup.cfg_cloud + echo "value = ${VERSION}" >> setup.cfg_cloud + gcloud builds submit --config awscloudbuild.yaml --substitutions _SERVER=${AWS_SERVER},TAG_NAME=${VERSION},_IMG=${AWS_IMG},_DOCKERFILE=Dockerfile-build-from-local-sources,_AWS_ECR_PASSWD="`aws ecr-public get-login-password --region ${AWS_REGION}`" + rm -fr src bin requirements setup.cfg_cloud setup.py + # This command needs to be run on an instance with docker installed. First build the image using "make build" ECR_REPO=?65123123.dkr.ecr.${AWS_REGION}.amazonaws.com .PHONY: publish-to-ecr diff --git a/docker-qs/awscloudbuild.yaml b/docker-qs/awscloudbuild.yaml index 411a6c7..428cdee 100644 --- a/docker-qs/awscloudbuild.yaml +++ b/docker-qs/awscloudbuild.yaml @@ -1,6 +1,6 @@ steps: - name: 'docker' - args: [ 'build', '-t', '${_IMG}:$TAG_NAME', '-t', '${_IMG}:latest', '.' ] + args: [ 'build', '-t', '${_IMG}:$TAG_NAME', '-t', '${_IMG}:latest', '-f', '${_DOCKERFILE}', '.' 
] - name: 'docker' args: [ 'login', '-u', 'AWS', '-p', '${_AWS_ECR_PASSWD}', '${_SERVER}' ] - name: 'docker' diff --git a/docker-qs/run.sh b/docker-qs/run.sh index 8e84249..ec6e84e 100755 --- a/docker-qs/run.sh +++ b/docker-qs/run.sh @@ -76,10 +76,8 @@ fi TMP=`mktemp` if [[ $output_bucket =~ ^s3:// ]]; then - time fasta_split.py $input -l $batch_len -o output -c $TMP - find output -type f -name "batch_*.fa" | xargs -n1 basename > batch_list.txt - time aws s3 cp output $output_bucket/query_batches --recursive --only-show-errors - time aws s3 cp $TMP $output_bucket/metadata/query_length.txt --only-show-errors + time fasta_split.py $input -l $batch_len -o $output_bucket/query_batches -c $output_bucket/metadata/query_length.txt + aws s3 ls $output_bucket/query_batches/batch_ | awk '{print $NF;}' > batch_list.txt time aws s3 cp batch_list.txt $output_bucket/metadata/batch_list.txt --only-show-errors else if [ $copy_only -eq 1 ]; then diff --git a/requirements/base.txt b/requirements/base.txt index 9b09205..9c65565 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,11 +1,11 @@ -wheel == 0.37.1 -setuptools == 56.0.0 -importlib-resources == 5.7.1 -importlib-metadata == 4.11.4 -pex == 2.1.92 -boto3 == 1.24.3 -botocore == 1.27.3 -awslimitchecker == 12.0.0 -tenacity == 8.0.1 -dataclasses-json == 0.5.7 -types-pkg-resources == 0.1.3 +wheel==0.37.0 +setuptools==56.0.0 +importlib-resources==5.9.0 +importlib-metadata==4.12.0 +pex==2.1.103 +boto3==1.24.49 +botocore==1.27.49 +awslimitchecker==12.0.0 +tenacity==8.0.1 +dataclasses-json==0.5.7 +types-pkg-resources==0.1.3 diff --git a/requirements/test.txt b/requirements/test.txt index 0675ff0..980d6f2 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -1,13 +1,13 @@ -r base.txt -pytest == 7.1.2 -pytest-cov == 3.0.0 -pytest-mock == 3.7.0 -teamcity-messages == 1.31 -mypy == 0.961 -pylint == 2.7.4 -tox == 3.25.0 -yamllint == 1.26.3 -moto == 3.1.12 -docker == 5.0.3 -cfn-lint == 0.61.0 +pytest==7.1.2 +pytest-cov==3.0.0 +pytest-mock==3.8.2 +teamcity-messages==1.31 +mypy==0.971 +pylint==2.7.4 +tox==3.25.1 +yamllint==1.27.1 +moto==3.1.17 +docker==5.0.3 +cfn-lint==0.61.4 diff --git a/src/elastic_blast/commands/submit.py b/src/elastic_blast/commands/submit.py index 60928fa..6111437 100755 --- a/src/elastic_blast/commands/submit.py +++ b/src/elastic_blast/commands/submit.py @@ -38,7 +38,6 @@ from elastic_blast.filehelper import open_for_read, open_for_read_iter, open_for_write_immediate from elastic_blast.filehelper import check_for_read, check_dir_for_write, cleanup_temp_bucket_dirs from elastic_blast.filehelper import get_length, harvest_query_splitting_results -from elastic_blast.object_storage_utils import write_to_s3 from elastic_blast.split import FASTAReader from elastic_blast.gcp import enable_gcp_api from elastic_blast.gcp import check_cluster as gcp_check_cluster @@ -78,7 +77,8 @@ def prepare_1_stage(cfg: ElasticBlastConfig, query_files): """ Prepare data for 1 stage cloud query split on AWS """ query_file = query_files[0] # Get file length as approximation of sequence length - query_length = get_length(query_file) + gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.project + query_length = get_length(query_file, gcp_prj = gcp_prj) if query_file.endswith('.gz'): query_length = query_length * 4 # approximation again batch_len = cfg.blast.batch_len @@ -95,11 +95,8 @@ def write_config_to_metadata(cfg): # FIXME: refactor this code into object_storage_utils cfg_text = cfg.to_json() dst = 
os.path.join(cfg.cluster.results, ELB_METADATA_DIR, ELB_META_CONFIG_FILE) - if cfg.cloud_provider.cloud == CSP.AWS: - write_to_s3(dst, cfg_text) - else: - with open_for_write_immediate(dst) as f: - f.write(cfg_text) + with open_for_write_immediate(dst) as f: + f.write(cfg_text) # TODO: use cfg only when args.wait, args.sync, and args.run_label are replicated in cfg @@ -144,7 +141,8 @@ def submit(args, cfg, clean_up_stack): # check database availability try: - get_blastdb_size(cfg.blast.db, cfg.cluster.db_source) + gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.project + get_blastdb_size(cfg.blast.db, cfg.cluster.db_source, gcp_prj) except ValueError as err: raise UserReportError(returncode=BLASTDB_ERROR, message=str(err)) @@ -182,12 +180,13 @@ def check_running_cluster(cfg: ElasticBlastConfig) -> bool: if cfg.cluster.dry_run: return False metadata_dir = os.path.join(cfg.cluster.results, ELB_METADATA_DIR) + gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.project if cfg.cloud_provider.cloud == CSP.AWS: metadata_file = os.path.join(metadata_dir, ELB_AWS_JOB_IDS) else: metadata_file = os.path.join(metadata_dir, ELB_STATE_DISK_ID_FILE) try: - check_for_read(metadata_file) + check_for_read(metadata_file, gcp_prj=gcp_prj) return True except FileNotFoundError: pass @@ -209,8 +208,9 @@ def check_submit_data(query_files: List[str], cfg: ElasticBlastConfig) -> None: """ dry_run = cfg.cluster.dry_run try: + gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.project for query_file in query_files: - check_for_read(query_file, dry_run, True) + check_for_read(query_file, dry_run, True, gcp_prj) except FileNotFoundError: raise UserReportError(INPUT_ERROR, f'Query input {query_file} is not readable or does not exist') bucket = cfg.cluster.results @@ -239,14 +239,15 @@ def split_query(query_files: List[str], cfg: ElasticBlastConfig) -> Tuple[List[s queries = [os.path.join(out_path, f'batch_{x:03d}.fa') for x in range(10)] logging.info(f'Splitting queries and writing batches to {out_path}') else: - reader = FASTAReader(open_for_read_iter(query_files), batch_len, out_path) + gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.project + reader = FASTAReader(open_for_read_iter(query_files, gcp_prj), batch_len, out_path) query_length, queries = reader.read_and_cut() logging.info(f'{len(queries)} batches, {query_length} base/residue total') if len(queries) < num_concurrent_blast_jobs: adjusted_batch_len = int(query_length/num_concurrent_blast_jobs) msg = f'The provided elastic-blast configuration is sub-optimal as the query was split into {len(queries)} batch(es) and elastic-blast can run up to {num_concurrent_blast_jobs} concurrent BLAST jobs. elastic-blast changed the batch-len parameter to {adjusted_batch_len} to maximize resource utilization and improve performance.' 
logging.info(msg) - reader = FASTAReader(open_for_read_iter(query_files), adjusted_batch_len, out_path) + reader = FASTAReader(open_for_read_iter(query_files, gcp_prj), adjusted_batch_len, out_path) query_length, queries = reader.read_and_cut() logging.info(f'Re-computed {len(queries)} batches, {query_length} base/residue total') end = timer() @@ -260,9 +261,10 @@ def assemble_query_file_list(cfg: ElasticBlastConfig) -> List[str]: is considered a list of files, otherwise it is a FASTA file with queries.""" msg = [] query_files = [] + gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.project for query_file in cfg.blast.queries_arg.split(): if query_file.endswith(QUERY_LIST_EXT): - with open_for_read(query_file) as f: + with open_for_read(query_file, gcp_prj) as f: for line in f: if len(line.rstrip()) == 0: continue diff --git a/src/elastic_blast/config.py b/src/elastic_blast/config.py index f27c4e7..21e219d 100644 --- a/src/elastic_blast/config.py +++ b/src/elastic_blast/config.py @@ -117,6 +117,8 @@ def configure(args: argparse.Namespace) -> configparser.ConfigParser: retval[CFG_BLAST][CFG_BLAST_QUERY] = args.query if hasattr(args, CFG_BLAST_DB) and args.db: retval[CFG_BLAST][CFG_BLAST_DB] = args.db + if hasattr(args, CFG_BLAST_BATCH_LEN.replace('-', '_')) and args.batch_len: + retval[CFG_BLAST][CFG_BLAST_BATCH_LEN] = str(args.batch_len) if hasattr(args, 'blast_opts') and args.blast_opts: if args.blast_opts[0] == '--': args.blast_opts.pop(0) diff --git a/src/elastic_blast/constants.py b/src/elastic_blast/constants.py index d843b7c..2146714 100644 --- a/src/elastic_blast/constants.py +++ b/src/elastic_blast/constants.py @@ -202,8 +202,8 @@ def __str__(self): ELB_DFLT_AWS_REGION = 'us-east-1' ELB_UNKNOWN_GCP_PROJECT = 'elb-unknown-gcp-project' -ELB_DOCKER_VERSION = '1.1.0' -ELB_QS_DOCKER_VERSION = '0.1.2' +ELB_DOCKER_VERSION = '1.1.1' +ELB_QS_DOCKER_VERSION = '0.1.3' ELB_JANITOR_DOCKER_VERSION = '0.2.0' ELB_JOB_SUBMIT_DOCKER_VERSION = '2.0.0' diff --git a/src/elastic_blast/db_metadata.py b/src/elastic_blast/db_metadata.py index 410e510..bb103ef 100644 --- a/src/elastic_blast/db_metadata.py +++ b/src/elastic_blast/db_metadata.py @@ -30,7 +30,7 @@ from dataclasses_json import dataclass_json, Undefined, LetterCase from json.decoder import JSONDecodeError from marshmallow.exceptions import ValidationError -from typing import List +from typing import List, Optional from .constants import MolType, ELB_S3_PREFIX, ELB_GCS_PREFIX, BLASTDB_ERROR from .filehelper import open_for_read, check_for_read from .base import DBSource @@ -55,7 +55,7 @@ class DbMetadata: number_of_volumes: int -def get_db_metadata(db: str, dbtype: MolType, source: DBSource, dry_run: bool = False) -> DbMetadata: +def get_db_metadata(db: str, dbtype: MolType, source: DBSource, dry_run: bool = False, gcp_prj: Optional[str] = None) -> DbMetadata: """ Read database metadata. 
@@ -79,24 +79,26 @@ def get_db_metadata(db: str, dbtype: MolType, source: DBSource, dry_run: bool = if source == DBSource.AWS or source == DBSource.GCP: bucket = DB_BUCKET_AWS if source == DBSource.AWS else DB_BUCKET_GCP try: - with open_for_read(f'{bucket}/latest-dir') as f: + with open_for_read(f'{bucket}/latest-dir', gcp_prj) as f: db_path = os.path.join(f'{bucket}/{f.readline().rstrip()}', db) except: - raise UserReportError(returncode=BLASTDB_ERROR, message=f'File "{bucket}/latest-dir" could not be read') + msg = f'File "{bucket}/latest-dir" could not be read' + logging.error(msg) + raise UserReportError(returncode=BLASTDB_ERROR, message=msg) else: db_path = os.path.join(f'{DB_BUCKET_NCBI}', db) # try metadata file version 1.2 first; if not found try version 1.1 try: metadata_file = f'{db_path}{metadata_suffix_v12}' logging.debug(f'BLASTDB metadata file: {metadata_file}') - check_for_read(metadata_file, dry_run) + check_for_read(metadata_file, dry_run, gcp_prj=gcp_prj) except FileNotFoundError: metadata_file = f'{db_path}{metadata_suffix_v11}' logging.debug(f'BLASTDB metadata file: {metadata_file}') - check_for_read(metadata_file, dry_run) + check_for_read(metadata_file, dry_run, gcp_prj=gcp_prj) try: - with open_for_read(metadata_file) as f: + with open_for_read(metadata_file, gcp_prj) as f: lines = f.readlines() if isinstance(lines[0], bytes): lines = [s.decode() for s in lines] diff --git a/src/elastic_blast/elasticblast.py b/src/elastic_blast/elasticblast.py index 26b6793..3467363 100644 --- a/src/elastic_blast/elasticblast.py +++ b/src/elastic_blast/elasticblast.py @@ -118,15 +118,16 @@ def _status_from_results(self): """ cfg = self.cfg status = ElbStatus.UNKNOWN + gcp_prj = None if cfg.aws else cfg.gcp.project try: failure_file = os.path.join(cfg.cluster.results, ELB_METADATA_DIR, ELB_STATUS_FAILURE) - check_for_read(failure_file, self.dry_run) + check_for_read(failure_file, self.dry_run, gcp_prj=gcp_prj) except FileNotFoundError: pass else: status = ElbStatus.FAILURE self.cached_status = status - with open_for_read(failure_file) as f: + with open_for_read(failure_file, gcp_prj) as f: res = f.read() if res: self.cached_failure_message = res @@ -134,7 +135,7 @@ def _status_from_results(self): try: done_file = os.path.join(cfg.cluster.results, ELB_METADATA_DIR, ELB_STATUS_SUCCESS) - check_for_read(done_file, self.dry_run) + check_for_read(done_file, self.dry_run, gcp_prj=gcp_prj) except FileNotFoundError: pass else: diff --git a/src/elastic_blast/elb_config.py b/src/elastic_blast/elb_config.py index d029937..69d8165 100644 --- a/src/elastic_blast/elb_config.py +++ b/src/elastic_blast/elb_config.py @@ -88,7 +88,7 @@ from .constants import ELB_DFLT_AWS_REGION, ELB_UNKNOWN_GCP_PROJECT from .util import validate_gcp_string, check_aws_region_for_invalid_characters from .util import validate_gke_cluster_name, ElbSupportedPrograms -from .util import get_query_batch_size +from .util import get_query_batch_size, get_gcp_project from .util import UserReportError, safe_exec from .util import gcp_get_regions, sanitize_for_k8s from .gcp_traits import get_machine_properties as gcp_get_machine_properties @@ -174,10 +174,15 @@ def validate(self, dry_run: bool = False): """ Validate the value of this object is one of the valid AWS regions. 
Requires AWS credentials to invoke proper APIs """ if dry_run: return - regions = aws_get_regions() - if str(self) not in regions: - msg = f'{str(self)} is not a valid AWS region' - raise ValueError(msg) + try: + boto_cfg = create_aws_config(str(self)) + regions = aws_get_regions(boto_cfg) + except: + regions = [] + finally: + if str(self) not in regions: + msg = f'{str(self)} is not a valid AWS region' + raise ValueError(msg) class BLASTProgram(str): @@ -238,16 +243,15 @@ def __post_init__(self): self.cloud = CSP.GCP self.user = ELB_UNKNOWN + # FIXME: need to pass dry-run to this method p = safe_exec('gcloud config get-value account') if p.stdout: self.user = p.stdout.decode('utf-8').rstrip() + logging.debug(f'gcloud returned "{self.user}"') if self.project == ELB_UNKNOWN_GCP_PROJECT: proj = get_gcp_project() - if not proj: - raise ValueError(f'GCP project is unset, please invoke gcloud config set project REPLACE_WITH_YOUR_PROJECT_NAME_HERE') - else: - self.project = GCPString(proj) + self.project = GCPString(proj) def validate(self, errors: List[str], task: ElbCommand): """Validate config""" @@ -608,11 +612,23 @@ def __init__(self, *args, **kwargs): except ValueError as err: raise UserReportError(returncode=INPUT_ERROR, message=str(err)) + # For custom BLASTDBs, check that they reside in the appropriate cloud + if self.blast and self.blast.db and '://' in self.blast.db: + if self.cloud_provider.cloud == CSP.GCP and \ + not self.blast.db.startswith(ELB_GCS_PREFIX): + msg = f'User database {self.blast.db} must reside in Google Cloud Storage (GCS)"' + raise UserReportError(returncode=BLASTDB_ERROR, message=msg) + elif self.cloud_provider.cloud == CSP.AWS and \ + not self.blast.db.startswith(ELB_S3_PREFIX): + msg = f'User database {self.blast.db} must reside in AWS S3"' + raise UserReportError(returncode=BLASTDB_ERROR, message=msg) + # get database metadata - if self.blast and not self.blast.db_metadata: + if self.blast and not self.blast.db_metadata and not self.cluster.dry_run: try: + gcp_prj = None if self.cloud_provider.cloud == CSP.AWS else self.gcp.project self.blast.db_metadata = get_db_metadata(self.blast.db, ElbSupportedPrograms().get_db_mol_type(self.blast.program), - self.cluster.db_source) + self.cluster.db_source, gcp_prj=gcp_prj) except FileNotFoundError: # database metadata file is not mandatory for a user database (yet) EB-1308 logging.info('No database metadata') @@ -669,7 +685,7 @@ def __init__(self, *args, **kwargs): logging.debug(f'Requested number of vCPUs lowered to {self.cluster.num_cpus} because of instance type choice {self.cluster.machine_type}') # default memory limit for a blast search job - if self.cluster.mem_limit == ELB_NOT_INITIALIZED_MEM: + if self.cluster.mem_limit == ELB_NOT_INITIALIZED_MEM and not self.cluster.dry_run: if self.cluster.machine_type == 'optimal': msg = 'You specified "optimal" cluster.machine-type, which requires configuring blast.mem-limit. Please provide that configuration parameter or change cluster.machine-type.' 
raise UserReportError(returncode=INPUT_ERROR, message=msg) @@ -679,7 +695,7 @@ def __init__(self, *args, **kwargs): cloud_region=self.cloud_provider.region) # set batch length - if self.blast: + if self.blast and not self.cluster.dry_run: if self.blast.batch_len == ELB_NOT_INITIALIZED_NUM: self.blast.batch_len = get_batch_length(self.cloud_provider.cloud, self.blast.program, @@ -1172,27 +1188,3 @@ def default(self, o): else: return json.JSONEncoder(self, o) - -def get_gcp_project() -> Optional[str]: - """Return current GCP project or None if the property is unset. - - Raises: - util.SafeExecError on problems with command line gcloud - RuntimeError if gcloud run is successful, but the result is empty""" - cmd: str = 'gcloud config get-value project' - p = safe_exec(cmd) - result: Optional[str] - - # the result should not be empty, for unset properties gcloud returns the - # string: '(unset)' to stderr - if not p.stdout and not p.stderr: - raise RuntimeError('Current GCP project could not be established') - - result = p.stdout.decode().split('\n')[0] - - # return None if project is unset - if result == '(unset)': - result = None - return result - - diff --git a/src/elastic_blast/filehelper.py b/src/elastic_blast/filehelper.py index d1020fb..919c369 100644 --- a/src/elastic_blast/filehelper.py +++ b/src/elastic_blast/filehelper.py @@ -32,14 +32,15 @@ Author: Victor Joukov joukovv@ncbi.nlm.nih.gov """ -import subprocess, os, io, gzip, tarfile, re, tempfile, shutil +import subprocess, os, io, gzip, tarfile, re, tempfile, shutil, sys import logging import urllib.request from string import digits from random import sample from timeit import default_timer as timer +from contextlib import contextmanager from tenacity import retry, stop_after_attempt, wait_exponential -from typing import Dict, IO, Tuple, Iterable, Generator, TextIO, List +from typing import Dict, IO, Tuple, Iterable, Generator, TextIO, List, Optional import boto3 # type: ignore from botocore.exceptions import ClientError # type: ignore @@ -234,14 +235,42 @@ def check_dir_for_write(dirname: str, dry_run=False) -> None: raise PermissionError() +@contextmanager def open_for_write_immediate(fname): - " Open file on GCS filesystem for write in text mode. " - if not fname.startswith(ELB_GCS_PREFIX): - raise NotImplementedError("Immediate writing to cloud storage implemented only for GS") - proc = subprocess.Popen(['gsutil', 'cp', '-', fname], - stdin=subprocess.PIPE, stderr=subprocess.DEVNULL, - universal_newlines=True) - return proc.stdin + """ Open a file in a cloud bucket for write in text mode. 
""" + if fname.startswith(ELB_GCS_PREFIX): + proc = subprocess.Popen(['gsutil', 'cp', '-', fname], + stdin=subprocess.PIPE, stderr=subprocess.DEVNULL, + universal_newlines=True) + f = proc.stdin + elif fname.startswith(ELB_S3_PREFIX): + f = io.TextIOWrapper(buffer=io.BytesIO(), encoding='utf-8') + s3 = boto3.resource('s3') + + else: + f = open(fname, 'w') + + try: + yield f + + except: + if fname.startswith(ELB_S3_PREFIX): + f.close() + raise + else: + if fname.startswith(ELB_S3_PREFIX): + f.flush() + buffer = f.detach() + buffer.seek(0) + bufsize = buffer.getbuffer().nbytes + logging.debug(f'Attempting to stream {bufsize} bytes to {fname}') + + bucket, key = parse_bucket_name_key(fname) + obj = s3.Object(bucket, key) + obj.upload_fileobj(buffer) + buffer.close() + logging.debug(f'Uploaded {fname}') + def open_for_write(fname): """ Open file on either local (no prefix), GCS, or AWS S3 @@ -249,7 +278,9 @@ def open_for_write(fname): copy_to_bucket is called. """ global bucket_temp_dirs - if fname.startswith(ELB_GCS_PREFIX) or fname.startswith(ELB_S3_PREFIX): + if fname.startswith(ELB_S3_PREFIX): + return open_for_write_immediate(fname) + if fname.startswith(ELB_GCS_PREFIX): # for the same gs path open files in temp dir and put it into # bucket_temp_dirs dictionary, copy through to bucket in copy_to_bucket later last_slash = fname.rfind('/') @@ -321,7 +352,7 @@ def readline(self): # are typing problems, which may indicate incompatibility between classes. # We need tests checking downstream logic for these different return types. # (EB-340) -def unpack_stream(s:IO, gzipped:bool, tarred:bool) -> IO: +def unpack_stream(s:Optional[IO], gzipped:bool, tarred:bool) -> IO: """ Helper function which inserts uncompressing/unarchiving transformers as needed depending on detected file type """ @@ -333,12 +364,17 @@ def unpack_stream(s:IO, gzipped:bool, tarred:bool) -> IO: return io.TextIOWrapper(gzip.GzipFile(fileobj=s)) if gzipped else s #type: ignore -def check_for_read(fname: str, dry_run : bool = False, print_file_size: bool = False) -> None: +def check_for_read(fname: str, dry_run : bool = False, print_file_size: bool = False, + gcp_prj: Optional[str] = None) -> None: """ Check that path on local, GS, AWS S3 or URL-available filesystem can be read from. raises FileNotFoundError if there is no such file """ + if is_stdin(fname): + return if fname.startswith(ELB_GCS_PREFIX): - cmd = f'gsutil stat {fname}' if print_file_size else f'gsutil -q stat {fname}' + if not gcp_prj: + raise ValueError(f'elastic_blast.filehelper.check_for_read is missing the gcp_prj parameter') + cmd = f'gsutil -u {gcp_prj} stat {fname}' if print_file_size else f'gsutil -u {gcp_prj} -q stat {fname}' if dry_run: logging.info(cmd) return @@ -389,12 +425,14 @@ def check_for_read(fname: str, dry_run : bool = False, print_file_size: bool = F open(fname, 'r') -def get_length(fname: str, dry_run=False) -> int: +def get_length(fname: str, dry_run: bool = False, gcp_prj: Optional[str] = None) -> int: """ Get length of a path on local, GS, AWS S3, or URL-available filesystem. 
raises FileNotFoundError if there is no such file """ if fname.startswith(ELB_GCS_PREFIX): - cmd = f'gsutil stat {fname}' + if not gcp_prj: + raise ValueError(f'elastic_blast.filehelper.get_length is missing the gcp_prj parameter') + cmd = f'gsutil -u {gcp_prj} stat {fname}' if dry_run: logging.info(cmd) return 10000 # Arbitrary fake length @@ -433,22 +471,25 @@ def get_length(fname: str, dry_run=False) -> int: error_report_funcs = {} -def open_for_read(fname): - """ Open path for read on local, GS, or URL-available filesystem defined by prefix. - File can be gzipped, and archived with tar. +def open_for_read(fname: str, gcp_prj: Optional[str] = None): + """ Open path for read on local, GS, URL-available filesystem defined by prefix, + or stdin. File can be gzipped, and archived with tar. """ global error_report_funcs gzipped = fname[-3:] == ".gz" - tarred = re.match(r'^.*\.(tar(|\.gz|\.bz2)|tgz)$', fname) + tarred = re.match(r'^.*\.(tar(|\.gz|\.bz2)|tgz)$', fname) is not None binary = gzipped or tarred mode = 'rb' if binary else 'rt' if fname.startswith(ELB_GCS_PREFIX): - proc = subprocess.Popen(['gsutil', 'cat', fname], + if not gcp_prj: + raise ValueError(f'elastic_blast.filehelper.open_for_read is missing the gcp_prj parameter') + proc = subprocess.Popen(['gsutil', '-u', gcp_prj, 'cat', fname], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=not binary) fileobj = unpack_stream(proc.stdout, gzipped, tarred) - error_report_funcs[fileobj] = proc.stderr.read + if proc.stderr: + error_report_funcs[fileobj] = proc.stderr.read return fileobj if fname.startswith('s3'): s3 = boto3.client('s3') @@ -467,12 +508,15 @@ def open_for_read(fname): if fname.startswith(ELB_HTTP_PREFIX) or fname.startswith(ELB_FTP_PREFIX): response = urllib.request.urlopen(fname) return unpack_stream(response, gzipped, tarred) - # regular file - f = open(fname, mode) + # regular file or stdin + if is_stdin(fname): + f : IO = sys.stdin + else: + f = open(fname, mode) return unpack_stream(f, gzipped, tarred) -def open_for_read_iter(fnames: Iterable[str]) -> Generator[TextIO, None, None]: +def open_for_read_iter(fnames: Iterable[str], gcp_prj: Optional[str] = None) -> Generator[TextIO, None, None]: """Generator function that Iterates over paths/uris and open them for reading. 
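Illustrative sketch (not part of the patch; project and bucket names are placeholders): with the filehelper changes above, every gs:// access shells out to "gsutil -u <project>" so the billing project is always explicit, "-"/"stdin" is accepted as an input source, and writes to s3:// destinations stream through an in-memory buffer that boto3 uploads when the context manager exits.

    from elastic_blast import filehelper

    gcp_prj = 'my-gcp-project'  # placeholder; normally taken from config/gcloud

    # gs:// reads now require a project; these run "gsutil -u my-gcp-project ..."
    filehelper.check_for_read('gs://blast-db/latest-dir', gcp_prj=gcp_prj)
    nbytes = filehelper.get_length('gs://blast-db/latest-dir', gcp_prj=gcp_prj)
    with filehelper.open_for_read('gs://blast-db/latest-dir', gcp_prj) as f:
        print(f.read().strip())

    # '-' or 'stdin' is now treated as standard input
    assert filehelper.is_stdin('-') and filehelper.is_stdin('stdin')

    # s3:// writes go through the new streaming context manager; the buffered
    # text is uploaded with boto3 upload_fileobj when the block exits
    with filehelper.open_for_write('s3://example-bucket/taxidlist.txt') as out:
        out.write('9606\n')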
@@ -482,7 +526,7 @@ def open_for_read_iter(fnames: Iterable[str]) -> Generator[TextIO, None, None]: Returns: Generator of files open for reading""" for fname in fnames: - with open_for_read(fname) as f: + with open_for_read(fname, gcp_prj) as f: yield f @@ -520,3 +564,7 @@ def _is_local_file(filename: str) -> bool: return False return True + +def is_stdin(fname: str) -> bool: + """Check if a filename represents stdin""" + return fname == 'stdin' or fname == '-' diff --git a/src/elastic_blast/gcp.py b/src/elastic_blast/gcp.py index c91d5d5..4e0d7e6 100644 --- a/src/elastic_blast/gcp.py +++ b/src/elastic_blast/gcp.py @@ -384,7 +384,7 @@ def job_substitutions(self) -> Dict[str, str]: cfg = self.cfg usage_reporting = get_usage_reporting() - db, _, db_label = get_blastdb_info(cfg.blast.db) + db, _, db_label = get_blastdb_info(cfg.blast.db, cfg.gcp.project) blast_program = cfg.blast.program diff --git a/src/elastic_blast/kubernetes.py b/src/elastic_blast/kubernetes.py index 8e233e7..7a54af6 100644 --- a/src/elastic_blast/kubernetes.py +++ b/src/elastic_blast/kubernetes.py @@ -356,7 +356,7 @@ def initialize_storage(cfg: ElasticBlastConfig, query_files: List[str] = [], wai def initialize_local_ssd(cfg: ElasticBlastConfig, query_files: List[str] = [], wait=ElbExecutionMode.WAIT) -> None: """ Initialize local SSDs for ElasticBLAST cluster """ - db, db_path, _ = get_blastdb_info(cfg.blast.db) + db, db_path, _ = get_blastdb_info(cfg.blast.db, cfg.gcp.project) if not db: raise ValueError("Config parameter 'db' can't be empty") dry_run = cfg.cluster.dry_run @@ -397,7 +397,7 @@ def initialize_local_ssd(cfg: ElasticBlastConfig, query_files: List[str] = [], w taxdb_path = '' if db_path: # Custom database - taxdb_path = gcp_get_blastdb_latest_path() + '/taxdb.*' + taxdb_path = gcp_get_blastdb_latest_path(cfg.gcp.project) + '/taxdb.*' subs = { 'ELB_DB': db, 'ELB_DB_PATH': db_path, @@ -484,7 +484,7 @@ def initialize_persistent_disk(cfg: ElasticBlastConfig, query_files: List[str] = """ # ${LOGDATETIME} setup_pd start >>${ELB_LOGFILE} - db, db_path, _ = get_blastdb_info(cfg.blast.db) + db, db_path, _ = get_blastdb_info(cfg.blast.db, cfg.gcp.project) if not db: raise ValueError("Config parameter 'db' can't be empty") cluster_name = cfg.cluster.name @@ -494,7 +494,7 @@ def initialize_persistent_disk(cfg: ElasticBlastConfig, query_files: List[str] = taxdb_path = '' if db_path: # Custom database - taxdb_path = gcp_get_blastdb_latest_path() + '/taxdb.*' + taxdb_path = gcp_get_blastdb_latest_path(cfg.gcp.project) + '/taxdb.*' results_bucket = cfg.cluster.results dry_run = cfg.cluster.dry_run diff --git a/src/elastic_blast/object_storage_utils.py b/src/elastic_blast/object_storage_utils.py index add098c..f104478 100644 --- a/src/elastic_blast/object_storage_utils.py +++ b/src/elastic_blast/object_storage_utils.py @@ -39,6 +39,8 @@ def write_to_s3(dest: str, contents: str, boto_cfg: Config = None, dry_run: bool contents: what to write into said S3 object boto_cfg: boto3 library configuration dry_run: if True, does nothing + + See also: elastic_blast.filehelper.open_for_write_immediate as an alternative to stream data to S3 """ if dry_run: logging.debug(f'Would have written "{contents}" to {dest}') diff --git a/src/elastic_blast/templates/job-init-local-ssd.yaml.template b/src/elastic_blast/templates/job-init-local-ssd.yaml.template index 921a1ba..54ab096 100644 --- a/src/elastic_blast/templates/job-init-local-ssd.yaml.template +++ b/src/elastic_blast/templates/job-init-local-ssd.yaml.template @@ -33,8 +33,8 @@ spec: 
sleep 30; log() { ts=`date +'%F %T'`; printf '%s RUNTIME %s %f seconds\n' "$ts" "$1" "$2"; }; if [ -z '${ELB_DB_PATH}' ]; then - echo update_blastdb.pl ${ELB_DB} --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose; - update_blastdb.pl ${ELB_DB} --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose; + echo update_blastdb.pl ${ELB_DB} --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose; + update_blastdb.pl ${ELB_DB} --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose; exit_code=$?; else echo gsutil -mq cp ${ELB_DB_PATH} .; @@ -42,11 +42,10 @@ spec: exit_code=$?; [ $exit_code -eq 0 ] || exit $exit_code; [ -f ${ELB_DB}.tar.gz ] && tar xzf ${ELB_DB}.tar.gz; - echo gsutil -mq cp ${ELB_TAX_DB_PATH} .; - gsutil -mq cp ${ELB_TAX_DB_PATH} .; - exit_code=$?; [ -f ${ELB_DB}.tar.gz ] && rm ${ELB_DB}.tar.gz; fi; + echo update_blastdb.pl taxdb --decompress --source NCBI --verbose --verbose --verbose --verbose --verbose --verbose; + update_blastdb.pl taxdb --decompress --source NCBI --verbose --verbose --verbose --verbose --verbose --verbose; end=`date +%s`; log "download-blastdbs" $(($end-$start)); [ $exit_code -eq 0 ] || exit $exit_code; diff --git a/src/elastic_blast/templates/job-init-pv.yaml.template b/src/elastic_blast/templates/job-init-pv.yaml.template index 0826189..f2f318e 100644 --- a/src/elastic_blast/templates/job-init-pv.yaml.template +++ b/src/elastic_blast/templates/job-init-pv.yaml.template @@ -33,20 +33,20 @@ spec: start=`date +%s`; log() { ts=`date +'%F %T'`; printf '%s RUNTIME %s %f seconds\n' "$ts" "$1" "$2"; }; if [ -z '${ELB_DB_PATH}' ]; then - echo update_blastdb.pl ${ELB_DB} --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose; - update_blastdb.pl ${ELB_DB} --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose; + echo update_blastdb.pl ${ELB_DB} --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose; + update_blastdb.pl ${ELB_DB} --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose; exit_code=$?; + [ $exit_code -eq 0 ] || exit $exit_code; else echo gsutil -mq cp ${ELB_DB_PATH} .; gsutil -mq cp ${ELB_DB_PATH} .; exit_code=$?; [ $exit_code -eq 0 ] || exit $exit_code; [ -f ${ELB_DB}.tar.gz ] && tar xzf ${ELB_DB}.tar.gz; - echo gsutil -mq cp ${ELB_TAX_DB_PATH} .; - gsutil -mq cp ${ELB_TAX_DB_PATH} .; - exit_code=$?; [ -f ${ELB_DB}.tar.gz ] && rm ${ELB_DB}.tar.gz; fi; + echo update_blastdb.pl taxdb --decompress --source NCBI --verbose --verbose --verbose --verbose --verbose --verbose; + update_blastdb.pl taxdb --decompress --source NCBI --verbose --verbose --verbose --verbose --verbose --verbose; end=`date +%s`; log "download-blastdbs" $(($end-$start)); [ $exit_code -eq 0 ] || exit $exit_code; diff --git a/src/elastic_blast/tuner.py b/src/elastic_blast/tuner.py index bdccea1..799fa02 100644 --- a/src/elastic_blast/tuner.py +++ b/src/elastic_blast/tuner.py @@ -30,7 +30,6 @@ from bisect import bisect_left import math from typing import Optional -from .filehelper import open_for_read from .constants import BLASTDB_ERROR, INPUT_ERROR from .constants import UNKNOWN_ERROR, MolType, CSP from .constants import ELB_S3_PREFIX, ELB_GCS_PREFIX diff --git a/src/elastic_blast/util.py b/src/elastic_blast/util.py index 52271e4..9bee3bb 100644 --- a/src/elastic_blast/util.py 
+++ b/src/elastic_blast/util.py @@ -32,12 +32,13 @@ import subprocess import datetime import json +import inspect from functools import reduce from pkg_resources import resource_exists -from typing import List, Union, Callable +from typing import List, Union, Callable, Optional from .constants import MolType, GCS_DFLT_BUCKET from .constants import DEPENDENCY_ERROR, AWS_MAX_TAG_LENGTH, GCP_MAX_LABEL_LENGTH -from .constants import AWS_MAX_JOBNAME_LENGTH, CSP +from .constants import AWS_MAX_JOBNAME_LENGTH, CSP, ELB_GCS_PREFIX from .constants import ELB_DFLT_LOGLEVEL, ELB_DFLT_LOGFILE from .base import DBSource @@ -206,7 +207,7 @@ def safe_exec(cmd: Union[List[str], str]) -> subprocess.CompletedProcess: return p -def get_blastdb_info(blastdb: str): +def get_blastdb_info(blastdb: str, gcp_prj: Optional[str] = None): """Get BLAST database short name, path (if applicable), and label for Kubernetes. Gets user provided database from configuration. For custom database finds short name from full path, and provides @@ -219,10 +220,12 @@ def get_blastdb_info(blastdb: str): """ db = blastdb db_path = '' - if db.startswith('gs://'): + if db.startswith(ELB_GCS_PREFIX): # Custom database, just check the presence try: - proc = safe_exec(f'gsutil ls {db}.*') + if not gcp_prj: + raise ValueError(f'elastic_blast.util.get_blastdb_info is missing the gcp_prj parameter') + proc = safe_exec(f'gsutil -u {gcp_prj} ls {db}.*') except SafeExecError: raise ValueError(f'Error requesting for {db}.*') output = proc.stdout.decode() @@ -238,43 +241,49 @@ def get_blastdb_info(blastdb: str): return db, db_path, sanitize_for_k8s(db) -def get_blastdb_size(db: str, db_source: DBSource) -> float: +def get_blastdb_size(db: str, db_source: DBSource, gcp_prj: Optional[str] = None) -> float: """Request blast database size from GCP using gcp module If applied to custom db, just check the presence Returns the size in GB, if not found raises ValueError exception cfg: application configuration object """ - if db.startswith('gs://'): + if db.startswith(ELB_GCS_PREFIX): # Custom database, just check the presence try: - safe_exec(f'gsutil ls {db}.*') + if not gcp_prj: + raise ValueError(f'elastic_blast.util.get_blastdb_size is missing the gcp_prj parameter') + safe_exec(f'gsutil -u {gcp_prj} ls {db}.*') except SafeExecError: raise ValueError(f'BLAST database {db} was not found') # TODO: find a way to check custom DB size w/o transferring it to user machine return 1000000 if db_source == DBSource.GCP: - return gcp_get_blastdb_size(db) + if not gcp_prj: + raise ValueError(f'elastic_blast.util.get_blastdb_size is missing the gcp_prj parameter') + return gcp_get_blastdb_size(db, str(gcp_prj)) elif db_source == DBSource.AWS: return 1000000 # FIXME raise NotImplementedError("Not implemented for sources other than GCP") -def gcp_get_blastdb_latest_path() -> str: +def gcp_get_blastdb_latest_path(gcp_prj: str) -> str: """Get latest path of GCP-based blastdb repository""" - cmd = f'gsutil cat {GCS_DFLT_BUCKET}/latest-dir' + if not gcp_prj: + raise ValueError(f'elastic_blast.util.gcp_get_blastdb_latest_path is missing the gcp_prj parameter') + cmd = f'gsutil -u {gcp_prj} cat {GCS_DFLT_BUCKET}/latest-dir' proc = safe_exec(cmd) return os.path.join(GCS_DFLT_BUCKET, proc.stdout.decode().rstrip()) -def gcp_get_blastdb_size(db: str) -> float: +def gcp_get_blastdb_size(db: str, gcp_prj: str) -> float: """Request blast database size from GCP using gsutil Returns the size in GB, if not found raises ValueError exception db: database name """ - latest_path = 
gcp_get_blastdb_latest_path() - cmd = f'gsutil cat {latest_path}/blastdb-manifest.json' + latest_path = gcp_get_blastdb_latest_path(gcp_prj) + cmd = f'gsutil -u {gcp_prj} cat {latest_path}/blastdb-manifest.json' proc = safe_exec(cmd) blastdb_metadata = json.loads(proc.stdout.decode()) if not db in blastdb_metadata: @@ -560,3 +569,73 @@ def get_resubmission_error_msg(results: str, cloud: CSP) -> str: else: retval += f'gsutil -qm rm -r {results}' return retval + + +def get_gcp_project() -> Optional[str]: + """Return current GCP project as configured in gcloud. + + Raises: + util.SafeExecError on problems with command line gcloud + ValueError if gcloud run is successful, but the project is not set""" + cmd: str = 'gcloud config get-value project' + p = safe_exec(cmd) + result: Optional[str] + + # the result should not be empty, for unset properties gcloud returns the + # string: '(unset)' to stderr + if not p.stdout and not p.stderr: + raise RuntimeError('Current GCP project could not be established') + + result = p.stdout.decode().split('\n')[0] + logging.debug(f'gcloud returned "{result}"') + + # return None if project is unset + if result == '(unset)': + result = None + if not result: + raise ValueError(f'GCP project is unset, please invoke gcloud config set project REPLACE_WITH_YOUR_PROJECT_NAME_HERE') + return result + + +class MetaFileName(type): + """ Auxiliary class to get the source file name """ + def __repr__(self): + callerframerecord = inspect.stack()[1] + frame = callerframerecord[0] + info = inspect.getframeinfo(frame) + return str(info.filename) + + +class __FILE__(metaclass=MetaFileName): + """ Auxiliary class to get the source file name """ + pass + + +class MetaFileLine(type): + """ Auxiliary class to get the source file number """ + def __repr__(self): + callerframerecord = inspect.stack()[1] + frame = callerframerecord[0] + info = inspect.getframeinfo(frame) + return str(info.lineno) + + +class __LINE__(metaclass=MetaFileLine): + """ Auxiliary class to get the source file number """ + pass + + +class MetaFunction(type): + """ Auxiliary class to get the current function name """ + def __repr__(self): + callerframerecord = inspect.stack()[1] + frame = callerframerecord[0] + info = inspect.getframeinfo(frame) + return str(info.function) + + +class __FUNCTION__(metaclass=MetaFunction): + """ Auxiliary class to get the current function name """ + pass + + diff --git a/tests/app/test_elasticblast.py b/tests/app/test_elasticblast.py index 675367b..c73d673 100644 --- a/tests/app/test_elasticblast.py +++ b/tests/app/test_elasticblast.py @@ -119,7 +119,6 @@ def app_mocks(caplog, aws_credentials, gke_mock, mocker): #TODO: These function should be mocked at the level of cloud API calls mocker.patch('tests.app.elastic_blast_app.clean_up', new=MagicMock(return_value=[])) mocker.patch('elastic_blast.commands.submit.check_resource_quotas', new=MagicMock(return_value=None)) - mocker.patch('elastic_blast.commands.submit.write_to_s3', new=MagicMock(return_value=None)) mocker.patch('elastic_blast.elasticblast.copy_to_bucket', new=MagicMock(return_value=None)) mocker.patch(target='elastic_blast.elb_config.aws_get_machine_properties', new=MagicMock(return_value=InstanceProperties(32, 120))) mocker.patch('elastic_blast.elasticblast_factory.ElasticBlastAws', new=MagicMock(return_value=MagicMock())) @@ -242,6 +241,7 @@ def test_too_many_k8s_jobs_client_split(app_mocks, client_query_split): with contextlib.redirect_stderr(io.StringIO()) as stderr: returncode = main() + print(stderr.getvalue()) 
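A minimal, illustrative sketch (not part of the patch) of the helpers added to elastic_blast.util above: get_gcp_project() now raises ValueError instead of returning None when gcloud has no project configured, and the __FILE__/__LINE__/__FUNCTION__ classes use a metaclass __repr__ over the calling frame, so formatting them in a log message yields the caller's source location. The message text below is made up.

    import logging
    from elastic_blast.util import get_gcp_project, __FILE__, __LINE__, __FUNCTION__

    try:
        project = get_gcp_project()  # ValueError if the gcloud project is unset
    except ValueError as err:
        logging.error(err)
        raise

    # repr() of these classes is computed lazily from the caller's stack frame
    logging.debug(f'{__FILE__}:{__LINE__} ({__FUNCTION__}): using GCP project {project}')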
assert returncode == constants.INPUT_ERROR assert 'Traceback' not in stderr.getvalue() @@ -832,7 +832,7 @@ def test_misplaced_config_parameter(app_mocks): def test_database_too_large(app_mocks): """Test that a database too large to fit in an instance RAM will be reported""" - DB = 'gs://bucket/testdb' + DB = 's3://bucket/testdb' app_mocks.gke_mock.cloud.storage[f'{DB}-prot-metadata.json'] = DB_METADATA conf = f"""[cloud-provider] @@ -1089,7 +1089,6 @@ def test_invalid_janitor_schedule_aws(app_mocks): assert re.search(r'Invalid value of environment variable ELB_JANITOR_SCHEDULE', msg) -@patch(target='elastic_blast.elb_config.get_gcp_project', new=MagicMock(return_value=None)) def test_no_gcp_project(app_mocks): """Test that missing GCP project is properly reported""" @@ -1100,6 +1099,9 @@ def test_no_gcp_project(app_mocks): results = gs://test-results """ + # make GCP project unset in the mocked gcloud config + app_mocks.gke_mock.cloud.conf['project'] = None + with NamedTemporaryFile() as f: f.write(conf.encode()) f.flush() @@ -1110,6 +1112,7 @@ def test_no_gcp_project(app_mocks): with contextlib.redirect_stderr(io.StringIO()) as stderr: returncode = main() + print(stderr.getvalue()) assert returncode == constants.INPUT_ERROR assert 'Traceback' not in stderr.getvalue() diff --git a/tests/aws/test_aws.py b/tests/aws/test_aws.py index 7930df4..93e95c0 100644 --- a/tests/aws/test_aws.py +++ b/tests/aws/test_aws.py @@ -600,14 +600,8 @@ def test_incorrect_user_db(): def test_wrong_provider_user_db(): cfg = configparser.ConfigParser() cfg.read(f"{TEST_CONFIG_DATA_DIR}/aws-wrong-provider-custom-db.ini") - cfg = ElasticBlastConfig(cfg, task = ElbCommand.SUBMIT) - cfg.cluster.machine_type = 't2.micro' - try: - with pytest.raises(UserReportError) as exc_info: - b = aws.ElasticBlastAws(cfg, create=True) - finally: - # In case the test fails and cluster is created, clean up the cluster - b = aws.ElasticBlastAws(cfg) - b.delete() + with pytest.raises(UserReportError) as exc_info: + cfg = ElasticBlastConfig(cfg, task = ElbCommand.SUBMIT) assert(exc_info.value.returncode == BLASTDB_ERROR) - assert('User database should be in the AWS S3 bucket' in exc_info.value.message) + assert('User database ' in exc_info.value.message) + assert('must reside in AWS S3' in exc_info.value.message) diff --git a/tests/cost/test_cost.py b/tests/cost/test_cost.py index 38ed22c..bf3b793 100644 --- a/tests/cost/test_cost.py +++ b/tests/cost/test_cost.py @@ -23,6 +23,7 @@ import pytest # type: ignore import os, subprocess, io from elastic_blast.cost import get_cost, DFLT_BQ_DATASET, DFLT_BQ_TABLE, BQ_ERROR, NO_RESULTS_ERROR, CMD_ARGS_ERROR +from tests.utils import gcp_env_vars #TEST_CMD = 'python3 elb-cost.py' TEST_CMD = 'elb-cost.py' @@ -85,7 +86,7 @@ def test_app_bq_error(): with io.BytesIO(p.stderr) as f: assert len(f.readlines()) > 0, 'Application error message is missing' -def test_app_label_format_error(): +def test_app_label_format_error(gcp_env_vars): """Test if the application returns the appropriate error code and an error message if run label was formatted incorrectly""" cmd = TEST_CMD + ' aaa --date-range 2020-01-09:2020-01-10' @@ -94,7 +95,7 @@ def test_app_label_format_error(): with io.BytesIO(p.stderr) as f: assert len(f.readlines()) > 0, 'Application error message is missing' -def test_app_date_format_error(): +def test_app_date_format_error(gcp_env_vars): """Test if the application returns the appropriate error code and an error message if date range was formatted incorrectly""" cmd = TEST_CMD + ' aaa:bb 
--date-range 2020-01-09:2020-01-xx' diff --git a/tests/db_metadata/test_db_metadata.py b/tests/db_metadata/test_db_metadata.py index c11f78f..dba9986 100644 --- a/tests/db_metadata/test_db_metadata.py +++ b/tests/db_metadata/test_db_metadata.py @@ -35,13 +35,14 @@ from tests.utils import gke_mock, aws_credentials, DB_METADATA_PROT as DB_METADATA import pytest +GCP_PRJ = "mocked-gcp-project" def test_get_db_metadata(gke_mock): """Test downloading and parsing BLAST database metadata""" REF_METADATA = json.loads(DB_METADATA) # for GCP - db = get_db_metadata('testdb', MolType.PROTEIN, DBSource.GCP) + db = get_db_metadata('testdb', MolType.PROTEIN, DBSource.GCP, gcp_prj=GCP_PRJ) assert db.dbtype == REF_METADATA['dbtype'] assert db.bytes_to_cache == REF_METADATA['bytes-to-cache'] @@ -56,7 +57,7 @@ def test_get_db_metadata_user_db(gke_mock): REF_METADATA = json.loads(DB_METADATA) # for GCP - db = get_db_metadata('gs://test-bucket/testdb', MolType.PROTEIN, DBSource.GCP) + db = get_db_metadata('gs://test-bucket/testdb', MolType.PROTEIN, DBSource.GCP, gcp_prj=GCP_PRJ) assert db.dbtype == REF_METADATA['dbtype'] assert db.bytes_to_cache == REF_METADATA['bytes-to-cache'] @@ -99,7 +100,7 @@ def test_get_db_metadata_version_12(gke_mock): REF_METADATA = json.loads(DB_METADATA_VERSION_12) # for GCP - db = get_db_metadata('testdb', MolType.NUCLEOTIDE, DBSource.GCP) + db = get_db_metadata('testdb', MolType.NUCLEOTIDE, DBSource.GCP, gcp_prj=GCP_PRJ) assert db.dbtype == REF_METADATA['dbtype'] assert db.bytes_to_cache == REF_METADATA['bytes-to-cache'] assert db.version == '1.2' @@ -121,7 +122,7 @@ def test_get_db_metadata_user_db_version_12(gke_mock): REF_METADATA = json.loads(DB_METADATA_VERSION_12) # for GCP - db = get_db_metadata(f'gs://test-bucket/{DB_NAME}', MolType.NUCLEOTIDE, DBSource.GCP) + db = get_db_metadata(f'gs://test-bucket/{DB_NAME}', MolType.NUCLEOTIDE, DBSource.GCP, gcp_prj=GCP_PRJ) assert db.dbtype == REF_METADATA['dbtype'] assert db.bytes_to_cache == REF_METADATA['bytes-to-cache'] assert db.version == '1.2' @@ -140,7 +141,7 @@ def test_missing_metadata_file(gke_mock): get_db_metadata('s3://some-bucket/non-existent-db', MolType.NUCLEOTIDE, DBSource.AWS) with pytest.raises(FileNotFoundError): - get_db_metadata('this-db-does-not-exist', MolType.PROTEIN, DBSource.GCP) + get_db_metadata('this-db-does-not-exist', MolType.PROTEIN, DBSource.GCP, gcp_prj=GCP_PRJ) # additional field at the end @@ -178,7 +179,7 @@ def test_metadata_with_new_field(gke_mock): REF_METADATA = json.loads(DB_METADATA_NEW_FIELD) gke_mock.cloud.storage[f'{DB}-prot-metadata.json'] = DB_METADATA_NEW_FIELD - db = get_db_metadata(DB, MolType.PROTEIN, DBSource.GCP) + db = get_db_metadata(DB, MolType.PROTEIN, DBSource.GCP, gcp_prj=GCP_PRJ) assert db.dbtype == REF_METADATA['dbtype'] assert db.bytes_to_cache == REF_METADATA['bytes-to-cache'] @@ -216,7 +217,7 @@ def test_missing_field(gke_mock): gke_mock.cloud.storage[f'{DB}-prot-metadata.json'] = DB_METADATA_MISSING_FIELD with pytest.raises(UserReportError) as err: - db = get_db_metadata(DB, MolType.PROTEIN, DBSource.GCP) + db = get_db_metadata(DB, MolType.PROTEIN, DBSource.GCP, gcp_prj=GCP_PRJ) assert err.value.returncode == BLASTDB_ERROR assert 'Missing field' in err.value.message @@ -255,7 +256,7 @@ def test_spec_problem(gke_mock): gke_mock.cloud.storage[f'{DB}-prot-metadata.json'] = DB_METADATA_SPEC_PROBLEM with pytest.raises(UserReportError) as err: - db = get_db_metadata(DB, MolType.PROTEIN, DBSource.GCP) + db = get_db_metadata(DB, MolType.PROTEIN, DBSource.GCP, gcp_prj=GCP_PRJ) 
assert err.value.returncode == BLASTDB_ERROR assert 'Problem parsing BLAST database metadata file' in err.value.message @@ -266,6 +267,6 @@ def test_malformed_json(gke_mock): gke_mock.cloud.storage[f'{DB}-prot-metadata.json'] = 'abc' with pytest.raises(UserReportError) as err: - db = get_db_metadata(DB, MolType.PROTEIN, DBSource.GCP) + db = get_db_metadata(DB, MolType.PROTEIN, DBSource.GCP, gcp_prj=GCP_PRJ) assert err.value.returncode == BLASTDB_ERROR assert 'is not a proper JSON file' in err.value.message diff --git a/tests/elb_config/test_elb_config.py b/tests/elb_config/test_elb_config.py index 7a0615f..b1bbcef 100644 --- a/tests/elb_config/test_elb_config.py +++ b/tests/elb_config/test_elb_config.py @@ -238,13 +238,15 @@ def test_gcpconfig_from_configparser_errors(): assert [message for message in errors if key in message and 'invalid value' in message and confpars[CFG_CLOUD_PROVIDER][key] in message] -@patch(target='elastic_blast.elb_config.get_gcp_project', new=MagicMock(return_value=None)) def test_gcpconfig_from_configparser_missing_project(gke_mock ): """Test test missing GCP project is properly reported""" + gke_mock.cloud.conf['project'] = None + confpars = configparser.ConfigParser() with pytest.raises(ValueError) as err: cfg = GCPConfig.create_from_cfg(confpars) + print(err.value) assert 'GCP project is unset' in str(err.value) diff --git a/tests/filehelper/test_aws_filesystem.py b/tests/filehelper/test_aws_filesystem.py index e86eba3..3a90e90 100644 --- a/tests/filehelper/test_aws_filesystem.py +++ b/tests/filehelper/test_aws_filesystem.py @@ -28,6 +28,7 @@ import os import subprocess import boto3 +import botocore.errorfactory from elastic_blast import filehelper from elastic_blast.object_storage_utils import write_to_s3, delete_from_s3 from tempfile import mktemp, NamedTemporaryFile @@ -113,7 +114,10 @@ def test_check_aws_for_write_success(): @pytest.mark.skipif(os.getenv('TEAMCITY_VERSION') is not None, reason='AWS credentials not set in TC') def test_check_aws_for_write_failure(): tn = os.path.join(WRONG_BUCKET, mktemp(prefix='', dir='')) - with filehelper.open_for_write(tn) as f: - f.write('Test') - with pytest.raises(boto3.exceptions.S3UploadFailedError): - filehelper.copy_to_bucket() + try: + with filehelper.open_for_write(tn) as f: + f.write('Test') + except: + assert(True) + else: + assert(False) diff --git a/tests/filehelper/test_filesystem_checks.py b/tests/filehelper/test_filesystem_checks.py index 0588a52..f0b4271 100644 --- a/tests/filehelper/test_filesystem_checks.py +++ b/tests/filehelper/test_filesystem_checks.py @@ -28,20 +28,22 @@ import os from elastic_blast import filehelper from tempfile import TemporaryDirectory +import pytest TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), 'data') WRITEABLE_BUCKET = 'gs://blast-test' +GCP_PRJ = "ncbi-sandbox-blast" def test_check_for_read_success(): - filehelper.check_for_read('gs://blast-db/latest-dir') + filehelper.check_for_read('gs://blast-db/latest-dir', gcp_prj=GCP_PRJ) + #filehelper.check_for_read('s3://ncbi-blast-databases/latest-dir') filehelper.check_for_read(os.path.join(TEST_DATA_DIR, 'test.tar')) - filehelper.check_for_read('https://storage.googleapis.com/blast-db/latest-dir') def test_check_for_read_failure(): with pytest.raises(FileNotFoundError): - filehelper.check_for_read('gs://blast-db/non-existent-file') + filehelper.check_for_read('gs://blast-db/non-existent-file', gcp_prj=GCP_PRJ) with pytest.raises(FileNotFoundError): filehelper.check_for_read(os.path.join(TEST_DATA_DIR, 
'non-existent-file')) with pytest.raises(FileNotFoundError): diff --git a/tests/gcp/test_gcp.py b/tests/gcp/test_gcp.py index cb76347..1b14292 100644 --- a/tests/gcp/test_gcp.py +++ b/tests/gcp/test_gcp.py @@ -33,6 +33,7 @@ from elastic_blast import kubernetes from elastic_blast import config from elastic_blast import elb_config +from elastic_blast import util from elastic_blast.constants import CLUSTER_ERROR, ElbCommand from elastic_blast.util import SafeExecError, UserReportError from elastic_blast.elb_config import ElasticBlastConfig @@ -65,9 +66,9 @@ def test_fake_gcloud(gke_mock): def test_get_gcp_project(gke_mock): """Test getting GCP project""" - project = elb_config.get_gcp_project() + project = util.get_gcp_project() assert project == GCP_PROJECT - elb_config.safe_exec.assert_called() + util.safe_exec.assert_called() def test_get_unset_gcp_project(mocker): @@ -80,13 +81,10 @@ def subst_safe_exec_unset_project(cmd): # this is how gcloud reports unset project return MockedCompletedProcess('(unset)') - mocker.patch('elastic_blast.elb_config.safe_exec', + mocker.patch('elastic_blast.util.safe_exec', side_effect=subst_safe_exec_unset_project) - project = elb_config.get_gcp_project() - - # unset project must be returned as None - assert project is None - elb_config.safe_exec.assert_called() + with pytest.raises(ValueError): + project = util.get_gcp_project() def test_set_gcp_project(gke_mock): diff --git a/tests/taxonomy/test_taxonomy.py b/tests/taxonomy/test_taxonomy.py index d96fc4d..7995fe1 100644 --- a/tests/taxonomy/test_taxonomy.py +++ b/tests/taxonomy/test_taxonomy.py @@ -25,10 +25,11 @@ """ import configparser -import re +import re, io from tempfile import NamedTemporaryFile from urllib.error import HTTPError import time +import boto3 from unittest.mock import MagicMock, patch from elastic_blast import taxonomy @@ -47,7 +48,7 @@ # input and output taxids for tests input_taxids = [9605, 9608] expected_taxids = set([9605, 9606, 63221, 741158, 1425170, 2665952, 2665953, - 9608, 9611, 9612, 9614, 9615, 9616, 9619, 9620, 9621, 9622, 9623, 9624, 9625, 9626, 9627, 9629, 9630, 9631, 30540, 32534, 34879, 34880, 45781, 55039, 55040, 68721, 68722, 68723, 68724, 68725, 68727, 68728, 68729, 68730, 68732, 68734, 68736, 68737, 68739, 68740, 68741, 69045, 71547, 132609, 143281, 188536, 192959, 228401, 242524, 242525, 244585, 246881, 246882, 286419, 354189, 354190, 354191, 383736, 425200, 425201, 425934, 443256, 476259, 476260, 494514, 554455, 561074, 613187, 644627, 659069, 673762, 676787, 945042, 990119, 1002243, 1002244, 1002254, 1002255, 1224817, 1295334, 1303779, 1316008, 1316009, 1320375, 1341016, 1353242, 1398410, 1419108, 1419257, 1419712, 1605264, 1621113, 1621114, 1621115, 1621116, 1621117, 1621118, 1707807, 1785177, 2494276, 2562269, 2605939, 2626217, 2627721, 2639686, 2658581, 2714668, 2714669, 2714670, 2714671, 2714672, 2714673, 2714674, 2714675, 2726995, 2726996, 2769327, 2769328, 2769329, 2793302, 2793303, 2841919, 2841920, 2841921, 2841922, 2841923, 2813598, 2813599]) + 9608, 9611, 9612, 9614, 9615, 9616, 9619, 9620, 9621, 9622, 9623, 9624, 9625, 9626, 9627, 9629, 9630, 9631, 30540, 32534, 34879, 34880, 45781, 55039, 55040, 68721, 68722, 68723, 68724, 68725, 68727, 68728, 68729, 68730, 68732, 68734, 68736, 68737, 68739, 68740, 68741, 69045, 71547, 132609, 143281, 188536, 192959, 228401, 242524, 242525, 244585, 246881, 246882, 286419, 354189, 354190, 354191, 383736, 425200, 425201, 425934, 443256, 476259, 476260, 494514, 554455, 561074, 613187, 644627, 659069, 673762, 676787, 
945042, 990119, 1002243, 1002244, 1002254, 1002255, 1224817, 1295334, 1303779, 1316008, 1316009, 1320375, 1341016, 1353242, 1398410, 1419108, 1419257, 1419712, 1605264, 1621113, 1621114, 1621115, 1621116, 1621117, 1621118, 1707807, 1785177, 2494276, 2562269, 2605939, 2626217, 2627721, 2639686, 2658581, 2714668, 2714669, 2714670, 2714671, 2714672, 2714673, 2714674, 2714675, 2726995, 2726996, 2769327, 2769328, 2769329, 2793302, 2793303, 2841919, 2841920, 2841921, 2841922, 2841923, 2813598, 2813599, 2879911]) @pytest.fixture() def wait(mocker): @@ -172,13 +173,11 @@ def test_get_user_taxids_errors(): def test_get_species_taxids(wait): """Test translating higher level taxids into species level ones""" species_taxids = taxonomy.get_species_taxids(input_taxids) - list_difference = list(expected_taxids - set(species_taxids)) - error_message = f"The taxonomy IDs returned has changed: actual {len(species_taxids)}, expected {len(expected_taxids)}" - error_message += f"List difference: {list_difference}" - assert sorted(species_taxids) == sorted(expected_taxids), error_message + for taxid in expected_taxids: + assert taxid in species_taxids, f'Expected taxid {taxid} not among returned species taxids' -def test_setup_taxid_filtering_taxids(wait, cfg): +def test_setup_taxid_filtering_taxids(wait, cfg, gke_mock): """Test preparing taxidlist file and blast options for taxid filtering -taxids option""" # set up blast command line options @@ -191,14 +190,18 @@ def test_setup_taxid_filtering_taxids(wait, cfg): matches = re.findall(r'-taxidlist\s+(\S+)', cfg.blast.options) assert len(matches) == 1 assert matches[0] == ELB_TAXIDLIST_FILE - key = '/'.join([cfg.cluster.results, ELB_QUERY_BATCH_DIR]) - filename = '/'.join([filehelper.bucket_temp_dirs[key], matches[0]]) - with open(filename) as f: - taxids = [int(i.rstrip()) for i in f.readlines()] - assert taxids == sorted(expected_taxids) + s3 = boto3.resource('s3') + obj = s3.Object(cfg.cluster.results[5:], f'{ELB_QUERY_BATCH_DIR}/{ELB_TAXIDLIST_FILE}') + with io.BytesIO() as f: + obj.download_fileobj(f) + f.seek(0) + taxids = [int(i.rstrip()) for i in f.read().decode().split()] + for taxid in expected_taxids: + assert taxid in taxids, f'Expected taxid {taxid} not found in returned species taxids' -def test_setup_taxid_filtering_negative_taxids(wait, cfg): + +def test_setup_taxid_filtering_negative_taxids(wait, cfg, gke_mock): """Test preparing taxidlist file and blast options for taxid filtering -negative_taxids option""" # set up blast command line options @@ -211,14 +214,17 @@ def test_setup_taxid_filtering_negative_taxids(wait, cfg): matches = re.findall(r'-negative_taxidlist\s+(\S+)', cfg.blast.options) assert len(matches) == 1 assert matches[0] == ELB_TAXIDLIST_FILE - key = '/'.join([cfg.cluster.results, ELB_QUERY_BATCH_DIR]) - filename = '/'.join([filehelper.bucket_temp_dirs[key], matches[0]]) - with open(filename) as f: - taxids = [int(i.rstrip()) for i in f.readlines()] - assert taxids == sorted(expected_taxids) + s3 = boto3.resource('s3') + obj = s3.Object(cfg.cluster.results[5:], f'{ELB_QUERY_BATCH_DIR}/{ELB_TAXIDLIST_FILE}') + with io.BytesIO() as f: + obj.download_fileobj(f) + f.seek(0) + taxids = [int(i.rstrip()) for i in f.read().decode().split()] + for taxid in expected_taxids: + assert taxid in taxids, f'Expected taxid {taxid} not found in returned species taxids' -def test_setup_taxid_filtering_taxidlist(wait, cfg): +def test_setup_taxid_filtering_taxidlist(wait, cfg, gke_mock): """Test preparing taxidlist file and blast options for taxid 
filtering -taxidlist option""" @@ -239,11 +245,14 @@ def test_setup_taxid_filtering_taxidlist(wait, cfg): matches = re.findall(r'-taxidlist\s+(\S+)', cfg.blast.options) assert len(matches) == 1 assert matches[0] == ELB_TAXIDLIST_FILE - key = '/'.join([cfg.cluster.results, ELB_QUERY_BATCH_DIR]) - filename = '/'.join([filehelper.bucket_temp_dirs[key], matches[0]]) - with open(filename) as f: - taxids = [int(i.rstrip()) for i in f.readlines()] - assert taxids == sorted(expected_taxids) + s3 = boto3.resource('s3') + obj = s3.Object(cfg.cluster.results[5:], f'{ELB_QUERY_BATCH_DIR}/{ELB_TAXIDLIST_FILE}') + with io.BytesIO() as f: + obj.download_fileobj(f) + f.seek(0) + taxids = [int(i.rstrip()) for i in f.read().decode().split()] + for taxid in expected_taxids: + assert taxid in taxids, f'Expected taxid {taxid} not found in returned species taxids' def test_non_existent_taxid(wait, cfg): diff --git a/tests/util/test_util.py b/tests/util/test_util.py index 0acee63..4cb2c56 100644 --- a/tests/util/test_util.py +++ b/tests/util/test_util.py @@ -43,7 +43,7 @@ from elastic_blast.base import InstanceProperties from elastic_blast.db_metadata import DbMetadata import pytest -from tests.utils import MockedCompletedProcess, gke_mock, GCP_REGIONS +from tests.utils import MockedCompletedProcess, gke_mock, GCP_REGIONS, gcp_env_vars DB_METADATA = DbMetadata(version = '1', @@ -126,18 +126,6 @@ def test_safe_exec_cmd_not_a_list_or_string(self): with self.assertRaises(ValueError) as e: safe_exec(1) - @patch(target='elastic_blast.elb_config.gcp_get_regions', new=MagicMock(return_value=GCP_REGIONS)) - def test_get_blastdb_size(self): - cfg = create_config_for_db('nr') - dbsize = get_blastdb_size(cfg.blast.db, cfg.cluster.db_source) - assert dbsize >= 227.4 - - @patch(target='elastic_blast.elb_config.gcp_get_regions', new=MagicMock(return_value=GCP_REGIONS)) - def test_get_blastdb_size_invalid_database(self): - cfg = create_config_for_db('non_existent_blast_database') - with self.assertRaises(ValueError): - get_blastdb_size(cfg.blast.db, cfg.cluster.db_source) - def test_sanitize_for_k8s(self): self.assertEqual('ref-viruses-rep-genomes', sanitize_for_k8s('ref_viruses_rep_genomes')) self.assertEqual('betacoronavirus', sanitize_for_k8s('Betacoronavirus')) @@ -150,6 +138,21 @@ def test_sanitize_aws_batch_job_name(self): def test_sanitize_aws_user_name(self): self.assertEqual('user-name', sanitize_aws_batch_job_name('user.name')) +@patch(target='elastic_blast.elb_config.gcp_get_regions', new=MagicMock(return_value=GCP_REGIONS)) +def test_get_blastdb_size(gcp_env_vars): + cfg = create_config_for_db('nr') + gcp_prj = os.environ.get('CLOUDSDK_CORE_PROJECT', "ncbi-sandbox-blast") + dbsize = get_blastdb_size(cfg.blast.db, cfg.cluster.db_source, gcp_prj) + assert dbsize >= 227.4 + +@patch(target='elastic_blast.elb_config.gcp_get_regions', new=MagicMock(return_value=GCP_REGIONS)) +def test_get_blastdb_size_invalid_database(gcp_env_vars): + cfg = create_config_for_db('non_existent_blast_database') + + with pytest.raises(ValueError): + gcp_prj = os.environ.get('CLOUDSDK_CORE_PROJECT', "ncbi-sandbox-blast") + get_blastdb_size(cfg.blast.db, cfg.cluster.db_source, gcp_prj) + @patch(target='elastic_blast.elb_config.get_db_metadata', new=MagicMock(return_value=DB_METADATA)) def create_config_for_db(dbname): @@ -340,30 +343,34 @@ def test_get_blastdb_info(mocker): DB = f'{DB_BUCKET}/{DB_NAME}' response = DB_NAME+'tar.gz' + orig_safe_exec = util.safe_exec + def safe_exec_gsutil_ls(cmd): """Mocked util.safe_exec function that simulates 
gsutil ls""" - if cmd != f'gsutil ls {DB}.*': - raise ValueError(f'Bad gsutil command line: "{cmd}"') - return MockedCompletedProcess(response) + if cmd.startswith('gsutil') and f'ls {DB}.*' in cmd: + return MockedCompletedProcess(response) + else: + return orig_safe_exec(cmd) mocker.patch('elastic_blast.util.safe_exec', side_effect=safe_exec_gsutil_ls) + gcp_prj = os.environ.get('CLOUDSDK_CORE_PROJECT', "ncbi-sandbox-blast") # tar.gz file, db_path should explicitely mention it - db, db_path, k8sdblabel = util.get_blastdb_info(DB) + db, db_path, k8sdblabel = util.get_blastdb_info(DB, gcp_prj) assert(db_path == DB+'.tar.gz') assert(k8sdblabel == DB_LABEL) print(db, db_path, k8sdblabel) # no tar.gz file, db_path should have .* response = DB_NAME+'tar.gz.md5' - db, db_path, k8sdblabel = util.get_blastdb_info(DB) + db, db_path, k8sdblabel = util.get_blastdb_info(DB, gcp_prj) assert(db_path == DB+'.*') assert(k8sdblabel == DB_LABEL) print(db, db_path, k8sdblabel) # tar.gz file, db_path should explicitely mention it response = DB_NAME+'tar.gz'+'\n'+DB_NAME+'.ndb' - db, db_path, k8sdblabel = util.get_blastdb_info(DB) + db, db_path, k8sdblabel = util.get_blastdb_info(DB, gcp_prj) assert(db_path == DB+'.tar.gz') assert(k8sdblabel == DB_LABEL) print(db, db_path, k8sdblabel) @@ -371,7 +378,7 @@ def safe_exec_gsutil_ls(cmd): # empty result, should throw an exception response = '' with pytest.raises(ValueError): - util.get_blastdb_info(DB) + util.get_blastdb_info(DB, gcp_prj) # error executing gsutil, should throw an exception def safe_exec_gsutil_ls_exception(cmd): @@ -379,4 +386,4 @@ def safe_exec_gsutil_ls_exception(cmd): raise SafeExecError(1, 'CommandException: One or more URLs matched no objects.') mocker.patch('elastic_blast.util.safe_exec', side_effect=safe_exec_gsutil_ls_exception) with pytest.raises(ValueError): - util.get_blastdb_info(DB) + util.get_blastdb_info(DB, gcp_prj) diff --git a/tests/utils.py b/tests/utils.py index 4981000..f4e643b 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -160,6 +160,8 @@ def gke_mock(mocker): mock = GKEMock() + mock.cloud.conf['project'] = GCP_PROJECT + mock.cloud.storage['gs://test-bucket/test-query.fa'] = '>query\nACTGGAGATGAC' mock.cloud.storage['gs://test-results'] = '' mock.cloud.storage[f'gs://{NOT_WRITABLE_BUCKET}'] = '' @@ -235,6 +237,9 @@ class CloudResources: # object content, any object is readable and writable storage: Dict[str, str] = field(default_factory=dict) + # gcloud config + conf: Dict[str, str] = field(default_factory=dict) + def mocked_safe_exec(cmd: Union[List[str], str], cloud_state: CloudResources = None) -> MockedCompletedProcess: """Substitute for util.safe_exec function that calls command line gcloud @@ -379,8 +384,7 @@ def mocked_safe_exec(cmd: Union[List[str], str], cloud_state: CloudResources = N return MockedCompletedProcess() # Check whether a file exists in GCS - elif ' '.join(cmd).startswith('gsutil -q stat') or \ - ' '.join(cmd).startswith('gsutil stat'): + elif ' '.join(cmd).startswith('gsutil') and 'stat' in cmd: if cloud_state: if cmd[-1] in cloud_state.storage: return MockedCompletedProcess() @@ -390,7 +394,7 @@ def mocked_safe_exec(cmd: Union[List[str], str], cloud_state: CloudResources = N return MockedCompletedProcess(stdout='',stderr='',returncode=0) # Check whether a file exists in GCS - elif ' '.join(cmd).startswith('gsutil -q cat') or ' '.join(cmd).startswith('gsutil cat'): + elif ' '.join(cmd).startswith('gsutil') and 'cat' in cmd: cmd = ' '.join(cmd) # simulate reading NCBI database manifest if 
cmd.endswith('latest-dir'): @@ -402,15 +406,11 @@ def mocked_safe_exec(cmd: Union[List[str], str], cloud_state: CloudResources = N return MockedCompletedProcess(stdout='',stderr='',returncode=0) # copy files to GCS - elif ' '.join(cmd).startswith('gsutil -qm cp') or ' '.join(cmd).startswith('gsutil -mq cp'): + elif ' '.join(cmd).startswith('gsutil') and 'cp' in cmd: return MockedCompletedProcess() # remove a file from GCS - elif ' '.join(cmd).startswith('gsutil -q rm'): - return MockedCompletedProcess(stdout='',stderr='',returncode=0) - - # Check whether a file is being removed from GCS - elif ' '.join(cmd).startswith('gsutil -mq rm'): + elif ' '.join(cmd).startswith('gsutil') and 'rm' in cmd: return MockedCompletedProcess(stdout='',stderr='',returncode=0) elif ' '.join(cmd).startswith('kubectl config current-context'): @@ -484,6 +484,12 @@ def mocked_safe_exec(self, cmd): if cmd.startswith('gcloud compute disks delete'): self.disk_delete_called = True + if cmd.startswith('gcloud config get-value project'): + print(self.cloud.conf) + if 'project' in self.cloud.conf and self.cloud.conf['project']: + return MockedCompletedProcess(self.cloud.conf['project']) + return MockedCompletedProcess('(unset)') + return mocked_safe_exec(cmd, self.cloud) @@ -491,13 +497,13 @@ def mocked_popen(self, cmd, stderr, stdin=None, stdout=None, universal_newlines= """Mocked subprocess.Popen function, used to mock calls to gsutil used in elastic_blast.filehelper""" # open_for_read - if ' '.join(cmd).startswith('gsutil cat'): + if ' '.join(cmd).startswith('gsutil') and 'cat' in cmd: if cmd[-1] in self.cloud.storage: return MockedCompletedProcess(stdout=self.cloud.storage[cmd[-1]], stderr='', subprocess_run_called=False) else: return MockedCompletedProcess(returncode=1, stdout='', stderr=f'Object "{cmd[-1]}" does not exist', subprocess_run_called=False) # test dir for write - elif ' '.join(cmd).startswith('gsutil cp -'): + elif ' '.join(cmd).startswith('gsutil') and 'cp' in cmd and '-' in cmd: if '/'.join(cmd[-1].split('/')[:-1]) in self.cloud.storage: if NOT_WRITABLE_BUCKET in cmd[-1]: return MockedCompletedProcess(returncode=1, stderr=f'Mocked error: cannot write to bucker {cmd[-1]}') @@ -532,6 +538,29 @@ def mocked_client(self, client, config=None): raise NotImplementedError(f'boto3 mock for {client} client is not implemented') +@pytest.fixture() +def gcp_env_vars(): + env = { 'CLOUDSDK_CORE_PROJECT': 'ncbi-sandbox-blast' } + orig_env = {} + + if 'TEAMCITY_VERSION' in os.environ: + for var_name in env: + if var_name in os.environ: + orig_env[var_name] = os.environ[var_name] + os.environ[var_name] = str(env[var_name]) + + yield env + + # cleanup + for var_name in env: + if var_name in orig_env: + os.environ[var_name] = orig_env[var_name] + else: + # os.unsetenv does not work on every system + del os.environ[var_name] + else: + yield orig_env + @pytest.fixture def aws_credentials(): """Credentials for mocked AWS services. This fixture ensures that we are @@ -567,6 +596,14 @@ def load(self): if self.obj not in self.storage: raise ClientError(None, None) + def upload_fileobj(self, stream): + """Upload a file object to the cloud bucket""" + self.storage[self.obj] = stream.read() + + def download_fileobj(self, stream): + """Download a file object from the cloud bucket""" + stream.write(self.storage[self.obj]) + class MockedEC2ClientBase: """ Mocked EC2 client """
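Illustrative sketch (not part of the patch; bucket and key names are placeholders) of how the extended test mocks above can be used: the mocked gcloud config now carries a 'project' entry that a test may clear to simulate an unset project, and the mocked S3 objects implement upload_fileobj/download_fileobj so data written through the S3 streaming path can be read back and asserted on, as the taxonomy tests do.

    import io
    import boto3
    from tests.utils import gke_mock  # pytest fixture

    def test_mocked_s3_roundtrip(gke_mock):
        # Simulate an unset GCP project: the mocked
        # 'gcloud config get-value project' will return '(unset)'.
        gke_mock.cloud.conf['project'] = None

        # Round-trip an object through the mocked S3 storage.
        s3 = boto3.resource('s3')
        obj = s3.Object('example-bucket', 'some/key.txt')
        obj.upload_fileobj(io.BytesIO(b'9606\n9605\n'))
        with io.BytesIO() as buf:
            obj.download_fileobj(buf)
            assert buf.getvalue() == b'9606\n9605\n'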