diff --git a/CITATION.cff b/CITATION.cff index e04c860..74e08b0 100644 --- a/CITATION.cff +++ b/CITATION.cff @@ -1,8 +1,8 @@ cff-version: "1.1.0" message: "If you use this software, please cite it using these metadata." title: ElasticBLAST -version: "0.2.7" -date-released: 2022-08-11 +version: "1.0.0" +date-released: 2022-12-05 license: "NCBI Public Domain" repository-code: "https://github.com/ncbi/elastic-blast/" authors: diff --git a/Makefile b/Makefile index 06185c0..c616230 100644 --- a/Makefile +++ b/Makefile @@ -60,7 +60,7 @@ elastic-blast3.7: ${PYTHON_SRC} ${YAML_TEMPLATES} ${VENV} validate-cf-templates # Python support ${VENV}: requirements/base.txt requirements/test.txt - [ -d ${VENV} ] || python3 -m venv $@ + [ -d ${VENV} ] || virtualenv -p python3 $@ source ${VENV}/bin/activate && pip3 install -qe . -r requirements/test.txt source ${VENV}/bin/activate && python3 setup.py install_data @@ -107,7 +107,7 @@ yamllint: ${VENV} source ${VENV}/bin/activate && \ yamllint -d share/etc/yamllint-config.yaml src/elastic_blast/templates/storage-gcp-ssd.yaml source ${VENV}/bin/activate && \ - yamllint -d share/etc/yamllint-config.yaml src/elastic_blast/templates/pvc.yaml.template + yamllint -d share/etc/yamllint-config.yaml src/elastic_blast/templates/pvc-*.yaml.template source ${VENV}/bin/activate && \ yamllint -d share/etc/yamllint-config.yaml src/elastic_blast/templates/job-init-* source ${VENV}/bin/activate && \ diff --git a/bin/elastic-blast b/bin/elastic-blast index 885dd96..2b557f4 100755 --- a/bin/elastic-blast +++ b/bin/elastic-blast @@ -46,7 +46,6 @@ from elastic_blast.constants import ElbCommand from elastic_blast.constants import ELB_DFLT_LOGLEVEL, ELB_DFLT_LOGFILE from elastic_blast.constants import CFG_CLOUD_PROVIDER, CFG_CP_GCP_PROJECT - DESC = r"""This application facilitates running BLAST on large amounts of query sequence data on the cloud""" diff --git a/bin/fasta_split.py b/bin/fasta_split.py index f272280..dbda49c 100755 --- a/bin/fasta_split.py +++ b/bin/fasta_split.py @@ -43,6 +43,8 @@ from elastic_blast.split import FASTAReader from elastic_blast.jobs import write_job_files from elastic_blast.constants import ELB_QUERY_BATCH_FILE_PREFIX +from elastic_blast.constants import ELB_DFLT_LOGFILE, ELB_DFLT_LOGLEVEL +from elastic_blast.util import config_logging DEFAULT_BATCH_LEN = 5000000 @@ -74,6 +76,12 @@ def parse_arguments(): help='file to report total number of bases/residues in input file') parser.add_argument("-n", "--dry-run", action='store_true', help="Do not run any commands, just show what would be executed") + parser.add_argument("--logfile", default=argparse.SUPPRESS, type=str, + help=f"Default: {ELB_DFLT_LOGFILE}") + parser.add_argument("--loglevel", default=argparse.SUPPRESS, + help=f"Default: {ELB_DFLT_LOGLEVEL}", + choices=["DEBUG", "INFO", "WARNING", + "ERROR", "CRITICAL"]) return parser.parse_args() def main(): @@ -89,6 +97,8 @@ def main(): count_file = args.count dry_run = args.dry_run job_template_text = '' + + config_logging(args) try: if job_template: with open_for_read(job_template) as f: diff --git a/docker-blast/Makefile b/docker-blast/Makefile index 020f994..99ae9fb 100644 --- a/docker-blast/Makefile +++ b/docker-blast/Makefile @@ -30,7 +30,7 @@ GCP_IMG?=gcr.io/ncbi-sandbox-blast/${IMG} AWS_SERVER?=public.ecr.aws/i6v3i0i9 AWS_IMG?=${AWS_SERVER}/elasticblast-elb AWS_REGION?=us-east-1 -VERSION?=1.1.1 +VERSION?=1.1.3 ifeq (, $(shell which vmtouch 2>/dev/null)) NOVMTOUCH?=--no-vmtouch diff --git a/docker-blast/splitq_download_db_search 
b/docker-blast/splitq_download_db_search index 6817c2c..d86e57e 100755 --- a/docker-blast/splitq_download_db_search +++ b/docker-blast/splitq_download_db_search @@ -249,7 +249,7 @@ def _download_database(args, is_user, db_done): verbose = ' --verbose --verbose --verbose --verbose --verbose --verbose' if args.verbose else '' creds = ' --no-sign-request' if args.no_creds else '' nprocs_to_download_db = min(MAX_PROCS_TO_DOWNLOAD_DB, int(os.cpu_count()/args.num_threads)) - p = safe_exec(f"time update_blastdb.pl taxdb --decompress --source ncbi {verbose} --num_threads {nprocs_to_download_db}") + p = safe_exec(f"time update_blastdb.pl taxdb --decompress --source {args.source}{verbose} --num_threads {nprocs_to_download_db}") print(p.stdout.decode(), end='') print(p.stderr.decode(), end='') if is_user: diff --git a/docker-blast/update_blastdb.pl b/docker-blast/update_blastdb.pl index 84d4844..9d08c66 100755 --- a/docker-blast/update_blastdb.pl +++ b/docker-blast/update_blastdb.pl @@ -55,8 +55,9 @@ use constant AMI_URL => "http://169.254.169.254/latest/meta-data/local-hostname"; use constant AWS_BUCKET => "ncbi-blast-databases"; +use constant GCS_URL => "https://storage.googleapis.com"; use constant GCP_URL => "http://metadata.google.internal/computeMetadata/v1/instance/id"; -use constant GCP_BUCKET => "gs://blast-db"; +use constant GCP_BUCKET => "blast-db"; # TODO: deprecate this in the next release 2.14.x #use constant BLASTDB_MANIFEST => "blastdb-manifest.json"; @@ -79,6 +80,7 @@ my $opt_source; my $opt_legacy_exit_code = 0; my $opt_nt = &get_num_cores(); +my $opt_gcp_prj = undef; my $result = GetOptions("verbose+" => \$opt_verbose, "quiet" => \$opt_quiet, "force" => \$opt_force_download, @@ -89,6 +91,7 @@ "blastdb_version:i" => \$opt_blastdb_ver, "decompress" => \$opt_decompress, "source=s" => \$opt_source, + "gcp-project=s" => \$opt_gcp_prj, "num_threads=i" => \$opt_nt, "legacy_exit_code" => \$opt_legacy_exit_code, "help" => \$opt_help); @@ -168,15 +171,16 @@ print "Error: $0 depends on curl to fetch data from cloud storage, please install this utility to access this data source.\n"; exit(EXIT_FAILURE); } -if ($location =~ /gcp/i and (not defined $gsutil or not defined $gcloud)) { - print "Error: $0 depends on gsutil and gcloud to fetch data from cloud storage, please install these utilities to access this data source.\n"; - exit(EXIT_FAILURE); -} -my $gcp_prj = ($location =~ /gcp/i) ? &get_gcp_project() : undef; -if ($location =~ /gcp/i and not defined $gcp_prj) { - print "Error: $0 depends on gcloud being configured to fetch data from cloud storage, please configure it per the instructions in https://cloud.google.com/sdk/docs/initializing .\n"; +if ($location =~ /gcp/i and defined($opt_gcp_prj) and (not defined $gsutil or not defined $gcloud)) { + print "Error: when providing a GCP project, $0 depends on gsutil and gcloud to fetch data from cloud storage, please install these utilities to access this data source.\n"; exit(EXIT_FAILURE); } +my $gcp_prj = $opt_gcp_prj; +#my $gcp_prj = ($location =~ /gcp/i) ? 
&get_gcp_project() : undef; +#if ($location =~ /gcp/i and not defined $gcp_prj) { +# print "Error: $0 depends on gcloud being configured to fetch data from cloud storage, please configure it per the instructions in https://cloud.google.com/sdk/docs/initializing .\n"; +# exit(EXIT_FAILURE); +#} my $ftp; @@ -285,10 +289,11 @@ sub showall_from_metadata_file_1_1 } } if (@files2download) { + my $gsutil = &get_gsutil_path(); my $awscli = &get_awscli_path(); my $cmd; my $fh = File::Temp->new(); - if ($location eq "GCP") { + if ($location eq "GCP" and defined($gcp_prj)) { $cmd = "$gsutil -u $gcp_prj "; if ($opt_nt > 1) { $cmd .= "-m -q "; @@ -298,29 +303,28 @@ sub showall_from_metadata_file_1_1 $cmd .= "-q cp "; } $cmd .= join(" ", @files2download) . " ."; - } else { - if (defined ($awscli)) { - # https://registry.opendata.aws/ncbi-blast-databases/#usageexamples - my $aws_cmd = "$awscli s3 cp --no-sign-request "; - $aws_cmd .= "--only-show-errors " unless $opt_verbose >= 3; + } elsif ($location eq "AWS" and defined ($awscli)) { + # https://registry.opendata.aws/ncbi-blast-databases/#usageexamples + my $aws_cmd = "$awscli s3 cp --no-sign-request "; + $aws_cmd .= "--only-show-errors " unless $opt_verbose >= 3; + print $fh join("\n", @files2download); + $cmd = "/usr/bin/xargs -P $opt_nt -n 1 -I{}"; + $cmd .= " -t" if $opt_verbose > 3; + $cmd .= " $aws_cmd {} ."; + $cmd .= " <$fh " ; + } else { # fall back to curl + my $url = $location eq "AWS" ? AWS_URL : GCS_URL; + s,gs://,$url/, foreach (@files2download); + s,s3://,$url/, foreach (@files2download); + if ($opt_nt > 1 and -f "/usr/bin/xargs") { print $fh join("\n", @files2download); - $cmd = "/usr/bin/xargs -P $opt_nt -n 1 -I{}"; + $cmd = "/usr/bin/xargs -P $opt_nt -n 1"; $cmd .= " -t" if $opt_verbose > 3; - $cmd .= " $aws_cmd {} ."; + $cmd .= " $curl -sSOR"; $cmd .= " <$fh " ; - } else { # fall back to curl for AWS only - my $url = AWS_URL; - s,s3://,$url/, foreach (@files2download); - if ($opt_nt > 1 and -f "/usr/bin/xargs") { - print $fh join("\n", @files2download); - $cmd = "/usr/bin/xargs -P $opt_nt -n 1"; - $cmd .= " -t" if $opt_verbose > 3; - $cmd .= " $curl -sSOR"; - $cmd .= " <$fh " ; - } else { - $cmd = "$curl -sSR"; - $cmd .= " -O $_" foreach (@files2download); - } + } else { + $cmd = "$curl -sSR"; + $cmd .= " -O $_" foreach (@files2download); } } print "$cmd\n" if $opt_verbose > 3; @@ -670,8 +674,13 @@ sub get_latest_dir $url = AWS_URL . "/" . AWS_BUCKET . "/latest-dir"; $cmd = "$curl -s $url"; } else { - $url = GCP_BUCKET . "/latest-dir"; - $cmd = "$gsutil -u $gcp_prj cat $url"; + if (defined($gcp_prj)) { + $url = 'gs://' . GCP_BUCKET . "/latest-dir"; + $cmd = "$gsutil -u $gcp_prj cat $url"; + } else { + $url = GCS_URL . "/" . GCP_BUCKET . "/latest-dir"; + $cmd = "$curl -s $url"; + } } print "$cmd\n" if DEBUG; chomp($retval = `$cmd`); @@ -696,8 +705,13 @@ sub get_blastdb_metadata $url = AWS_URL . "/" . AWS_BUCKET . "/$latest_dir/" . BLASTDB_METADATA; $cmd = "curl -sf $url"; } elsif ($source eq "GCP") { - $url = GCP_BUCKET . "/$latest_dir/" . BLASTDB_METADATA; - $cmd = "$gsutil -u $gcp_prj cat $url"; + if (defined($gcp_prj)) { + $url = 'gs://' . GCP_BUCKET . "/$latest_dir/" . BLASTDB_METADATA; + $cmd = "$gsutil -u $gcp_prj cat $url"; + } else { + $url = GCS_URL . "/" . GCP_BUCKET . "/$latest_dir/" . BLASTDB_METADATA; + $cmd = "curl -sf $url"; + } } else { $url = 'ftp://' . NCBI_FTP . "/blast/db/" . 
BLASTDB_METADATA; $cmd = "curl -sf $url"; diff --git a/docker-janitor/Makefile b/docker-janitor/Makefile index f157bb9..44aa313 100644 --- a/docker-janitor/Makefile +++ b/docker-janitor/Makefile @@ -27,7 +27,7 @@ SHELL=/bin/bash .PHONY: all pre-check check clean build publish gcp-build gcp-check gcp-clean IMG?=ncbi/elasticblast-janitor -VERSION?=0.2.0 +VERSION?=0.3.0 GCP_PROJECT?=$(shell gcloud config get-value project 2>/dev/null) GCP_TEST_BUCKET?=gs://elasticblast-test/query-split-run-test diff --git a/docker-job-submit/Dockerfile.gcp b/docker-job-submit/Dockerfile.gcp index 3476339..9d006fc 100644 --- a/docker-job-submit/Dockerfile.gcp +++ b/docker-job-submit/Dockerfile.gcp @@ -28,10 +28,13 @@ LABEL Vendor="NCBI/NLM/NIH" LABEL Maintainer=camacho@ncbi.nlm.nih.gov COPY cloud-job-submit.sh /usr/bin/ +COPY templates/volume-snapshot-class.yaml /templates/ +COPY templates/volume-snapshot.yaml /templates/ +COPY templates/pvc-rom.yaml.template /templates/ RUN chmod +x /usr/bin/cloud-job-submit.sh && \ apk -U upgrade && \ - apk add --no-cache bash gettext curl && \ + apk add --no-cache bash gettext curl jq && \ curl -LO https://storage.googleapis.com/kubernetes-release/release/`curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt`/bin/linux/amd64/kubectl && \ chmod +x ./kubectl && \ mv kubectl /usr/bin/ && \ diff --git a/docker-job-submit/Makefile b/docker-job-submit/Makefile index 060cf28..123da67 100644 --- a/docker-job-submit/Makefile +++ b/docker-job-submit/Makefile @@ -28,7 +28,7 @@ SHELL=/bin/bash .PHONY: all pre-check check clean build publish gcp-build gcp-check gcp-clean IMG?=ncbi/elasticblast-job-submit -VERSION?=2.0.0 +VERSION?=3.0.0 GCP_PROJECT?=$(shell gcloud config get-value project 2>/dev/null) GCP_TEST_BUCKET?=gs://elasticblast-test/cloud-job-submission AWS_REGION?=us-east-1 @@ -61,6 +61,7 @@ check: .PHONY: gcp-build gcp-build: + rsync -a ../src/elastic_blast/templates ${PWD}/ gcloud builds submit --config cloudbuild.yaml --substitutions _VERSION=${VERSION},_IMG=${IMG} .PHONY: aws-build diff --git a/docker-job-submit/cloud-job-submit.sh b/docker-job-submit/cloud-job-submit.sh index 2643fb8..13634d4 100755 --- a/docker-job-submit/cloud-job-submit.sh +++ b/docker-job-submit/cloud-job-submit.sh @@ -34,6 +34,7 @@ K8S_JOB_GET_BLASTDB=get-blastdb K8S_JOB_IMPORT_QUERY_BATCHES=import-query-batches K8S_JOB_SUBMIT_JOBS=submit-jobs ELB_PAUSE_AFTER_INIT_PV=150 +ELB_DISK_ID_FILE=disk-id.txt GSUTIL_COPY='gsutil -q cp' GCLOUD=gcloud @@ -53,6 +54,7 @@ ELB_RESULTS=test ELB_CLUSTER_NAME=test-cluster ELB_GCP_PROJECT=test-project ELB_GCP_ZONE=test-zone +ELB_USE_LOCAL_SSD=false mkdir -p test/metadata cp ../src/elastic_blast/templates/blast-batch-job.yaml.template test/metadata/job.yaml.template for ((i=0; i<1020; i++)) do printf 'batch_%03d.fa\n' "$i" >> test/metadata/batch_list.txt; done @@ -78,20 +80,51 @@ if [[ "$s" != Complete*( Complete) ]]; then exit 1 fi -# Unmount ReadWrite blastdb volume, necessary for cluster use + +# Get init-pv job logs pods=`kubectl get pods -l job-name=init-pv -o jsonpath='{.items[*].metadata.name}'` for pod in $pods; do for c in ${K8S_JOB_GET_BLASTDB} ${K8S_JOB_IMPORT_QUERY_BATCHES}; do ${KUBECTL} logs $pod -c $c --timestamps --since=24h --tail=-1 | ${GSUTIL_COPY} /dev/stdin ${ELB_RESULTS}/logs/k8s-$pod-$c.log done done -if [ ! -z "$pods" ]; then + + +# no need to deal with persistent disks and snapshots if a local SSD is used +if ! 
$ELB_USE_LOCAL_SSD ; then + + # Create a volume snapshot + ${KUBECTL} apply -f /templates/volume-snapshot-class.yaml + ${KUBECTL} apply -f /templates/volume-snapshot.yaml + sleep 5 + + # Wait for the snapshot to be ready + while true; do + st=$(${KUBECTL} get volumesnapshot blast-dbs-snapshot -o jsonpath='{.status.readyToUse}') + [ $? -ne 0 ] && echo "ERROR: Getting volume snapshot status" && exit 1 + [ $st == true ] && break + echo "Volume snapshot status: $st" + sleep 30 + done + + # save writable disk id + export pv_rwo=$(${KUBECTL} get pvc blast-dbs-pvc-rwo -o jsonpath='{.spec.volumeName}') + + # Delete the job to unmount ReadWrite blastdb volume ${KUBECTL} delete job init-pv # Wait for disk to be unmounted echo Waiting for $ELB_PAUSE_AFTER_INIT_PV sec to unmount PV disk sleep $ELB_PAUSE_AFTER_INIT_PV + + # Delete ReadWriteOnce PVC + ${KUBECTL} delete pvc blast-dbs-pvc-rwo + + # Create ReadOnlyMany PVC + envsubst '${ELB_PD_SIZE}' < /templates/pvc-rom.yaml.template > pvc-rom.yaml + ${KUBECTL} apply -f pvc-rom.yaml fi + # Debug job fail - set env variable ELB_DEBUG_SUBMIT_JOB_FAIL to non empty value [ -n "${ELB_DEBUG_SUBMIT_JOB_FAIL:-}" ] && echo Job submit job failed for debug && exit 1 @@ -144,4 +177,42 @@ if ${GSUTIL_COPY} ${ELB_RESULTS}/${ELB_METADATA_DIR}/job.yaml.template . && fi copy_job_logs_to_results_bucket submit "${K8S_JOB_SUBMIT_JOBS}" echo Done +else + echo "Job file or batch list not found in GCS" + exit 1 +fi + + +# no need to deal with persistent disks and snapshots if a local SSD is used +if $ELB_USE_LOCAL_SSD ; then + exit 0 +fi + + +# wait for PVC to bind +while true; do + st=$(${KUBECTL} get -f pvc-rom.yaml -o jsonpath='{.status.phase}') + [ $? -ne 0 ] && echo "ERROR: Getting PVC bind state" && exit 1 + [ $st == Bound ] && break + echo "PVC status: $st" + sleep 30 +done + +# label the new persistent disk +export pv=$(${KUBECTL} get -f pvc-rom.yaml -o jsonpath='{.spec.volumeName}') +jq -n --arg dd $pv '[$dd]' | gsutil cp - ${ELB_RESULTS}/${ELB_METADATA_DIR}/$ELB_DISK_ID_FILE +gcloud compute disks update $pv --update-labels ${ELB_LABELS} --zone ${ELB_GCP_ZONE} --project ${ELB_GCP_PROJECT} + +# delete snapshot +${KUBECTL} delete volumesnapshot --all + +# check if the writable disk was deleted and try deleting again, +# if unsuccessful save its id in GS +if gcloud compute disks describe $pv_rwo --zone $ELB_GCP_ZONE ; then + gcloud compute disks delete $pv_rwo --zone $ELB_GCP_ZONE + sleep 10 + + if gcloud compute disks describe $pv_rwo --zone $ELB_GCP_ZONE ; then + jq -n --arg d1 $pv_rwo --arg d2 $pv '[$d1, $d2]' | gsutil cp - ${ELB_RESULTS}/${ELB_METADATA_DIR}/$ELB_DISK_ID_FILE + fi fi diff --git a/docker-qs/Makefile b/docker-qs/Makefile index 7e6e1a2..045538f 100644 --- a/docker-qs/Makefile +++ b/docker-qs/Makefile @@ -28,7 +28,7 @@ SHELL=/bin/bash .PHONY: all pre-check check clean build publish gcp-build gcp-check gcp-clean IMG?=ncbi/elasticblast-query-split -VERSION?=0.1.3 +VERSION?=0.1.4 GCP_PROJECT?=$(shell gcloud config get-value project 2>/dev/null) GCP_TEST_BUCKET?=gs://elasticblast-test/query-split-run-test AWS_REGION?=us-east-1 @@ -77,7 +77,7 @@ gcp-list-tagless-images: .PHONY: aws-build aws-build: - gcloud builds submit --config=awscloudbuild.yaml --substitutions=TAG_NAME="${VERSION}",_IMG="${AWS_IMG}",_SERVER="${AWS_SERVER}",_AWS_ECR_PASSWD="`aws ecr-public get-login-password --region ${AWS_REGION}`" .
+ gcloud builds submit --config=awscloudbuild.yaml --substitutions=_DOCKERFILE=Dockerfile,TAG_NAME="${VERSION}",_IMG="${AWS_IMG}",_SERVER="${AWS_SERVER}",_AWS_ECR_PASSWD="`aws ecr-public get-login-password --region ${AWS_REGION}`" . # Use this target to build an image from your local sources as opposed to those on PyPI.org .PHONY: aws-build-from-local-sources diff --git a/docker-qs/run.sh b/docker-qs/run.sh index ec6e84e..325a8c7 100755 --- a/docker-qs/run.sh +++ b/docker-qs/run.sh @@ -97,7 +97,7 @@ EOF else time gsutil -qm cp $TMP $output_bucket/metadata/query_length.txt if [ $split_to_cloud -eq 1 ]; then - gsutil -mq cp "$local_output_dir/batch_*.fa" $output_bucket/query_batches + gsutil -mq cp "$local_output_dir/batch_*.fa" $output_bucket/query_batches/ fi fi fi diff --git a/requirements/base.txt b/requirements/base.txt index 9c65565..510f94a 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,11 +1,11 @@ -wheel==0.37.0 -setuptools==56.0.0 -importlib-resources==5.9.0 -importlib-metadata==4.12.0 -pex==2.1.103 -boto3==1.24.49 -botocore==1.27.49 +wheel==0.37.1 +setuptools==65.5.0 +importlib-resources==5.4.0 +importlib-metadata==4.11.1 +pex==2.1.117 +boto3==1.26.20 +botocore==1.29.20 awslimitchecker==12.0.0 -tenacity==8.0.1 +tenacity==8.1.0 dataclasses-json==0.5.7 types-pkg-resources==0.1.3 diff --git a/requirements/test.txt b/requirements/test.txt index 980d6f2..cb75ad2 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -1,13 +1,13 @@ -r base.txt -pytest==7.1.2 -pytest-cov==3.0.0 -pytest-mock==3.8.2 -teamcity-messages==1.31 -mypy==0.971 +pytest==7.2.0 +pytest-cov==4.0.0 +pytest-mock==3.10.0 +teamcity-messages==1.32 +mypy==0.991 pylint==2.7.4 -tox==3.25.1 -yamllint==1.27.1 -moto==3.1.17 -docker==5.0.3 -cfn-lint==0.61.4 +tox==3.27.1 +yamllint==1.28.0 +moto==4.0.11 +docker==6.0.1 +cfn-lint==0.72.1 diff --git a/src/elastic_blast/aws.py b/src/elastic_blast/aws.py index 9b11e4f..caffdb6 100644 --- a/src/elastic_blast/aws.py +++ b/src/elastic_blast/aws.py @@ -39,7 +39,7 @@ from pprint import pformat from pathlib import Path -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Tuple, Optional import boto3 # type: ignore from botocore.exceptions import ClientError, NoCredentialsError, ParamValidationError, WaiterError # type: ignore @@ -153,7 +153,7 @@ class ElasticBlastAws(ElasticBlast): (janitor module) """ - def __init__(self, cfg: ElasticBlastConfig, create=False, cleanup_stack: List[Any]=None): + def __init__(self, cfg: ElasticBlastConfig, create=False, cleanup_stack: Optional[List[Any]]=None): """ Class constructor: it's meant to be a starting point and to implement a base class with the core ElasticBLAST interface Parameters: @@ -176,7 +176,9 @@ def _init(self, cfg: ElasticBlastConfig, create: bool): self.iam = boto3.resource('iam', config=self.boto_cfg) self.ec2 = boto3.resource('ec2', config=self.boto_cfg) - self.owner = sanitize_aws_batch_job_name(getpass.getuser().lower()) + # Per EB-1554, to prevent role names from getting longer than 64 characters + MAX_USERNAME_LENGTH=38 + self.owner = sanitize_aws_batch_job_name(getpass.getuser()[:MAX_USERNAME_LENGTH].lower()) self.results_bucket = cfg.cluster.results self.vpc_id = cfg.aws.vpc self.subnets = None @@ -223,9 +225,9 @@ def _init(self, cfg: ElasticBlastConfig, create: bool): if re.match(r'[cmr]5a?dn?\.\d{0,2}x?large', instance_type) or instance_type.startswith('x1'): use_ssd = True # Shrink the default EBS root disk since EC2 instances will use locally attached SSDs - 
logging.warning("Using gp2 30GB EBS root disk because locally attached SSDs will be used") + disk_type = 'gp3' disk_size = 30 - disk_type = 'gp2' + logging.warning(f"Using {disk_type} {disk_size}GB EBS root disk because locally attached SSDs will be used") if instance_type.lower() == 'optimal': # EXPERIMENTAL! max_cpus = self.cfg.cluster.num_nodes * self.cfg.cluster.num_cpus else: diff --git a/src/elastic_blast/aws_traits.py b/src/elastic_blast/aws_traits.py index 2221d59..623865a 100644 --- a/src/elastic_blast/aws_traits.py +++ b/src/elastic_blast/aws_traits.py @@ -109,7 +109,7 @@ def get_instance_type_offerings(region: str) -> List[str]: def get_suitable_instance_types(min_memory: MemoryStr, min_cpus: PositiveInteger, - instance_types: List[str] = None) -> List[Any]: + instance_types: Optional[List[str]] = None) -> List[Any]: """Get a list of instance type descriptions with at least min_memory and number of CPUs diff --git a/src/elastic_blast/base.py b/src/elastic_blast/base.py index 84b6127..bc9d758 100644 --- a/src/elastic_blast/base.py +++ b/src/elastic_blast/base.py @@ -106,10 +106,13 @@ class MemoryStr(str): def __new__(cls, value): """Constructor, validates that argumant is a valid GCP name""" str_value = str(value) - number_re = re.compile(r'^\d+[kKmMgGtT]$|^\d+.\d+[kKmMgGtT]$') + number_re = re.compile(r'^\d+[kKmMgGtT]i?$|^\d+.\d+[kKmMgGtT]i?$') if not number_re.match(str_value): raise ValueError('Memory request or limit must be specified by a number followed by a unit, for example 100m') - if float(str_value[:-1]) <= 0: + unit_pos = -1 + if str_value.endswith('i'): + unit_pos = -2 + if float(str_value[:unit_pos]) <= 0: raise ValueError('Memory request or limit must be larger than zero') return super(cls, cls).__new__(cls, str_value) @@ -117,25 +120,20 @@ def __new__(cls, value): def asGB(self) -> float: """Return the amount of memory in GB as float""" mult = 1.0 - if self[-1].upper() == 'K': + unit_pos = -1; + if self.endswith('i'): + unit_pos = -2 + if self[unit_pos].upper() == 'K': mult /= 1024 ** 2 - elif self[-1].upper() == 'M': + elif self[unit_pos].upper() == 'M': mult /= 1024 - elif self[-1].upper() == 'T': + elif self[unit_pos].upper() == 'T': mult *= 1024 - return float(self[:-1]) * mult + return float(self[:unit_pos]) * mult def asMB(self) -> float: """Return the amount of memory in MB as float""" - mult = 1.0 - if self[-1].upper() == 'K': - mult /= 1024 - elif self[-1].upper() == 'G': - mult *= 1024 - elif self[-1].upper() == 'T': - mult *= 1024 ** 2 - return float(self[:-1]) * mult - + return self.asGB() * 1024 class DBSource(Enum): """Sources of a BLAST database supported by update_blastdb.pl from BLAST+ package""" diff --git a/src/elastic_blast/commands/submit.py b/src/elastic_blast/commands/submit.py index 6111437..3983888 100755 --- a/src/elastic_blast/commands/submit.py +++ b/src/elastic_blast/commands/submit.py @@ -39,7 +39,6 @@ from elastic_blast.filehelper import check_for_read, check_dir_for_write, cleanup_temp_bucket_dirs from elastic_blast.filehelper import get_length, harvest_query_splitting_results from elastic_blast.split import FASTAReader -from elastic_blast.gcp import enable_gcp_api from elastic_blast.gcp import check_cluster as gcp_check_cluster from elastic_blast.gcp_traits import get_machine_properties from elastic_blast.util import get_blastdb_size, UserReportError @@ -77,7 +76,7 @@ def prepare_1_stage(cfg: ElasticBlastConfig, query_files): """ Prepare data for 1 stage cloud query split on AWS """ query_file = query_files[0] # Get file length as 
approximation of sequence length - gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.project + gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.get_project_for_gcs_downloads() query_length = get_length(query_file, gcp_prj = gcp_prj) if query_file.endswith('.gz'): query_length = query_length * 4 # approximation again @@ -109,8 +108,6 @@ def submit(args, cfg, clean_up_stack): # For now, checking resources is only implemented for AWS if cfg.cloud_provider.cloud == CSP.AWS: check_resource_quotas(cfg) - else: - enable_gcp_api(cfg) if check_running_cluster(cfg): msg = get_resubmission_error_msg(cfg.cluster.results, cfg.cloud_provider.cloud) @@ -140,8 +137,8 @@ def submit(args, cfg, clean_up_stack): setup_taxid_filtering(cfg) # check database availability + gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.get_project_for_gcs_downloads() try: - gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.project get_blastdb_size(cfg.blast.db, cfg.cluster.db_source, gcp_prj) except ValueError as err: raise UserReportError(returncode=BLASTDB_ERROR, message=str(err)) @@ -157,7 +154,9 @@ def submit(args, cfg, clean_up_stack): if 'ELB_NO_SEARCH' in os.environ: return 0 if not elastic_blast.cloud_job_submission: elastic_blast.wait_for_cloud_query_split() - qs_res = harvest_query_splitting_results(cfg.cluster.results, dry_run) + qs_res = harvest_query_splitting_results(cfg.cluster.results, + dry_run, + gcp_project=gcp_prj) queries = qs_res.query_batches query_length = qs_res.query_length @@ -180,7 +179,7 @@ def check_running_cluster(cfg: ElasticBlastConfig) -> bool: if cfg.cluster.dry_run: return False metadata_dir = os.path.join(cfg.cluster.results, ELB_METADATA_DIR) - gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.project + gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.get_project_for_gcs_downloads() if cfg.cloud_provider.cloud == CSP.AWS: metadata_file = os.path.join(metadata_dir, ELB_AWS_JOB_IDS) else: @@ -208,7 +207,7 @@ def check_submit_data(query_files: List[str], cfg: ElasticBlastConfig) -> None: """ dry_run = cfg.cluster.dry_run try: - gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.project + gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.get_project_for_gcs_downloads() for query_file in query_files: check_for_read(query_file, dry_run, True, gcp_prj) except FileNotFoundError: @@ -239,7 +238,7 @@ def split_query(query_files: List[str], cfg: ElasticBlastConfig) -> Tuple[List[s queries = [os.path.join(out_path, f'batch_{x:03d}.fa') for x in range(10)] logging.info(f'Splitting queries and writing batches to {out_path}') else: - gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.project + gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.get_project_for_gcs_downloads() reader = FASTAReader(open_for_read_iter(query_files, gcp_prj), batch_len, out_path) query_length, queries = reader.read_and_cut() logging.info(f'{len(queries)} batches, {query_length} base/residue total') @@ -261,7 +260,7 @@ def assemble_query_file_list(cfg: ElasticBlastConfig) -> List[str]: is considered a list of files, otherwise it is a FASTA file with queries.""" msg = [] query_files = [] - gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.project + gcp_prj = None if cfg.cloud_provider.cloud == CSP.AWS else cfg.gcp.get_project_for_gcs_downloads() for query_file in cfg.blast.queries_arg.split(): if query_file.endswith(QUERY_LIST_EXT): with 
open_for_read(query_file, gcp_prj) as f: diff --git a/src/elastic_blast/constants.py b/src/elastic_blast/constants.py index 2146714..2b7d10a 100644 --- a/src/elastic_blast/constants.py +++ b/src/elastic_blast/constants.py @@ -97,7 +97,9 @@ class ElbExecutionMode(Enum): ELB_DFLT_USE_PREEMPTIBLE = False -ELB_DFLT_GCP_PD_SIZE = '3000G' +# Using Gi unit rather than G is safer for volume cloning and creating volumes +# from snapshots +ELB_DFLT_GCP_PD_SIZE = '3000Gi' ELB_DFLT_GCP_MACHINE_TYPE = 'n1-highmem-32' ELB_DFLT_AWS_MACHINE_TYPE = 'm5.8xlarge' @@ -148,7 +150,7 @@ class ElbExecutionMode(Enum): ELB_LOCAL_SSD_BLAST_JOB_TEMPLATE = 'resource:templates/blast-batch-job-local-ssd.yaml.template' GCS_DFLT_BUCKET = 'gs://blast-db' -GCP_APIS = ['serviceusage', 'container', 'storage-api', 'storage-component'] +GCP_APIS = ['compute', 'serviceusage', 'container', 'storage-api', 'storage-component'] # https://cloud.google.com/kubernetes-engine/docs/how-to/creating-managing-labels#requirements GCP_MAX_NUM_LABELS = 64 # https://cloud.google.com/kubernetes-engine/docs/how-to/creating-managing-labels#requirements @@ -202,10 +204,10 @@ def __str__(self): ELB_DFLT_AWS_REGION = 'us-east-1' ELB_UNKNOWN_GCP_PROJECT = 'elb-unknown-gcp-project' -ELB_DOCKER_VERSION = '1.1.1' -ELB_QS_DOCKER_VERSION = '0.1.3' -ELB_JANITOR_DOCKER_VERSION = '0.2.0' -ELB_JOB_SUBMIT_DOCKER_VERSION = '2.0.0' +ELB_DOCKER_VERSION = '1.1.3' +ELB_QS_DOCKER_VERSION = '0.1.4' +ELB_JANITOR_DOCKER_VERSION = '0.3.0' +ELB_JOB_SUBMIT_DOCKER_VERSION = '3.0.0' ELB_DOCKER_IMAGE_GCP = f'gcr.io/ncbi-sandbox-blast/ncbi/elb:{ELB_DOCKER_VERSION}' ELB_DOCKER_IMAGE_AWS = f'public.ecr.aws/ncbi-elasticblast/elasticblast-elb:{ELB_DOCKER_VERSION}' diff --git a/src/elastic_blast/elasticblast.py b/src/elastic_blast/elasticblast.py index 3467363..2905ed3 100644 --- a/src/elastic_blast/elasticblast.py +++ b/src/elastic_blast/elasticblast.py @@ -29,7 +29,7 @@ import os from abc import ABCMeta, abstractmethod from collections import defaultdict -from typing import Any, List, Tuple, Dict, DefaultDict +from typing import Any, List, Tuple, Dict, DefaultDict, Optional from .constants import ELB_QUERY_BATCH_DIR, ELB_METADATA_DIR from .filehelper import copy_to_bucket, remove_bucket_key, cleanup_temp_bucket_dirs @@ -39,7 +39,7 @@ class ElasticBlast(metaclass=ABCMeta): """ Base class for core ElasticBLAST functionality. 
""" - def __init__(self, cfg: ElasticBlastConfig, create=False, cleanup_stack: List[Any]=None): + def __init__(self, cfg: ElasticBlastConfig, create=False, cleanup_stack: Optional[List[Any]]=None): self.cfg = cfg self.cleanup_stack = cleanup_stack if cleanup_stack else [] self.dry_run = self.cfg.cluster.dry_run @@ -118,7 +118,7 @@ def _status_from_results(self): """ cfg = self.cfg status = ElbStatus.UNKNOWN - gcp_prj = None if cfg.aws else cfg.gcp.project + gcp_prj = None if cfg.aws else cfg.gcp.get_project_for_gcs_downloads() try: failure_file = os.path.join(cfg.cluster.results, ELB_METADATA_DIR, ELB_STATUS_FAILURE) check_for_read(failure_file, self.dry_run, gcp_prj=gcp_prj) diff --git a/src/elastic_blast/elb_config.py b/src/elastic_blast/elb_config.py index 69d8165..8522a27 100644 --- a/src/elastic_blast/elb_config.py +++ b/src/elastic_blast/elb_config.py @@ -92,6 +92,7 @@ from .util import UserReportError, safe_exec from .util import gcp_get_regions, sanitize_for_k8s from .gcp_traits import get_machine_properties as gcp_get_machine_properties +from .gcp_traits import enable_gcp_api from .aws_traits import get_machine_properties as aws_get_machine_properties from .aws_traits import get_regions as aws_get_regions from .aws_traits import create_aws_config @@ -224,9 +225,11 @@ class GCPConfig(CloudProviderBaseConfig, ConfigParserToDataclassMapper): network: Optional[str] = None subnet: Optional[str] = None user: Optional[str] = None - # FIXME: This is a temporary fix for EB-1530. gke_version should be set to - # None once the proper fix is implemented. - gke_version: Optional[str] = '1.21' + # gke_version should be set to None to use the default GKE, otherwise use a specific version supported by GKE (e.g.: 1.25) + gke_version: Optional[str] = None + # if True, GCP project will be passed to gsutil calls that download files + # from GCS and users will be charged for the downloads. + requester_pays: bool = False # mapping to class attributes to ConfigParser parameters so that objects # can be initialized from ConfigParser objects @@ -237,7 +240,8 @@ class GCPConfig(CloudProviderBaseConfig, ConfigParserToDataclassMapper): 'user': None, 'network': ParamInfo(CFG_CLOUD_PROVIDER, CFG_CP_GCP_NETWORK), 'subnet': ParamInfo(CFG_CLOUD_PROVIDER, CFG_CP_GCP_SUBNETWORK), - 'gke_version': ParamInfo(CFG_CLOUD_PROVIDER, CFG_CP_GCP_GKE_VERSION)} + 'gke_version': ParamInfo(CFG_CLOUD_PROVIDER, CFG_CP_GCP_GKE_VERSION), + 'requester_pays': None} def __post_init__(self): self.cloud = CSP.GCP @@ -252,12 +256,21 @@ def __post_init__(self): if self.project == ELB_UNKNOWN_GCP_PROJECT: proj = get_gcp_project() self.project = GCPString(proj) + else: + self.requester_pays = True def validate(self, errors: List[str], task: ElbCommand): """Validate config""" if bool(self.network) != bool(self.subnet): errors.append('Both gcp-network and gcp-subnetwork need to be specified if one of them is specified') + def get_project_for_gcs_downloads(self) -> Optional[str]: + """Get GCP project for downloads from GCS buckets. Returns GCP project + if requester_pays is True, otherwise None. 
If GCP project is provided + to the gsutil call, then a user is paying for this download.""" + return str(self.project) if self.requester_pays else None + + @dataclass_json(letter_case=LetterCase.KEBAB) @dataclass class AWSConfig(CloudProviderBaseConfig, ConfigParserToDataclassMapper): @@ -441,6 +454,15 @@ def __post_init__(self): if cloud_provider == CSP.GCP: if not self.pd_size: self.pd_size = ELB_DFLT_GCP_PD_SIZE + + # This is needed for PVC cloning or creating PVs from snapshots. + # A cloned volume must be at least as large as the source volume. + # Volume sizes expressed as GB are rounded and the cloned volume + # may end up smaller than the source one. GiBs are more exact. + if not self.pd_size.endswith('i') and \ + self.pd_size[-1].lower() in ['k', 'm', 'g', 't']: + self.pd_size += 'i' + else: if not self.pd_size: self.pd_size = ELB_DFLT_AWS_PD_SIZE @@ -495,7 +517,7 @@ class AppState: """Application state values""" # The GCP persistent disk ID - disk_id: Optional[str] = None + disk_ids: List[str] = field(default_factory=list) # The kubernetes context k8s_ctx: Optional[str] = None @@ -606,6 +628,9 @@ def __init__(self, *args, **kwargs): # post-init activities + if self.cloud_provider.cloud == CSP.GCP: + enable_gcp_api(self.gcp.project, self.cluster.dry_run) + try: if self.cloud_provider.region: self.cloud_provider.region.validate(dry_run) @@ -626,9 +651,10 @@ def __init__(self, *args, **kwargs): # get database metadata if self.blast and not self.blast.db_metadata and not self.cluster.dry_run: try: - gcp_prj = None if self.cloud_provider.cloud == CSP.AWS else self.gcp.project + gcp_prj = None if self.cloud_provider.cloud == CSP.AWS else self.gcp.get_project_for_gcs_downloads() self.blast.db_metadata = get_db_metadata(self.blast.db, ElbSupportedPrograms().get_db_mol_type(self.blast.program), - self.cluster.db_source, gcp_prj=gcp_prj) + self.cluster.db_source, + gcp_prj=gcp_prj) except FileNotFoundError: # database metadata file is not mandatory for a user database (yet) EB-1308 logging.info('No database metadata') @@ -1043,7 +1069,7 @@ def create_labels(cloud_provider: CSP, results: str, blast_conf: Optional[BlastConfig], cluster_name: str, - user_provided_labels: str = None) -> str: + user_provided_labels: Optional[str] = None) -> str: """Generate labels for cloud resources""" if cloud_provider == CSP.AWS: sanitize = sanitize_aws_tag diff --git a/src/elastic_blast/filehelper.py b/src/elastic_blast/filehelper.py index 919c369..0605cc0 100644 --- a/src/elastic_blast/filehelper.py +++ b/src/elastic_blast/filehelper.py @@ -45,6 +45,7 @@ import boto3 # type: ignore from botocore.exceptions import ClientError # type: ignore from botocore.config import Config # type: ignore +from boto3.s3.transfer import TransferConfig # type: ignore from .base import QuerySplittingResults from .util import safe_exec, SafeExecError from .constants import ELB_GCP_BATCH_LIST, ELB_METADATA_DIR, ELB_QUERY_LENGTH, ELB_QUERY_BATCH_DIR @@ -52,7 +53,7 @@ from .constants import ELB_QUERY_BATCH_FILE_PREFIX -def harvest_query_splitting_results(bucket_name: str, dry_run: bool = False, boto_cfg: Config = None) -> QuerySplittingResults: +def harvest_query_splitting_results(bucket_name: str, dry_run: bool = False, boto_cfg: Config = None, gcp_project: Optional[str] = None) -> QuerySplittingResults: """ Retrieves the results for query splitting from bucket, used in 2-stage cloud query splitting """ qlen = 0 @@ -75,10 +76,10 @@ def harvest_query_splitting_results(bucket_name: str, dry_run: bool = False, bot 
query_batches.append(os.path.join(ELB_S3_PREFIX, s3_bucket.name, obj.key)) elif bucket_name.startswith(ELB_GCS_PREFIX): qlen_file = os.path.join(bucket_name, ELB_METADATA_DIR, ELB_QUERY_LENGTH) - with open_for_read(qlen_file) as ql: + with open_for_read(qlen_file, gcp_project) as ql: qlen = int(ql.read()) qbatch_list_file = os.path.join(bucket_name, ELB_METADATA_DIR, ELB_GCP_BATCH_LIST) - with open_for_read(qbatch_list_file) as qlist: + with open_for_read(qbatch_list_file, gcp_project) as qlist: for line in qlist: query_batches.append(line.strip()) else: @@ -246,6 +247,7 @@ def open_for_write_immediate(fname): elif fname.startswith(ELB_S3_PREFIX): f = io.TextIOWrapper(buffer=io.BytesIO(), encoding='utf-8') s3 = boto3.resource('s3') + trans_conf = TransferConfig(multipart_threshold=1024*25, max_concurrency=10, multipart_chunksize=1024*25, use_threads=True) else: f = open(fname, 'w') @@ -265,11 +267,13 @@ def open_for_write_immediate(fname): bufsize = buffer.getbuffer().nbytes logging.debug(f'Attempting to stream {bufsize} bytes to {fname}') + start = timer() bucket, key = parse_bucket_name_key(fname) obj = s3.Object(bucket, key) - obj.upload_fileobj(buffer) + obj.upload_fileobj(buffer, Config=trans_conf) buffer.close() - logging.debug(f'Uploaded {fname}') + end = timer() + logging.debug(f'Uploaded {fname} in {end - start:.2f} seconds') def open_for_write(fname): @@ -372,9 +376,8 @@ def check_for_read(fname: str, dry_run : bool = False, print_file_size: bool = F if is_stdin(fname): return if fname.startswith(ELB_GCS_PREFIX): - if not gcp_prj: - raise ValueError(f'elastic_blast.filehelper.check_for_read is missing the gcp_prj parameter') - cmd = f'gsutil -u {gcp_prj} stat {fname}' if print_file_size else f'gsutil -u {gcp_prj} -q stat {fname}' + prj = f'-u {gcp_prj}' if gcp_prj else '' + cmd = f'gsutil {prj} stat {fname}' if print_file_size else f'gsutil {prj} -q stat {fname}' if dry_run: logging.info(cmd) return @@ -430,9 +433,8 @@ def get_length(fname: str, dry_run: bool = False, gcp_prj: Optional[str] = None) raises FileNotFoundError if there is no such file """ if fname.startswith(ELB_GCS_PREFIX): - if not gcp_prj: - raise ValueError(f'elastic_blast.filehelper.get_length is missing the gcp_prj parameter') - cmd = f'gsutil -u {gcp_prj} stat {fname}' + prj = f'-u {gcp_prj}' if gcp_prj else '' + cmd = f'gsutil {prj} stat {fname}' if dry_run: logging.info(cmd) return 10000 # Arbitrary fake length @@ -481,9 +483,9 @@ def open_for_read(fname: str, gcp_prj: Optional[str] = None): binary = gzipped or tarred mode = 'rb' if binary else 'rt' if fname.startswith(ELB_GCS_PREFIX): - if not gcp_prj: - raise ValueError(f'elastic_blast.filehelper.open_for_read is missing the gcp_prj parameter') - proc = subprocess.Popen(['gsutil', '-u', gcp_prj, 'cat', fname], + prj = f'-u {gcp_prj}' if gcp_prj else '' + cmd = f'gsutil {prj} cat {fname}' + proc = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=not binary) diff --git a/src/elastic_blast/gcp.py b/src/elastic_blast/gcp.py index 4e0d7e6..d954b32 100644 --- a/src/elastic_blast/gcp.py +++ b/src/elastic_blast/gcp.py @@ -42,13 +42,13 @@ from .subst import substitute_params -from .filehelper import open_for_write_immediate, open_for_read +from .filehelper import open_for_write_immediate from .jobs import read_job_template, write_job_files from .util import ElbSupportedPrograms, safe_exec, UserReportError, SafeExecError from .util import validate_gcp_disk_name, get_blastdb_info, get_usage_reporting from . 
import kubernetes -from .constants import CLUSTER_ERROR, ELB_NUM_JOBS_SUBMITTED, GCP_APIS, ELB_METADATA_DIR, K8S_JOB_SUBMIT_JOBS +from .constants import CLUSTER_ERROR, ELB_NUM_JOBS_SUBMITTED, ELB_METADATA_DIR, K8S_JOB_SUBMIT_JOBS from .constants import ELB_STATE_DISK_ID_FILE, DEPENDENCY_ERROR from .constants import ELB_QUERY_BATCH_DIR, ELB_DFLT_MIN_NUM_NODES from .constants import K8S_JOB_CLOUD_SPLIT_SSD, K8S_JOB_INIT_PV @@ -62,10 +62,11 @@ from .constants import STATUS_MESSAGE_ERROR from .elb_config import ElasticBlastConfig from .elasticblast import ElasticBlast +from .gcp_traits import enable_gcp_api class ElasticBlastGcp(ElasticBlast): """ Implementation of core ElasticBLAST functionality in GCP. """ - def __init__(self, cfg: ElasticBlastConfig, create=False, cleanup_stack: List[Any]=None): + def __init__(self, cfg: ElasticBlastConfig, create=False, cleanup_stack: Optional[List[Any]]=None): super().__init__(cfg, create, cleanup_stack) self.query_files: List[str] = [] self.cluster_initialized = False @@ -174,6 +175,22 @@ def submit(self, query_batches: List[str], query_length, one_stage_cloud_query_s safe_exec(cmd) logging.info('Done enabling autoscaling') + if not self.cfg.cluster.use_local_ssd: + if not self.cfg.appstate.k8s_ctx: + raise RuntimeError('K8s context not set') + kubernetes.wait_for_pvc(self.cfg.appstate.k8s_ctx, 'blast-dbs-pvc') + # save persistent disk id + disk_ids = kubernetes.get_persistent_disks(self.cfg.appstate.k8s_ctx) + logging.debug(f'New persistent disk id: {disk_ids}') + self.cfg.appstate.disk_ids += disk_ids + dest = os.path.join(self.cfg.cluster.results, ELB_METADATA_DIR, + ELB_STATE_DISK_ID_FILE) + with open_for_write_immediate(dest) as f: + f.write(json.dumps(self.cfg.appstate.disk_ids)) + + kubernetes.label_persistent_disk(self.cfg, 'blast-dbs-pvc') + kubernetes.delete_volume_snapshots(self.cfg.appstate.k8s_ctx) + self.cleanup_stack.clear() self.cleanup_stack.append(lambda: kubernetes.collect_k8s_logs(self.cfg)) @@ -305,7 +322,7 @@ def _job_status_by_app(self, app): def delete(self): - enable_gcp_api(self.cfg) + enable_gcp_api(self.cfg.gcp.project, self.cfg.cluster.dry_run) delete_cluster_with_cleanup(self.cfg) def _initialize_cluster(self): @@ -316,7 +333,7 @@ def _initialize_cluster(self): disk_quota = disk_limit - disk_usage if pd_size > disk_quota: raise UserReportError(INPUT_ERROR, f'Requested disk size {pd_size}G is larger than allowed ({disk_quota}G) for region {cfg.gcp.region}\n' - 'Please adjust parameter [cluster] pd-size to less than {disk_quota}G, run your request in another region, or\n' + f'Please adjust parameter [cluster] pd-size to less than {disk_quota}G, run your request in another region, or\n' 'request a disk quota increase (see https://cloud.google.com/compute/quotas)') logging.info('Starting cluster') clean_up_stack.append(lambda: logging.debug('Before creating cluster')) @@ -383,8 +400,8 @@ def job_substitutions(self) -> Dict[str, str]: """ Prepare substitution dictionary for job generation """ cfg = self.cfg usage_reporting = get_usage_reporting() - - db, _, db_label = get_blastdb_info(cfg.blast.db, cfg.gcp.project) + db, _, db_label = get_blastdb_info(cfg.blast.db, + cfg.gcp.get_project_for_gcs_downloads()) blast_program = cfg.blast.program @@ -475,7 +492,7 @@ def get_disk_quota(self) -> Tuple[float, float]: def _enable_gcp_apis(self) -> None: """ Enables GCP APIs only once per object initialization """ if not self.apis_enabled: - enable_gcp_api(self.cfg) + enable_gcp_api(self.cfg.gcp.project, self.cfg.cluster.dry_run) 
self.apis_enabled = True def _get_gke_credentials(self) -> str: @@ -484,27 +501,6 @@ def _get_gke_credentials(self) -> str: self.cfg.appstate.k8s_ctx = get_gke_credentials(self.cfg) return self.cfg.appstate.k8s_ctx -def enable_gcp_api(cfg: ElasticBlastConfig): - """ Enable GCP APIs if they are not already enabled - parameters: - cfg: configuration object - raises: - SafeExecError if there is an error checking or trying to enable APIs - """ - dry_run = cfg.cluster.dry_run - for api in GCP_APIS: - cmd = 'gcloud services list --enabled --format=value(config.name) ' - cmd += f'--filter=config.name={api}.googleapis.com ' - cmd += f'--project {cfg.gcp.project}' - if dry_run: - logging.info(cmd) - else: - p = safe_exec(cmd) - if not p.stdout: - cmd = f'gcloud services enable {api}.googleapis.com ' - cmd += f'--project {cfg.gcp.project}' - p = safe_exec(cmd) - def set_gcp_project(project: str) -> None: """Set current GCP project in gcloud environment, raises @@ -555,9 +551,9 @@ def delete_disk(name: str, cfg: ElasticBlastConfig) -> None: def _get_pd_id(cfg: ElasticBlastConfig) -> List[str]: """ Try to get the GCP persistent disk ID from elastic-blast records""" retval = list() - if cfg.appstate.disk_id: - retval = [cfg.appstate.disk_id] - logging.debug(f'GCP disk ID {retval[0]}') + if cfg.appstate.disk_ids: + retval = cfg.appstate.disk_ids + logging.debug(f'GCP disk ID {retval}') # no need to get disk id from GS if we already have it return retval @@ -572,17 +568,18 @@ def _get_pd_id(cfg: ElasticBlastConfig) -> List[str]: cmd = f'gsutil -q cat {disk_id_on_gcs}' try: p = safe_exec(cmd) - gcp_disk_id = p.stdout.decode().strip('\n') + gcp_disk_ids = json.loads(p.stdout.decode()) err = p.stderr.decode() - if gcp_disk_id: - logging.debug(f"Retrieved GCP disk ID {gcp_disk_id} from {disk_id_on_gcs}") + if gcp_disk_ids: + logging.debug(f"Retrieved GCP disk IDs {gcp_disk_ids} from {disk_id_on_gcs}") try: - validate_gcp_disk_name(gcp_disk_id) + for disk_id in gcp_disk_ids: + validate_gcp_disk_name(disk_id) except ValueError: - logging.error(f'GCP disk ID "{gcp_disk_id}" retrieved from {disk_id_on_gcs} is invalid.') + logging.error(f'GCP disk ID "{disk_id}" retrieved from {disk_id_on_gcs} is invalid.') gcp_disk_id = '' else: - retval.append(gcp_disk_id) + retval += gcp_disk_ids else: raise RuntimeError('Persistent disk id stored in GS is empty') except Exception as e: diff --git a/src/elastic_blast/gcp_traits.py b/src/elastic_blast/gcp_traits.py index 73fa8c7..f67e8e9 100644 --- a/src/elastic_blast/gcp_traits.py +++ b/src/elastic_blast/gcp_traits.py @@ -24,8 +24,10 @@ Author: Victor Joukov joukovv@ncbi.nlm.nih.gov """ -import re +import re, logging from .base import InstanceProperties +from .util import safe_exec +from .constants import GCP_APIS GCP_MACHINES = { "n1-standard" : 3.75, @@ -60,3 +62,25 @@ def get_machine_properties(machineType: str) -> InstanceProperties: err = f'Cannot get properties for {machineType}' raise NotImplementedError(err) return InstanceProperties(ncpu, nram) + + +def enable_gcp_api(project: str, dry_run: bool): + """ Enable GCP APIs if they are not already enabled + parameters: + project: GCP project + dry_run: True for dry run + raises: + SafeExecError if there is an error checking or trying to enable APIs + """ + for api in GCP_APIS: + cmd = 'gcloud services list --enabled --format=value(config.name) ' + cmd += f'--filter=config.name={api}.googleapis.com ' + cmd += f'--project {project}' + if dry_run: + logging.info(cmd) + else: + p = safe_exec(cmd) + if not p.stdout: + cmd = 
f'gcloud services enable {api}.googleapis.com ' + cmd += f'--project {project}' + p = safe_exec(cmd) diff --git a/src/elastic_blast/jobs.py b/src/elastic_blast/jobs.py index 39bffd6..38dbc53 100644 --- a/src/elastic_blast/jobs.py +++ b/src/elastic_blast/jobs.py @@ -30,13 +30,14 @@ import re from typing import List from pkg_resources import resource_string +from typing import Optional from .filehelper import open_for_read, open_for_write from .subst import substitute_params from .constants import ELB_DFLT_BLAST_JOB_TEMPLATE, ELB_LOCAL_SSD_BLAST_JOB_TEMPLATE from .elb_config import ElasticBlastConfig -def read_job_template(template_name=ELB_DFLT_BLAST_JOB_TEMPLATE, cfg: ElasticBlastConfig = None): +def read_job_template(template_name=ELB_DFLT_BLAST_JOB_TEMPLATE, cfg: Optional[ElasticBlastConfig] = None): """ Read job template file or resource Parameters: template_name - name of file to read or default resource diff --git a/src/elastic_blast/kubernetes.py b/src/elastic_blast/kubernetes.py index 7a54af6..68e32de 100644 --- a/src/elastic_blast/kubernetes.py +++ b/src/elastic_blast/kubernetes.py @@ -48,7 +48,8 @@ from .constants import ELB_CJS_DOCKER_IMAGE_GCP from .constants import ElbExecutionMode, ELB_JANITOR_SCHEDULE from .constants import ELB_DFLT_JANITOR_SCHEDULE_GCP, PERMISSIONS_ERROR, DEPENDENCY_ERROR -from .filehelper import upload_file_to_gcs +from .constants import CLUSTER_ERROR +from .filehelper import open_for_write_immediate from .elb_config import ElasticBlastConfig @@ -112,7 +113,7 @@ def get_persistent_disks(k8s_ctx: str, dry_run: bool = False) -> List[str]: p = safe_exec(cmd) if p.stdout: pds = json.loads(p.stdout.decode()) - return [i['spec']['gcePersistentDisk']['pdName'] for i in pds['items']] + return [i['spec']['csi']['volumeHandle'].split('/')[-1] for i in pds['items']] return list() @@ -179,7 +180,8 @@ def delete_all(k8s_ctx: str, dry_run: bool = False) -> List[str]: commands1 = [f'kubectl --context={k8s_ctx} delete jobs --ignore-not-found=true -l app=setup', f'kubectl --context={k8s_ctx} delete jobs --ignore-not-found=true -l app=blast'] commands2 = [f'kubectl --context={k8s_ctx} delete pvc --all --force=true', - f'kubectl --context={k8s_ctx} delete pv --all --force=true'] + f'kubectl --context={k8s_ctx} delete pv --all --force=true', + f'kubectl --context={k8s_ctx} delete volumesnapshots --all --ignore-not-found=true --force=true'] def run_commands(commands: List[str], dry_run: bool) -> List[str]: """ Run the commands in the argument list and return the names of the relevant k8s objects. @@ -246,6 +248,19 @@ def inspect_storage_objects_for_debugging(k8s_ctx: str, dry_run: bool = False): return deleted1 + deleted2 +def delete_volume_snapshots(k8s_ctx: str, dry_run: bool = False): + """Delete all volume snapshots associated with the kubernetes cluster""" + # We are not using --force=true here to do a graceful deletion. Volume + # snapshot does not need to wait for any pod or job to be deleted and it + # is fine if deletion takes some time. --ignore-not-found defaults to true + # if --all is used. 
+ cmd = f'kubectl --context={k8s_ctx} delete volumesnapshot --all' + if dry_run: + logging.info(cmd) + return + safe_exec(cmd) + + def get_jobs(k8s_ctx: str, selector: Optional[str] = None, dry_run: bool = False) -> List[str]: """Return a list of kubernetes jobs @@ -344,6 +359,94 @@ def _ensure_successful_job(k8s_ctx: str, k8s_job_file: pathlib.Path, dry_run: bo raise RuntimeError(f'{k8s_job_file} failed: {p.stderr.decode()}') + +def _snapshot_ready(k8s_ctx: str, k8s_spec_file: pathlib.Path, dry_run: bool = False) -> bool: + """Check whether persistent volume snapshot is ready + Parameters: + k8s_ctx: Kubernetes context + k8s_spec_file: Kubernetes spec file for the volume snapshot + dry_run: Dry run if true + + Returns: + True if the snapshot is ready""" + if not k8s_spec_file.exists(): + raise FileNotFoundError(str(k8s_spec_file)) + + cmd = f'kubectl --context={k8s_ctx} get -f {k8s_spec_file} -o json' + + if dry_run: + logging.info(cmd) + return True + + p = safe_exec(cmd) + if not p.stdout: + return False + + json_output = json.loads(p.stdout.decode()) + if 'status' not in json_output or 'readyToUse' not in json_output['status']: + return False + + if not isinstance(json_output['status']['readyToUse'], bool): + raise UserReportError(returncode=CLUSTER_ERROR, message='Unexpected response when checking PVC snapshot readiness') + + return json_output['status']['readyToUse'] + + +def _wait_for_snapshot(k8s_ctx: str, spec_file: pathlib.Path, attempts: int = 30, secs2wait: int = 20, dry_run: bool = False) -> None: + """Wait for the PVC snapshot to be ready or raise a TimeoutError after + specified number of attempts. + Parameters: + k8s_ctx: Kubernetes context + spec_file: Kubernetes spec file for the volume snapshot + attempts: Number of attempts + secs2wait: Time between attempts + dry_run: Dry run if true""" + for counter in range(attempts): + if _snapshot_ready(k8s_ctx, spec_file, dry_run): + break + time.sleep(secs2wait) + else: + raise TimeoutError(f'{spec_file} timed out') + + +def _pvc_bound(k8s_ctx: str, pvc_name: str, dry_run: bool = False) -> bool: + """Check whether a persistent volume claim is bound to a persistent volume + Parameters: + k8s_ctx: Kubernetes context + pvc_name: PVC name + dry_run: Dry run if true + + Returns: + True if the PVC is bound""" + + cmd = f'kubectl --context={k8s_ctx} get pvc {pvc_name} -o json' + + if dry_run: + logging.info(cmd) + return True + + p = safe_exec(cmd) + if not p.stdout: + return False + + json_output = json.loads(p.stdout.decode()) + if 'status' not in json_output or 'phase' not in json_output['status']: + return False + + logging.debug(f"PVC {pvc_name} status: {json_output['status']['phase']}") + return json_output['status']['phase'] == 'Bound' + + +def wait_for_pvc(k8s_ctx: str, pvc_name: str, attempts: int = 30, secs2wait: int = 20, dry_run: bool = False) -> None: + """Wait for the persistent volume claim to be bound to a persistent volume.
A bound + PVC means that a persistent disk has been created.""" + for counter in range(attempts): + if _pvc_bound(k8s_ctx, pvc_name, dry_run): + break + time.sleep(secs2wait) + else: + raise TimeoutError(f'Waiting for PVC {pvc_name} timed out') + + def initialize_storage(cfg: ElasticBlastConfig, query_files: List[str] = [], wait=ElbExecutionMode.WAIT) -> None: """ Initialize storage for ElasticBLAST cluster """ use_local_ssd = cfg.cluster.use_local_ssd @@ -351,12 +454,11 @@ if use_local_ssd: initialize_local_ssd(cfg, query_files, wait) else: initialize_persistent_disk(cfg, query_files, wait) - label_persistent_disk(cfg) def initialize_local_ssd(cfg: ElasticBlastConfig, query_files: List[str] = [], wait=ElbExecutionMode.WAIT) -> None: """ Initialize local SSDs for ElasticBLAST cluster """ - db, db_path, _ = get_blastdb_info(cfg.blast.db, cfg.gcp.project) + db, db_path, _ = get_blastdb_info(cfg.blast.db, cfg.gcp.get_project_for_gcs_downloads()) if not db: raise ValueError("Config parameter 'db' can't be empty") dry_run = cfg.cluster.dry_run @@ -395,9 +497,11 @@ program = cfg.blast.program job_init_template = 'job-init-local-ssd.yaml.template' taxdb_path = '' + gcp_project = cfg.gcp.get_project_for_gcs_downloads() + prj = f'--gcp-project {gcp_project}' if gcp_project else '' if db_path: # Custom database - taxdb_path = gcp_get_blastdb_latest_path(cfg.gcp.project) + '/taxdb.*' + taxdb_path = gcp_get_blastdb_latest_path(cfg.gcp.get_project_for_gcs_downloads()) + '/taxdb.*' subs = { 'ELB_DB': db, 'ELB_DB_PATH': db_path, @@ -412,7 +516,8 @@ 'K8S_JOB_SUBMIT_JOBS' : K8S_JOB_SUBMIT_JOBS, 'K8S_JOB_BLAST' : K8S_JOB_BLAST, 'K8S_JOB_RESULTS_EXPORT' : K8S_JOB_RESULTS_EXPORT, - 'TIMEOUT': str(init_blastdb_minutes_timeout*60) + 'TIMEOUT': str(init_blastdb_minutes_timeout*60), + 'GCP_PROJECT_OPT' : prj } logging.debug(f"Initializing local SSD: {ELB_DOCKER_IMAGE_GCP}") with TemporaryDirectory() as d: @@ -484,7 +589,8 @@ """ # ${LOGDATETIME} setup_pd start >>${ELB_LOGFILE} - db, db_path, _ = get_blastdb_info(cfg.blast.db, cfg.gcp.project) + db, db_path, _ = get_blastdb_info(cfg.blast.db, + cfg.gcp.get_project_for_gcs_downloads()) if not db: raise ValueError("Config parameter 'db' can't be empty") cluster_name = cfg.cluster.name @@ -494,7 +600,7 @@ taxdb_path = '' if db_path: # Custom database - taxdb_path = gcp_get_blastdb_latest_path(cfg.gcp.project) + '/taxdb.*' + taxdb_path = gcp_get_blastdb_latest_path(cfg.gcp.get_project_for_gcs_downloads()) + '/taxdb.*' results_bucket = cfg.cluster.results dry_run = cfg.cluster.dry_run @@ -523,6 +629,8 @@ raise RuntimeError(f'kubernetes context is missing for "{cluster_name}"') init_blastdb_minutes_timeout = cfg.timeouts.init_pv + gcp_project = cfg.gcp.get_project_for_gcs_downloads() + prj = f'--gcp-project {gcp_project}' if gcp_project else '' subs = { # For cloud query split @@ -545,7 +653,8 @@ # Container names 'K8S_JOB_GET_BLASTDB' : K8S_JOB_GET_BLASTDB, 'K8S_JOB_IMPORT_QUERY_BATCHES' : K8S_JOB_IMPORT_QUERY_BATCHES, -
'TIMEOUT': str(init_blastdb_minutes_timeout*60) + 'TIMEOUT': str(init_blastdb_minutes_timeout*60), + 'GCP_PROJECT_OPT' : prj } logging.debug(f"Initializing persistent volume: {ELB_DOCKER_IMAGE_GCP} {ELB_QS_DOCKER_IMAGE_GCP}") @@ -558,9 +667,9 @@ else: safe_exec(cmd) - pvc_yaml = os.path.join(d, 'pvc.yaml') + pvc_yaml = os.path.join(d, 'pvc-rwo.yaml') with open(pvc_yaml, 'wt') as f: - f.write(substitute_params(resource_string('elastic_blast', 'templates/pvc.yaml.template').decode(), subs)) + f.write(substitute_params(resource_string('elastic_blast', 'templates/pvc-rwo.yaml.template').decode(), subs)) cmd = f"kubectl --context={k8s_ctx} apply -f {pvc_yaml}" if dry_run: logging.info(cmd) @@ -578,31 +687,25 @@ safe_exec(cmd) # wait for the disk to be provisioned - time.sleep(5) - - # save persistent disk id so that it can be deleted on clean up + try: + wait_for_pvc(k8s_ctx, 'blast-dbs-pvc-rwo', dry_run=dry_run) + except TimeoutError: + logging.warning('Timed out waiting for PVC to bind') disks = get_persistent_disks(k8s_ctx, dry_run) - nretries = ELB_K8S_JOB_SUBMISSION_MAX_RETRIES - while nretries and not dry_run and not disks: - time.sleep(5) - disks = get_persistent_disks(k8s_ctx, dry_run) - nretries -= 1 if disks: logging.debug(f'GCP disk IDs {disks}') - logging.debug(f'Disk id(s) got in {(ELB_K8S_JOB_SUBMISSION_MAX_RETRIES+1)-nretries} tries') - cfg.appstate.disk_id = disks[0] - disk_id_file = os.path.join(d, ELB_STATE_DISK_ID_FILE) - with open(disk_id_file, 'w') as f: - for d in disks: - print(d, file=f) + cfg.appstate.disk_ids += disks dest = os.path.join(cfg.cluster.results, ELB_METADATA_DIR, ELB_STATE_DISK_ID_FILE) - upload_file_to_gcs(disk_id_file, dest, dry_run) + with open_for_write_immediate(dest) as f: + f.write(json.dumps(cfg.appstate.disk_ids)) elif not dry_run: logging.error('Failed to get disk ID') if wait != ElbExecutionMode.WAIT: return + label_persistent_disk(cfg, 'blast-dbs-pvc-rwo') + _wait_for_job(k8s_ctx, job_init_pv, init_blastdb_minutes_timeout, dry_run=dry_run) end = timer() @@ -624,17 +727,61 @@ secs2sleep = int(os.getenv('ELB_PAUSE_AFTER_INIT_PV', str(ELB_PAUSE_AFTER_INIT_PV))) time.sleep(secs2sleep) + # PVC snapshot + logging.debug('Creating PVC snapshot') + start = timer() + snapshot_class = resource_filename('elastic_blast', 'templates/volume-snapshot-class.yaml') + cmd = f"kubectl --context={k8s_ctx} apply -f {snapshot_class}" + if dry_run: + logging.info(cmd) + else: + safe_exec(cmd) + + snapshot = resource_filename('elastic_blast', 'templates/volume-snapshot.yaml') + cmd = f"kubectl --context={k8s_ctx} apply -f {snapshot}" + if dry_run: + logging.info(cmd) + else: + safe_exec(cmd) + + # wait until the snapshot is ready + _wait_for_snapshot(k8s_ctx, pathlib.Path(snapshot), dry_run=dry_run) + end = timer() + logging.debug(f'PVC snapshot created and ready in {end - start:.2f} seconds') + + # delete the persistent disk + logging.debug('Deleting writable persistent disk') + cmd = f'kubectl --context={k8s_ctx} delete -f {pvc_yaml}' + if dry_run: + logging.info(cmd) + else: + safe_exec(cmd) + + # Create a new ReadOnlyMany PVC + logging.debug('Creating ReadOnlyMany PVC from snapshot') + cloned_pvc_yaml = os.path.join(d, 'pvc-rom.yaml') + with open(cloned_pvc_yaml, 'wt') as f:
f.write(substitute_params(resource_string('elastic_blast', 'templates/pvc-rom.yaml.template').decode(), subs)) + cmd = f"kubectl --context={k8s_ctx} apply -f {cloned_pvc_yaml}" + if dry_run: + logging.info(cmd) + else: + safe_exec(cmd) + + +def label_persistent_disk(cfg: ElasticBlastConfig, pv_claim: str) -> None: + """Label disk with given claim with standard labels + Arguments: + cfg: ElasticBlastConfig object + pv_claim: Name of PersistentVolumeClaim""" -def label_persistent_disk(cfg: ElasticBlastConfig) -> None: use_local_ssd = cfg.cluster.use_local_ssd if use_local_ssd: return dry_run = cfg.cluster.dry_run cluster_name = cfg.cluster.name - # Label disk with given claim with standard labels - pv_claim = 'blast-dbs-pvc' labels = cfg.cluster.labels - get_pv_cmd = f'kubectl --context={cfg.appstate.k8s_ctx} get pv -o custom-columns=CLAIM:.spec.claimRef.name,PDNAME:.spec.gcePersistentDisk.pdName' + get_pv_cmd = f'kubectl --context={cfg.appstate.k8s_ctx} get pv -o custom-columns=CLAIM:.spec.claimRef.name,PDNAME:.spec.csi.volumeHandle' if dry_run: logging.info(get_pv_cmd) pd_name = f'disk_name_with_claim_{pv_claim}' @@ -647,7 +794,7 @@ def label_persistent_disk(cfg: ElasticBlastConfig) -> None: if len(parts) < 2: continue if parts[0] == pv_claim: - pd_name = parts[1] + pd_name = parts[1].split('/')[-1] if not pd_name: logging.debug(f'kubectl get pv returned\n{output}') raise LookupError(f"Disk with claim '{pv_claim}' can't be found in cluster '{cluster_name}'") @@ -783,8 +930,11 @@ def submit_job_submission_job(cfg: ElasticBlastConfig): 'ELB_GCP_ZONE' : cfg.gcp.zone, 'ELB_RESULTS' : cfg.cluster.results, 'ELB_CLUSTER_NAME' : cfg.cluster.name, + 'ELB_PD_SIZE' : cfg.cluster.pd_size, + 'ELB_LABELS' : cfg.cluster.labels, # For autoscaling 'ELB_NUM_NODES' : str(cfg.cluster.num_nodes), + 'ELB_USE_LOCAL_SSD': str(cfg.cluster.use_local_ssd).lower() } logging.debug(f"Submitting job submission job: {ELB_CJS_DOCKER_IMAGE_GCP}") with TemporaryDirectory() as d: diff --git a/src/elastic_blast/split.py b/src/elastic_blast/split.py index a9f42c7..a9834d6 100644 --- a/src/elastic_blast/split.py +++ b/src/elastic_blast/split.py @@ -28,6 +28,8 @@ import os import io +import logging +from timeit import default_timer as timer from .filehelper import open_for_write, get_error from typing import Union, List, Iterable, TextIO, Tuple from .constants import ELB_QUERY_BATCH_FILE_PREFIX @@ -102,6 +104,7 @@ def read_and_cut(self) -> Tuple[int, List[str]]: Return the total number of bases/residues in the input and list of query files written """ + start = timer() nline = 0 for f in self.file: for line in f: @@ -116,6 +119,8 @@ def read_and_cut(self) -> Tuple[int, List[str]]: self.seq_buffer.append('\n') self.process_new_sequence() self.process_chunk() + end = timer() + logging.debug(f'Splitting: {end - start:.2f} seconds') if not nline: error = get_error(f) if error: diff --git a/src/elastic_blast/templates/blast-batch-job-local-ssd.yaml.template b/src/elastic_blast/templates/blast-batch-job-local-ssd.yaml.template index de5a88c..94ba53e 100644 --- a/src/elastic_blast/templates/blast-batch-job-local-ssd.yaml.template +++ b/src/elastic_blast/templates/blast-batch-job-local-ssd.yaml.template @@ -34,7 +34,7 @@ spec: gsutil -mq cp ${ELB_RESULTS}/query_batches/batch_${JOB_NUM}.fa /shared/requests; containers: - name: ${K8S_JOB_BLAST} - image: gcr.io/ncbi-sandbox-blast/ncbi/blast:latest + image: ${ELB_DOCKER_IMAGE} workingDir: /blast/blastdb resources: requests: diff --git a/src/elastic_blast/templates/elastic-blast-cf.yaml 
b/src/elastic_blast/templates/elastic-blast-cf.yaml index f214d9e..1d8656d 100644 --- a/src/elastic_blast/templates/elastic-blast-cf.yaml +++ b/src/elastic_blast/templates/elastic-blast-cf.yaml @@ -51,7 +51,7 @@ Parameters: DiskType: Description: EBS volume disk type attached to instance Type: String - Default: gp2 + Default: gp3 ProvisionedIops: Description: Provisioned IOPS for EBS volume @@ -796,7 +796,7 @@ Resources: Type: AWS::IAM::Role Condition: CreateEcsInstanceRole Properties: - RoleName: !Join [-, ['elasticblast-ecs-instance-role', !Ref Owner, !Ref RandomToken]] + RoleName: !Join [-, [elasticblast-e, !Ref Owner, !Ref RandomToken]] Description: Role for ECS instances, created by elastic-blast AssumeRolePolicyDocument: Version: 2012-10-17 @@ -821,7 +821,7 @@ Resources: Type: AWS::IAM::Role Condition: CreateBatchServiceRole Properties: - RoleName: !Join [-, ['elasticblast-batch-service-role', !Ref Owner, !Ref RandomToken]] + RoleName: !Join [-, [elasticblast-b, !Ref Owner, !Ref RandomToken]] Description: Role for AWS Batch service, created by elastic-blast Path: /service-role/ AssumeRolePolicyDocument: @@ -847,7 +847,7 @@ Resources: Type: AWS::IAM::Role Condition: CreateJobRole Properties: - RoleName: !Join [-, ['elasticblast-job-role', !Ref Owner, !Ref RandomToken]] + RoleName: !Join [-, [elasticblast-j, !Ref Owner, !Ref RandomToken]] Description: Role allowing elastic-blast jobs S3 access AssumeRolePolicyDocument: Version: 2012-10-17 @@ -876,7 +876,7 @@ Resources: Type: AWS::IAM::Role Condition: CreateSpotFleetRole Properties: - RoleName: !Join [-, ['elasticblast-spot-fleet-role', !Ref Owner, !Ref RandomToken]] + RoleName: !Join [-, [elasticblast-s, !Ref Owner, !Ref RandomToken]] Description: Role for AWS Batch Spot Fleet, created by elastic-blast Path: /service-role/ AssumeRolePolicyDocument: diff --git a/src/elastic_blast/templates/job-init-local-ssd.yaml.template b/src/elastic_blast/templates/job-init-local-ssd.yaml.template index 54ab096..df7bf78 100644 --- a/src/elastic_blast/templates/job-init-local-ssd.yaml.template +++ b/src/elastic_blast/templates/job-init-local-ssd.yaml.template @@ -33,8 +33,8 @@ spec: sleep 30; log() { ts=`date +'%F %T'`; printf '%s RUNTIME %s %f seconds\n' "$ts" "$1" "$2"; }; if [ -z '${ELB_DB_PATH}' ]; then - echo update_blastdb.pl ${ELB_DB} --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose; - update_blastdb.pl ${ELB_DB} --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose; + echo update_blastdb.pl ${ELB_DB} --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose ${GCP_PROJECT_OPT}; + update_blastdb.pl ${ELB_DB} --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose ${GCP_PROJECT_OPT}; exit_code=$?; else echo gsutil -mq cp ${ELB_DB_PATH} .; @@ -44,8 +44,8 @@ spec: [ -f ${ELB_DB}.tar.gz ] && tar xzf ${ELB_DB}.tar.gz; [ -f ${ELB_DB}.tar.gz ] && rm ${ELB_DB}.tar.gz; fi; - echo update_blastdb.pl taxdb --decompress --source NCBI --verbose --verbose --verbose --verbose --verbose --verbose; - update_blastdb.pl taxdb --decompress --source NCBI --verbose --verbose --verbose --verbose --verbose --verbose; + echo update_blastdb.pl taxdb --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose ${GCP_PROJECT_OPT}; + update_blastdb.pl taxdb --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose 
--verbose ${GCP_PROJECT_OPT}; end=`date +%s`; log "download-blastdbs" $(($end-$start)); [ $exit_code -eq 0 ] || exit $exit_code; diff --git a/src/elastic_blast/templates/job-init-pv.yaml.template b/src/elastic_blast/templates/job-init-pv.yaml.template index f2f318e..382d3b4 100644 --- a/src/elastic_blast/templates/job-init-pv.yaml.template +++ b/src/elastic_blast/templates/job-init-pv.yaml.template @@ -14,7 +14,7 @@ spec: volumes: - name: blastdb persistentVolumeClaim: - claimName: blast-dbs-pvc + claimName: blast-dbs-pvc-rwo readOnly: false containers: - name: ${K8S_JOB_GET_BLASTDB} @@ -33,8 +33,8 @@ spec: start=`date +%s`; log() { ts=`date +'%F %T'`; printf '%s RUNTIME %s %f seconds\n' "$ts" "$1" "$2"; }; if [ -z '${ELB_DB_PATH}' ]; then - echo update_blastdb.pl ${ELB_DB} --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose; - update_blastdb.pl ${ELB_DB} --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose; + echo update_blastdb.pl ${ELB_DB} --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose ${GCP_PROJECT_OPT}; + update_blastdb.pl ${ELB_DB} --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose ${GCP_PROJECT_OPT}; exit_code=$?; [ $exit_code -eq 0 ] || exit $exit_code; else @@ -45,8 +45,8 @@ spec: [ -f ${ELB_DB}.tar.gz ] && tar xzf ${ELB_DB}.tar.gz; [ -f ${ELB_DB}.tar.gz ] && rm ${ELB_DB}.tar.gz; fi; - echo update_blastdb.pl taxdb --decompress --source NCBI --verbose --verbose --verbose --verbose --verbose --verbose; - update_blastdb.pl taxdb --decompress --source NCBI --verbose --verbose --verbose --verbose --verbose --verbose; + echo update_blastdb.pl taxdb --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose ${GCP_PROJECT_OPT}; + update_blastdb.pl taxdb --decompress --source ${ELB_BLASTDB_SRC} --verbose --verbose --verbose --verbose --verbose --verbose ${GCP_PROJECT_OPT}; end=`date +%s`; log "download-blastdbs" $(($end-$start)); [ $exit_code -eq 0 ] || exit $exit_code; diff --git a/src/elastic_blast/templates/job-submit-jobs.yaml.template b/src/elastic_blast/templates/job-submit-jobs.yaml.template index cc0a5c6..26ca975 100644 --- a/src/elastic_blast/templates/job-submit-jobs.yaml.template +++ b/src/elastic_blast/templates/job-submit-jobs.yaml.template @@ -29,6 +29,12 @@ spec: value: "${ELB_CLUSTER_NAME}" - name: ELB_NUM_NODES value: "${ELB_NUM_NODES}" + - name: ELB_PD_SIZE + value: "${ELB_PD_SIZE}" + - name: ELB_LABELS + value: "${ELB_LABELS}" + - name: ELB_USE_LOCAL_SSD + value: "${ELB_USE_LOCAL_SSD}" # - name: ELB_DEBUG_SUBMIT_JOB_FAIL # value: "1" workingDir: /workspace diff --git a/src/elastic_blast/templates/pvc-rom.yaml.template b/src/elastic_blast/templates/pvc-rom.yaml.template new file mode 100644 index 0000000..c57ec9c --- /dev/null +++ b/src/elastic_blast/templates/pvc-rom.yaml.template @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: blast-dbs-pvc +spec: + dataSource: + name: blast-dbs-snapshot + kind: VolumeSnapshot + apiGroup: snapshot.storage.k8s.io + storageClassName: gcp-pd-ssd + accessModes: + - ReadOnlyMany + resources: + requests: + storage: ${ELB_PD_SIZE} diff --git a/src/elastic_blast/templates/pvc.yaml.template b/src/elastic_blast/templates/pvc-rwo.yaml.template similarity index 73% rename from src/elastic_blast/templates/pvc.yaml.template rename to 
src/elastic_blast/templates/pvc-rwo.yaml.template index e3a1362..cfe0b67 100644 --- a/src/elastic_blast/templates/pvc.yaml.template +++ b/src/elastic_blast/templates/pvc-rwo.yaml.template @@ -2,12 +2,11 @@ apiVersion: v1 kind: PersistentVolumeClaim metadata: - name: blast-dbs-pvc + name: blast-dbs-pvc-rwo spec: storageClassName: "gcp-pd-ssd" accessModes: - - ReadOnlyMany - - ReadWriteOnce + - ReadWriteOnce resources: requests: storage: ${ELB_PD_SIZE} diff --git a/src/elastic_blast/templates/storage-gcp-ssd.yaml b/src/elastic_blast/templates/storage-gcp-ssd.yaml index d4b41d7..01c2701 100644 --- a/src/elastic_blast/templates/storage-gcp-ssd.yaml +++ b/src/elastic_blast/templates/storage-gcp-ssd.yaml @@ -3,10 +3,10 @@ apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: gcp-pd-ssd -provisioner: kubernetes.io/gce-pd +provisioner: pd.csi.storage.gke.io parameters: type: pd-ssd - fsType: ext4 + csi.storage.k8s.io/fstype: ext4 replication-type: none reclaimPolicy: Delete allowVolumeExpansion: true diff --git a/src/elastic_blast/templates/storage-gcp.yaml b/src/elastic_blast/templates/storage-gcp.yaml index 6128042..9dc2eeb 100644 --- a/src/elastic_blast/templates/storage-gcp.yaml +++ b/src/elastic_blast/templates/storage-gcp.yaml @@ -3,10 +3,10 @@ apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: gcp-pd -provisioner: kubernetes.io/gce-pd +provisioner: pd.csi.storage.gke.io parameters: type: pd-standard - fsType: ext4 + csi.storage.k8s.io/fstype: ext4 replication-type: none reclaimPolicy: Delete allowVolumeExpansion: true diff --git a/src/elastic_blast/templates/volume-snapshot-class.yaml b/src/elastic_blast/templates/volume-snapshot-class.yaml new file mode 100644 index 0000000..fe42862 --- /dev/null +++ b/src/elastic_blast/templates/volume-snapshot-class.yaml @@ -0,0 +1,6 @@ +apiVersion: snapshot.storage.k8s.io/v1 +kind: VolumeSnapshotClass +metadata: + name: blast-dbs-snapshotclass +driver: pd.csi.storage.gke.io +deletionPolicy: Delete diff --git a/src/elastic_blast/templates/volume-snapshot.yaml b/src/elastic_blast/templates/volume-snapshot.yaml new file mode 100644 index 0000000..5a06824 --- /dev/null +++ b/src/elastic_blast/templates/volume-snapshot.yaml @@ -0,0 +1,8 @@ +apiVersion: snapshot.storage.k8s.io/v1 +kind: VolumeSnapshot +metadata: + name: blast-dbs-snapshot +spec: + volumeSnapshotClassName: blast-dbs-snapshotclass + source: + persistentVolumeClaimName: blast-dbs-pvc-rwo diff --git a/src/elastic_blast/tuner.py b/src/elastic_blast/tuner.py index 799fa02..9a387a0 100644 --- a/src/elastic_blast/tuner.py +++ b/src/elastic_blast/tuner.py @@ -101,8 +101,8 @@ def __str__(self): MAX_NUM_THREADS_GCP = 15 -def get_mt_mode(program: str, options: str = '', db_metadata: DbMetadata = None, - query: SeqData = None) -> MTMode: +def get_mt_mode(program: str, options: str = '', db_metadata: Optional[DbMetadata] = None, + query: Optional[SeqData] = None) -> MTMode: """ Compute BLAST search MT mode @@ -141,7 +141,8 @@ def get_mt_mode(program: str, options: str = '', db_metadata: DbMetadata = None, return MTMode.ZERO -def get_num_cpus(cloud_provider: CSP, program: str, mt_mode: MTMode, query: SeqData = None) -> int: +def get_num_cpus(cloud_provider: CSP, program: str, mt_mode: MTMode, + query: Optional[SeqData] = None) -> int: """Get number of CPUs to use to optimally run BLAST Arguments: @@ -164,7 +165,7 @@ def get_num_cpus(cloud_provider: CSP, program: str, mt_mode: MTMode, query: SeqD def get_batch_length(cloud_provider: CSP, program: str, mt_mode: MTMode, - num_cpus: 
int, db_metadata: DbMetadata = None) -> int: + num_cpus: int, db_metadata: Optional[DbMetadata] = None) -> int: """ Get batch length for BLAST batch search diff --git a/src/elastic_blast/util.py b/src/elastic_blast/util.py index 9bee3bb..f5f12a6 100644 --- a/src/elastic_blast/util.py +++ b/src/elastic_blast/util.py @@ -46,13 +46,16 @@ 'job-cloud-split-local-ssd.yaml.template', 'job-init-local-ssd.yaml.template', 'storage-gcp-ssd.yaml', - 'pvc.yaml.template', + 'pvc-rwo.yaml.template', + 'pvc-rom.yaml.template', 'job-init-pv.yaml.template', 'elb-janitor-rbac.yaml', 'elb-janitor-cronjob.yaml.template', 'job-submit-jobs.yaml.template', 'blast-batch-job.yaml.template', - 'blast-batch-job-local-ssd.yaml.template' + 'blast-batch-job-local-ssd.yaml.template', + 'volume-snapshot-class.yaml', + 'volume-snapshot.yaml' ] # Not used by elastic-blast tool: # storage-gcp.yaml @@ -223,9 +226,8 @@ def get_blastdb_info(blastdb: str, gcp_prj: Optional[str] = None): if db.startswith(ELB_GCS_PREFIX): # Custom database, just check the presence try: - if not gcp_prj: - raise ValueError(f'elastic_blast.util.get_blastdb_info is missing the gcp_prj parameter') - proc = safe_exec(f'gsutil -u {gcp_prj} ls {db}.*') + prj = f'-u {gcp_prj}' if gcp_prj else '' + proc = safe_exec(f'gsutil {prj} ls {db}.*') except SafeExecError: raise ValueError(f'Error requesting for {db}.*') output = proc.stdout.decode() @@ -251,39 +253,36 @@ def get_blastdb_size(db: str, db_source: DBSource, gcp_prj: Optional[str] = None if db.startswith(ELB_GCS_PREFIX): # Custom database, just check the presence try: - if not gcp_prj: - raise ValueError(f'elastic_blast.util.get_blastdb_size is missing the gcp_prj parameter') - safe_exec(f'gsutil -u {gcp_prj} ls {db}.*') + prj = f'-u {gcp_prj}' if gcp_prj else '' + safe_exec(f'gsutil {prj} ls {db}.*') except SafeExecError: raise ValueError(f'BLAST database {db} was not found') # TODO: find a way to check custom DB size w/o transferring it to user machine return 1000000 if db_source == DBSource.GCP: - if not gcp_prj: - raise ValueError(f'elastic_blast.util.get_blastdb_size is missing the gcp_prj parameter') - return gcp_get_blastdb_size(db, str(gcp_prj)) + return gcp_get_blastdb_size(db, gcp_prj) elif db_source == DBSource.AWS: return 1000000 # FIXME raise NotImplementedError("Not implemented for sources other than GCP") -def gcp_get_blastdb_latest_path(gcp_prj: str) -> str: +def gcp_get_blastdb_latest_path(gcp_prj: Optional[str]) -> str: """Get latest path of GCP-based blastdb repository""" - if not gcp_prj: - raise ValueError(f'elastic_blast.util.gcp_get_blastdb_latest_path is missing the gcp_prj parameter') - cmd = f'gsutil -u {gcp_prj} cat {GCS_DFLT_BUCKET}/latest-dir' + prj = f'-u {gcp_prj}' if gcp_prj else '' + cmd = f'gsutil {prj} cat {GCS_DFLT_BUCKET}/latest-dir' proc = safe_exec(cmd) return os.path.join(GCS_DFLT_BUCKET, proc.stdout.decode().rstrip()) -def gcp_get_blastdb_size(db: str, gcp_prj: str) -> float: +def gcp_get_blastdb_size(db: str, gcp_prj: Optional[str]) -> float: """Request blast database size from GCP using gsutil Returns the size in GB, if not found raises ValueError exception db: database name """ latest_path = gcp_get_blastdb_latest_path(gcp_prj) - cmd = f'gsutil -u {gcp_prj} cat {latest_path}/blastdb-manifest.json' + prj = f'-u {gcp_prj}' if gcp_prj else '' + cmd = f'gsutil {prj} cat {latest_path}/blastdb-manifest.json' proc = safe_exec(cmd) blastdb_metadata = json.loads(proc.stdout.decode()) if not db in blastdb_metadata: diff --git a/tests/config/test_config.py 
b/tests/config/test_config.py index 2ac022f..473bd87 100644 --- a/tests/config/test_config.py +++ b/tests/config/test_config.py @@ -76,6 +76,7 @@ @patch(target='elastic_blast.elb_config.get_db_metadata', new=MagicMock(return_value=DB_METADATA)) @patch(target='elastic_blast.elb_config.safe_exec', new=MagicMock(side_effect=mocked_safe_exec)) @patch(target='elastic_blast.util.safe_exec', new=MagicMock(side_effect=mocked_safe_exec)) +@patch(target='elastic_blast.gcp_traits.safe_exec', new=MagicMock(side_effect=mocked_safe_exec)) class ElbConfigLibTester(unittest.TestCase): """ Testing class for this module. """ diff --git a/tests/gcp/test_gcp.py b/tests/gcp/test_gcp.py index 1b14292..8e1e5cb 100644 --- a/tests/gcp/test_gcp.py +++ b/tests/gcp/test_gcp.py @@ -452,6 +452,7 @@ def safe_exec_gsutil_rm(cmd): return MockedCompletedProcess('') mocker.patch('elastic_blast.gcp.safe_exec', side_effect=safe_exec_gsutil_rm) + mocker.patch('elastic_blast.gcp_traits.safe_exec', side_effect=mocked_safe_exec) with patch(target='elastic_blast.elb_config.safe_exec', new=MagicMock(side_effect=GKEMock().mocked_safe_exec)): with patch(target='elastic_blast.util.safe_exec', new=MagicMock(side_effect=GKEMock().mocked_safe_exec)): cfg = ElasticBlastConfig(gcp_project = 'test-gcp-project', diff --git a/tests/kubernetes/test_kubernetes.py b/tests/kubernetes/test_kubernetes.py index 2af2131..eb37607 100644 --- a/tests/kubernetes/test_kubernetes.py +++ b/tests/kubernetes/test_kubernetes.py @@ -50,17 +50,7 @@ # Mocked tests -@pytest.fixture -def kubectl_mock(mocker): - """Fixture function that replaces util.safe_exec with mocked_safe_exec""" - - # we need kubernetes.safe_exec instead of util.safe_exec here, because - # safe_exec is imported in kubernetes.py with 'from util import safe_exec' - # and safe_exec in kubernetes is seen as local, python is funny this way - mocker.patch('elastic_blast.kubernetes.safe_exec', side_effect=mocked_safe_exec) - - -def test_fake_kubectl(kubectl_mock): +def test_fake_kubectl(gke_mock): """Test that calling fake safe_exec with wrong command line results in ValueError""" with pytest.raises(ValueError): @@ -95,7 +85,7 @@ def safe_exec_bad_json(cmd): kubernetes.safe_exec.assert_called() -def test_get_persistent_disk(kubectl_mock): +def test_get_persistent_disk(gke_mock): """Test getting k8s cluster persistent disks""" disks = kubernetes.get_persistent_disks(K8S_UNINITIALIZED_CONTEXT) assert sorted(disks) == sorted(GCP_DISKS) @@ -144,10 +134,10 @@ def print_safe_exec(cmd): print(cmd) if 'kubectl ' in cmd and 'get pv -o json' in cmd: result = {'items': []} # type: ignore - result['items'].append({'spec': {'gcePersistentDisk': {'pdName': GCP_DISKS[0]}}}) # type: ignore + result['items'].append({'spec': {'csi': {'volumeHandle': f'/project/test-project/{GCP_DISKS[0]}'}}}) # type: ignore return MockedCompletedProcess(stdout=json.dumps(result)) if 'kubectl ' in cmd and 'get pv' in cmd: - return MockedCompletedProcess(stdout='CLAIM PDNAME\nblast-dbs-pvc gke-some-synthetic-name') + return MockedCompletedProcess(stdout='CLAIM PDNAME\nblast-dbs-pvc-rwo gke-some-synthetic-name') if 'kubectl' in cmd and 'get -f' in cmd: fn = os.path.join(TEST_DATA_DIR, 'job-status.json') return MockedCompletedProcess(stdout=Path(fn).read_text()) @@ -168,6 +158,7 @@ def print_safe_exec(cmd): mocker.patch('elastic_blast.kubernetes.safe_exec', side_effect=print_safe_exec) mocker.patch('elastic_blast.elb_config.safe_exec', side_effect=print_safe_exec) mocker.patch('elastic_blast.util.safe_exec', side_effect=print_safe_exec) + 
mocker.patch('elastic_blast.gcp_traits.safe_exec', side_effect=print_safe_exec) DB_METADATA = DbMetadata(version = '1', @@ -183,24 +174,25 @@ def print_safe_exec(cmd): number_of_volumes = 1) @patch(target='elastic_blast.elb_config.get_db_metadata', new=MagicMock(return_value=DB_METADATA)) -def test_initialize_persistent_disk(gke_mock, safe_exec_mock, mocker): +@patch(target='elastic_blast.kubernetes.wait_for_pvc', new=MagicMock(return_value=None)) +@patch(target='elastic_blast.kubernetes._wait_for_snapshot', new=MagicMock(return_value=None)) +@patch(target='elastic_blast.kubernetes._wait_for_job', new=MagicMock(return_value=None)) +def test_initialize_persistent_disk(gke_mock, safe_exec_mock): """Exercises initialize_persistent_disk with mock safe_exec and prints out arguments to safe_exec Run pytest -s -v tests/kubernetes to verify correct order of calls""" from argparse import Namespace - def mocked_upload_file_to_gcs(fname, loc, dryrun): - """Mocked upload to GS function""" - pass - mocker.patch('elastic_blast.kubernetes.upload_file_to_gcs', side_effect=mocked_upload_file_to_gcs) - args = Namespace(cfg=os.path.join(TEST_DATA_DIR, 'initialize_persistent_disk.ini')) cfg = ElasticBlastConfig(configure(args), task = ElbCommand.SUBMIT) cfg.appstate.k8s_ctx = K8S_UNINITIALIZED_CONTEXT + cfg.cluster.labels = FAKE_LABELS kubernetes.initialize_persistent_disk(cfg) @patch(target='elastic_blast.elb_config.get_db_metadata', new=MagicMock(return_value=DB_METADATA)) -def test_initialize_persistent_disk_failed(gke_mock, mocker): +@patch(target='elastic_blast.kubernetes.wait_for_pvc', new=MagicMock()) +@patch(target='elastic_blast.kubernetes.label_persistent_disk', new=MagicMock()) +def test_initialize_persistent_disk_failed(gke_mock, safe_exec_mock, mocker): def fake_safe_exec_failed_job(cmd): fn = os.path.join(TEST_DATA_DIR, 'job-status-failed.json') return MockedCompletedProcess(stdout=Path(fn).read_text()) @@ -233,16 +225,17 @@ def test_label_persistent_disk(safe_exec_mock): # Replace labels with well-known fake for the purpose of testing command match, # see above in safe_exec_mock cfg.cluster.labels = FAKE_LABELS - kubernetes.label_persistent_disk(cfg) + kubernetes.label_persistent_disk(cfg, 'blast-dbs-pvc-rwo') -def test_delete_all(kubectl_mock): +def test_delete_all(gke_mock): """Test deleteting all jobs, persistent volume claims and persistent volumes""" deleted = kubernetes.delete_all(K8S_UNINITIALIZED_CONTEXT) assert sorted(set(deleted)) == sorted(K8S_JOBS + GKE_PVS) kubernetes.safe_exec.assert_called() +@patch.dict(os.environ, {'ELB_PAUSE_AFTER_INIT_PV': '1'}) def test_delete_all_no_resources(mocker): """Test deleting all whem no resources were created""" def safe_exec_no_resources(cmd): @@ -258,7 +251,7 @@ def safe_exec_no_resources(cmd): kubernetes.safe_exec.assert_called() -def test_get_jobs(kubectl_mock): +def test_get_jobs(gke_mock): """Test getting kubernetes job ids""" jobs = kubernetes.get_jobs(K8S_UNINITIALIZED_CONTEXT) assert sorted(jobs) == sorted(K8S_JOBS) diff --git a/tests/resources/quotas/test_quotas.py b/tests/resources/quotas/test_quotas.py index 5e14243..ea7fce6 100644 --- a/tests/resources/quotas/test_quotas.py +++ b/tests/resources/quotas/test_quotas.py @@ -55,6 +55,7 @@ class TestResourceQuotasAws(unittest.TestCase): + @patch(target='elastic_blast.elb_config.enable_gcp_api', new=MagicMock()) def setUp(self): """ Initialize 2 configurations: one for GCP another for AWS """ cfg_gcp = configparser.ConfigParser() diff --git a/tests/util/test_util.py 
b/tests/util/test_util.py index 4cb2c56..8e4b92b 100644 --- a/tests/util/test_util.py +++ b/tests/util/test_util.py @@ -155,6 +155,7 @@ def test_get_blastdb_size_invalid_database(gcp_env_vars): @patch(target='elastic_blast.elb_config.get_db_metadata', new=MagicMock(return_value=DB_METADATA)) +@patch(target='elastic_blast.elb_config.enable_gcp_api', new=MagicMock()) def create_config_for_db(dbname): """Create minimal config for a database name""" return ElasticBlastConfig(gcp_project = 'test-gcp-project', diff --git a/tests/utils.py b/tests/utils.py index f4e643b..3720c7b 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -191,12 +191,14 @@ def gke_mock(mocker): mocker.patch('elastic_blast.util.safe_exec', side_effect=mock.mocked_safe_exec) mocker.patch('elastic_blast.filehelper.safe_exec', side_effect=mock.mocked_safe_exec) mocker.patch('elastic_blast.elb_config.safe_exec', side_effect=mock.mocked_safe_exec) + mocker.patch('elastic_blast.gcp_traits.safe_exec', side_effect=mock.mocked_safe_exec) mocker.patch('elastic_blast.tuner.aws_get_machine_type', new=MagicMock(return_value='test-machine-type')) # mocker.patch('subprocess.Popen', new=MagicMock(return_value=MockedCompletedProcess())) mocker.patch('subprocess.Popen', side_effect=mock.mocked_popen) mocker.patch('boto3.resource', side_effect=mock.mocked_resource) mocker.patch('boto3.client', side_effect=mock.mocked_client) mocker.patch('botocore.exceptions.ClientError.__init__', new=MagicMock(return_value=None)) + mocker.patch.dict(os.environ, {'ELB_PAUSE_AFTER_INIT_PV': '1'}) yield mock del mock @@ -214,6 +216,7 @@ def gke_mock(mocker): BLASTDB = 'mocked_blastdb' +@patch(target='elastic_blast.elb_config.enable_gcp_api', new=MagicMock()) def get_mocked_config() -> ElasticBlastConfig: """Generate config for mocked gcloud and kubeclt""" cfg = ElasticBlastConfig(gcp_project = GCP_PROJECT, @@ -318,7 +321,7 @@ def mocked_safe_exec(cmd: Union[List[str], str], cloud_state: CloudResources = N elif cmd[0] == 'kubectl' and 'get pv -o json' in ' '.join(cmd): result = {'items': []} # type: ignore for i in GCP_DISKS: - result['items'].append({'spec': {'gcePersistentDisk': {'pdName': i}}}) # type: ignore + result['items'].append({'spec': {'csi': {'volumeHandle': f'/test-project/test-region/{i}'}}}) # type: ignore return MockedCompletedProcess(json.dumps(result)) # get kubernetes jobs @@ -363,6 +366,10 @@ def mocked_safe_exec(cmd: Union[List[str], str], cloud_state: CloudResources = N elif cmd[0] == 'kubectl' and 'delete pv --all' in ' '.join(cmd): return MockedCompletedProcess('\n') + # delete all volume snapshots + elif cmd[0] == 'kubectl' and 'delete volumesnapshots --all' in ' '.join(cmd): + return MockedCompletedProcess('\n') + # check if kubernetes client is installed or cluster is alive elif ' '.join(cmd).startswith('kubectl') and 'version' in ' '.join(cmd): return MockedCompletedProcess() @@ -596,7 +603,7 @@ def load(self): if self.obj not in self.storage: raise ClientError(None, None) - def upload_fileobj(self, stream): + def upload_fileobj(self, stream, Config = None): """Upload a file object to the cloud bucket""" self.storage[self.obj] = stream.read()
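Note on the new wait_for_pvc helper in src/elastic_blast/kubernetes.py above: it polls _pvc_bound until the claim binds, replacing the old fixed time.sleep(5). The predicate itself lies outside this hunk; a minimal sketch of what it could look like, assuming kubectl and safe_exec from elastic_blast.util (the jsonpath query and the function body are illustrative, not necessarily the repository's actual implementation):

    # Sketch only: the real _pvc_bound is defined elsewhere in
    # kubernetes.py and may differ in details.
    from elastic_blast.util import safe_exec

    def _pvc_bound(k8s_ctx: str, pvc_name: str, dry_run: bool = False) -> bool:
        """Return True if the named PersistentVolumeClaim reports phase Bound."""
        if dry_run:
            return True
        cmd = (f'kubectl --context={k8s_ctx} get pvc {pvc_name} '
               '-o jsonpath={.status.phase}')
        proc = safe_exec(cmd)
        return proc.stdout.decode().strip() == 'Bound'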
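Similarly, _wait_for_snapshot is called after the VolumeSnapshot manifest is applied but is defined outside this hunk. A CSI snapshot becomes usable once its status.readyToUse field turns true, so a polling sketch matching the call site's signature could look as follows (the attempt count and interval are illustrative assumptions):

    # Sketch only: poll the VolumeSnapshot created from
    # templates/volume-snapshot.yaml until it is ready to use.
    import time
    import pathlib
    from elastic_blast.util import safe_exec

    def _wait_for_snapshot(k8s_ctx: str, snapshot: pathlib.Path,
                           attempts: int = 180, secs2wait: int = 5,
                           dry_run: bool = False) -> None:
        """Block until status.readyToUse is true or raise TimeoutError."""
        cmd = (f'kubectl --context={k8s_ctx} get -f {snapshot} '
               '-o jsonpath={.status.readyToUse}')
        for _ in range(attempts):
            if dry_run or safe_exec(cmd).stdout.decode().strip() == 'true':
                return
            time.sleep(secs2wait)
        raise TimeoutError(f'Snapshot in {snapshot} did not become ready')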
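The label_persistent_disk switch from .spec.gcePersistentDisk.pdName to .spec.csi.volumeHandle follows the StorageClass migration to the pd.csi.storage.gke.io provisioner: the in-tree field exposed a bare disk name, while the CSI driver reports a full resource path, hence the added parts[1].split('/')[-1]. For example (project, zone, and disk names made up):

    # volumeHandle as reported by the GCP PD CSI driver
    volume_handle = 'projects/my-project/zones/us-east4-b/disks/pvc-1234-abcd'
    disk_name = volume_handle.split('/')[-1]
    assert disk_name == 'pvc-1234-abcd'

The mocked volumeHandle values in tests/utils.py and tests/kubernetes/test_kubernetes.py only need this same last-component property, which is why shorter fake paths suffice there.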
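The conditional requester-pays flag now used throughout src/elastic_blast/util.py (prj = f'-u {gcp_prj}' if gcp_prj else '') lets every gsutil call fall back to unauthenticated access when no GCP project is configured, instead of raising ValueError. A self-contained illustration of the pattern (make_gsutil_cat_cmd is a hypothetical name, not part of the module):

    from typing import Optional

    def make_gsutil_cat_cmd(path: str, gcp_prj: Optional[str] = None) -> str:
        # An empty prj leaves a harmless double space in the command line,
        # matching the behavior of the code in util.py.
        prj = f'-u {gcp_prj}' if gcp_prj else ''
        return f'gsutil {prj} cat {path}'

    assert make_gsutil_cat_cmd('gs://blast-db/latest-dir') == \
           'gsutil  cat gs://blast-db/latest-dir'
    assert make_gsutil_cat_cmd('gs://blast-db/latest-dir', 'my-prj') == \
           'gsutil -u my-prj cat gs://blast-db/latest-dir'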