diff --git a/fgpyo/sam/__init__.py b/fgpyo/sam/__init__.py index 17769e2c..34644c17 100644 --- a/fgpyo/sam/__init__.py +++ b/fgpyo/sam/__init__.py @@ -201,6 +201,7 @@ ILLUMINA_READ_NAME_DELIMITER: str = ":" """Illumina read names are delimited with a colon.""" + @enum.unique class SamFileType(enum.Enum): """Enumeration of valid SAM/BAM/CRAM file types. @@ -946,11 +947,12 @@ class SamOrder(enum.Enum): QueryName = "queryname" #: queryname sorted Unknown = "unknown" # Unknown SAM / BAM / CRAM sort order + def extract_umis_from_read_name( read_name: str, read_name_delimiter: str = ILLUMINA_READ_NAME_DELIMITER, umi_delimiter: str = ILLUMINA_UMI_DELIMITER, - strict: bool = False + strict: bool = False, ) -> Optional[str]: """Extract UMI(s) from a read name. The UMI is expected to be the final component of the read name, delimited by the @@ -972,11 +974,13 @@ def extract_umis_from_read_name( """ if strict: colons = read_name.count(":") - if colons == 6: #number of fields is 7 + if colons == 6: # number of fields is 7 return None elif colons != 7: - raise ValueError(f"Trying to extract UMIs from read with {colons + 1} parts " - f"(7 or 8 expected): {read_name}") + raise ValueError( + f"Trying to extract UMIs from read with {colons + 1} parts " + f"(7 or 8 expected): {read_name}" + ) raw_umi = read_name.split(read_name_delimiter)[-1] # Check each UMI individually umis = raw_umi.split(umi_delimiter) @@ -992,6 +996,7 @@ def extract_umis_from_read_name( ) return SAM_UMI_DELIMITER.join(umis) + def copy_umi_from_read_name(rec: AlignedSegment, remove_umi: bool = False) -> None: """ Copy a UMI from an alignment's read name to its `RX` SAM tag. @@ -1008,16 +1013,19 @@ def copy_umi_from_read_name(rec: AlignedSegment, remove_umi: bool = False) -> No ValueError: If the record already has a populated `RX` SAM tag. """ - umi = extract_umis_from_read_name(read_name=rec.qname,umi_delimiter=ILLUMINA_READ_NAME_DELIMITER) + umi = extract_umis_from_read_name( + read_name=rec.query_name, umi_delimiter=ILLUMINA_READ_NAME_DELIMITER + ) if not _is_valid_umi(umi): - raise ValueError( - f"Invalid UMI(s) found in read name: {read_name}", - ) + raise ValueError( + f"Invalid UMI(s) found in read name: {rec.query_name}", + ) else: rec.set_tag(tag="RX", value=umi, value_type="Z") if remove_umi: - last_index = rec.qname.rfind(ILLUMINA_READ_NAME_DELIMITER) - rec.qname = rec.qname[:last_index] if last_index != -1 else rec.qname + last_index = rec.query_name.rfind(ILLUMINA_READ_NAME_DELIMITER) + rec.query_name = rec.query_name[:last_index] if last_index != -1 else rec.query_name + def _is_valid_umi(umi: str) -> bool: """Check whether a UMI is valid. @@ -1029,4 +1037,4 @@ def _is_valid_umi(umi: str) -> bool: True if the UMI is valid, False otherwise. """ - return len(umi) > 0 and set(umi).issubset(VALID_UMI_CHARACTERS) \ No newline at end of file + return len(umi) > 0 and set(umi).issubset(VALID_UMI_CHARACTERS)