Skip to content

Commit

Permalink
Add support for non-dmel and perf improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
jogoodma committed May 18, 2024
1 parent b041b3f commit 06151e7
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 61 deletions.
15 changes: 10 additions & 5 deletions src/blast_db_configuration/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from Bio import Entrez
import agr_blast_service_configuration.schemas.metadata as agrdb

from .db_metadata import create_flybase_metadata
from .db_metadata import create_metadata_from_ncbi

app = typer.Typer()

Expand Down Expand Up @@ -70,7 +70,7 @@ def generate_config(
ncbi_email: Annotated[
str, typer.Option(help="Email to use when connecting to NCBI.")
] = DEFAULT_CONFIG.contact,
organisms_file: Annotated[
organisms: Annotated[
Path, typer.Option(help="Path to the organisms file.")
] = DEFAULT_CONFIG.organisms,
output: Annotated[Path, typer.Option(help="Output configuration file.")] = None,
Expand All @@ -92,15 +92,20 @@ def generate_config(
)
all_dbs: list[agrdb.SequenceMetadata] = []

for genus, species in tqdm(load_organisms(organisms_file)):
for genus, species in tqdm(
load_organisms(organisms),
ncols=100,
desc="Processing organisms",
unit="organism",
):
if genus == "Drosophila" and species == "melanogaster":
pass
else:
all_dbs.extend(create_flybase_metadata(genus, species))
all_dbs.extend(create_metadata_from_ncbi(genus, species, ncbi_email))

blast_dbs = agrdb.AgrBlastDatabases(metadata=metadata, data=all_dbs)
with output.open("w") as outfile:
json.dump(blast_dbs.to_dict(), outfile)
json.dump(blast_dbs.to_dict(), outfile, indent=2)


def load_organisms(organism_json: Path) -> list[list[str, str], ...]:
Expand Down
25 changes: 20 additions & 5 deletions src/blast_db_configuration/db_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,17 @@
logger = logging.getLogger(__name__)


def create_flybase_metadata(
genus: str, species: str
def create_metadata_from_ncbi(
genus: str, species: str, email: str
) -> list[blast_metadata_schema.SequenceMetadata]:
"""
Create the BLAST Database metadata schema using NCBI FTP and API services.
:param genus: Genus name
:param species: Species name
:param email: Email address to use for connecting to the NCBI
:return: BLAST Database metadata schemas
"""
dbs: list[blast_metadata_schema.SequenceMetadata] = []
assembly_targets = [
{
Expand All @@ -34,7 +42,11 @@ def create_flybase_metadata(
]
for target in assembly_targets:
assembly_files = genomes.get_current_genome_assembly_files(
genus, species, file_regex=target["file_regex"]
genus,
species,
file_regex=target["file_regex"],
email=email,
organism_group=genomes.OrganismGroup.INVERTEBRATE,
)
if not assembly_files:
logger.error(f"No assembly files found for {genus} {species}")
Expand All @@ -53,12 +65,15 @@ def create_flybase_metadata(
genus[0], species, assembly_files[0]
),
description=target["description"].format(genus, species),
taxon_id=taxid,
seqtype=target["seqtype"],
taxon_id=str(taxid),
seqtype=blast_metadata_schema.BlastDBType(target["seqtype"]),
)
)
return dbs


def create_dmel_metadata():
pass
# dbs.extend(
# [
# blast_metadata.BlastDBMetaData(
Expand Down
132 changes: 81 additions & 51 deletions src/blast_db_configuration/ncbi/genomes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,68 @@
import logging
import re
import urllib.error
import urllib.request as req
from pathlib import Path
from functools import cache
from ftplib import FTP
from typing import Iterator, Optional
from enum import StrEnum
from Bio import Entrez

logger = logging.getLogger(__name__)

FTP_HOST = "ftp.ncbi.nlm.nih.gov"
DEFAULT_ORGANISM_GROUP = "invertebrate"

FtpDirectoryListing = list[tuple[str, dict[str, str]]]

FTP_FILES_CACHE: dict[str, FtpDirectoryListing] = {}


class OrganismGroup(StrEnum):
"""
Organism groups used by the NCBI RefSeq genomes FTP server directory structure.
see https://ftp.ncbi.nlm.nih.gov/genomes/refseq/
"""

ARCHAEA = "archaea"
BACTERIA = "bacteria"
FUNGI = "fungi"
INVERTEBRATE = "invertebrate"
METAGENOMES = "metagenomes"
MITOCHONDRIA = "mitochondria"
PLANT = "plant"
PLASMID = "plasmid"
PLASTID = "plastid"
PROTOZOA = "protozoa"
VERTEBRATE_MAMMALIAN = "vertebrate_mammalian"
VERTEBRATE_OTHER = "vertebrate_other"
VIRAL = "viral"


def get_current_genome_assembly_files(
genus: str,
species: str,
organism_group: str = DEFAULT_ORGANISM_GROUP,
email: str,
organism_group: str | OrganismGroup = OrganismGroup.INVERTEBRATE,
file_regex: str = None,
) -> Optional[tuple[str, str, str]]:
"""
Get the current genome assembly directory for a given organism.
Organism group is one of:
- 'archaea'
- 'bacteria'
- 'fungi'
- 'invertebrate'
- 'mitochondria'
- 'plant'
- 'plasmid'
- 'plastid'
- 'protozoa'
- 'vertebrate_mammalian'
- 'vertebrate_other'
- 'viral'
:param genus: Genus of organism
:param species: Species of organism
:param email: Email to use for the anonymous FTP connection password
:param organism_group: Organism group (default: 'invertebrate', see above)
:param file_regex: Regular expression to match files (default: None)
:return: Tuple of (genome assembly directory, genome assembly file, md5 checksum file)
"""
if type(organism_group) is str:
organism_group = OrganismGroup(organism_group)

path = f"/genomes/refseq/{organism_group}/{genus}_{species.replace(' ', '_')}/latest_assembly_versions"
with FTP(FTP_HOST) as ftp:
with FTP(FTP_HOST, timeout=30) as ftp:
# Use a passive connection to avoid firewall blocking and login.
ftp.set_pasv(True)
ftp.login()
ftp.login(user="anonymous", passwd=email)
# Get a list of files in the directory.
try:
files = ftp.mlsd(path)
Expand Down Expand Up @@ -76,44 +92,57 @@ def get_current_genome_assembly_files(
assembly_dir = directories[0]
# Get a list of files in the latest genome assembly directory.
try:
assembly_files = ftp.mlsd(f"{path}/{assembly_dir}")
assembly_ftp_dir = f"{path}/{assembly_dir}"
assembly_files = FTP_FILES_CACHE.get(assembly_ftp_dir)
if assembly_files is None:
assembly_files = [
file for file in ftp.mlsd(f"{path}/{assembly_dir}")
]
FTP_FILES_CACHE[assembly_ftp_dir] = assembly_files
# Filter files based on the regular expression filter.
files = filter_ftp_paths(assembly_files, file_regex)
filtered_files = filter_ftp_paths(assembly_files, file_regex)
except ftplib.all_errors as e:
logger.error(f"FTP error while processing {genus} {species}: {e}")
return None

files_with_md5 = []
checksums = {}

# Look for the md5 of the genome assembly file.
for file in files:
try:
# Fetch the md5 checksum file
with req.urlopen(
f"ftp://{FTP_HOST}{path}/{assembly_dir}/md5checksums.txt"
) as response:
md5_body = response.read()
decoded_body = md5_body.decode("utf-8")
# Look for the corresponding genome assembly file checksum.
for md5_line in decoded_body.splitlines():
if file in md5_line:
md5 = re.split(r"\s+", md5_line)[0]
files_with_md5.append(
(
assembly_dir,
f"ftp://{FTP_HOST}{path}/{assembly_dir}/{file}",
md5,
)
)
# Make sure we only found one checksum.
if len(files_with_md5) > 1:
logger.warning(
"Found multiple files with MD5 checksums, using the first one"

try:

def get_md5sum(line: str) -> str:
checksum, remote_file = re.split(r"\s+", line)
remote_file = Path(remote_file).name
checksums[remote_file] = checksum

ftp.retrlines(
f"RETR {path}/{assembly_dir}/md5checksums.txt",
callback=get_md5sum,
)
except ftplib.all_errors as e:
logger.error("Failed to get md5 checksums:\n%s", str(e))

for file in filtered_files:
if file in checksums:
md5sum = checksums[file]
files_with_md5.append(
(
assembly_dir,
f"ftp://{FTP_HOST}{path}/{assembly_dir}/{file}",
md5sum,
)
elif len(files_with_md5) == 0:
logger.warning("Could not find MD5 checksum for file")
except (urllib.error.URLError, urllib.error.HTTPError) as error:
logger.error(f"Failed to get md5 checksum for {file}:\n{error}")
continue
)

# Make sure we only found one checksum.
if len(files_with_md5) > 1:
logger.warning(
"Found multiple files with MD5 checksums, using the first one"
)
elif len(files_with_md5) == 0:
logger.warning("Could not find MD5 checksum for file")

return files_with_md5[0] if len(files_with_md5) >= 1 else None

else:
Expand Down Expand Up @@ -144,8 +173,9 @@ def search_ncbi_assemblies(genus: str, species: str) -> list[str]:
return None


@cache
def filter_ftp_paths(files: Iterator, file_regex: str = None) -> list[str]:
def filter_ftp_paths(
files: Iterator[FtpDirectoryListing], file_regex: str = None
) -> list[str]:
"""
Given an iterator for files from the `mlsd` FTP command, filter the files based on the regular expression.
Expand Down

0 comments on commit 06151e7

Please sign in to comment.