From 2e7653bbce90627ddb75c78a82ac663345482343 Mon Sep 17 00:00:00 2001 From: fbdtemme Date: Wed, 24 Jan 2024 18:25:32 +0100 Subject: [PATCH 01/13] silence some linter warnings --- .flake8 | 3 +- src/pixelator/types.py | 3 +- src/pixelator/utils.py | 109 ++++++++++++++++++++--------------------- 3 files changed, 57 insertions(+), 58 deletions(-) diff --git a/.flake8 b/.flake8 index a5bfc1c7..8362dc81 100644 --- a/.flake8 +++ b/.flake8 @@ -1,5 +1,6 @@ [flake8] -extend-ignore = E501,E402,W503,E203,D213,D203,DOC301,DOC502 + +extend-ignore = E501,E402,W503,E203,D213,D203,DOC301,DOC502,DOC203 exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,cue.mod docstring-convention = all style = sphinx diff --git a/src/pixelator/types.py b/src/pixelator/types.py index 3be37584..8d983162 100644 --- a/src/pixelator/types.py +++ b/src/pixelator/types.py @@ -1,5 +1,4 @@ -""" -This module contains helper typehints for the pixelator package. +"""Helper typehints for the pixelator package. Copyright (c) 2023 Pixelgen Technologies AB. """ diff --git a/src/pixelator/utils.py b/src/pixelator/utils.py index 441b7470..f514100a 100644 --- a/src/pixelator/utils.py +++ b/src/pixelator/utils.py @@ -1,5 +1,4 @@ -""" -Common functions and utilities for Pixelator +"""Common functions and utilities for Pixelator. Copyright (c) 2022 Pixelgen Technologies AB. """ @@ -13,6 +12,8 @@ import tempfile import textwrap import time +import typing +from concurrent import futures from functools import wraps from pathlib import Path, PurePath from typing import Any, Dict, List, Optional, Sequence, Set, TYPE_CHECKING, Union @@ -37,9 +38,9 @@ def build_barcodes_file( panel: AntibodyPanel, anchored: bool, rev_complement: bool ) -> str: - """ - Utility function to create a FASTA file of barcodes from a - panel dataframe. The FASTA file will have the marker id as + """Create a FASTA file of barcodes from a panel dataframe. + + The FASTA file will have the marker id as name and the barcode sequence as sequence. The parameter rev_complement control if sequence needs to be in reverse complement form or not. When anchored is true a dollar sign @@ -71,9 +72,7 @@ def build_barcodes_file( def click_echo(msg: str, multiline: bool = False): - """ - Helper function that print a line to the console - with long-line wrapping. + """Print a line to the console with optional long-line wrapping. :param msg: the message to print :param multiline: True to use text wrapping or False otherwise (default) @@ -85,10 +84,10 @@ def click_echo(msg: str, multiline: bool = False): def create_output_stage_dir(root: PathType, name: str) -> Path: - """ - Create a new subfolder with `name` under the given `root` directory. + """Create a new subfolder with `name` under the given `root` directory. - :param root: the root directory + :param root: the parent directory + :param name: the name of the directory to create :returns: the created folder (Path) """ output = Path(root) / name @@ -98,8 +97,7 @@ def create_output_stage_dir(root: PathType, name: str) -> Path: def flatten(list_of_collections: List[Union[List, Set]]) -> List: - """ - Flattens a list of lists or list of sets. + """Flattens a list of lists or list of sets. :param list_of_collections: list of lists or list of sets :returns: list containing flattened items @@ -108,19 +106,19 @@ def flatten(list_of_collections: List[Union[List, Set]]) -> List: def get_extension(filename: PathType, len_ext: int = 2) -> str: - """ - Utility function to extract file extensions. + """Extract file extensions from a filename. 
    :param filename: the file name
-    :param len: the extension length
+    :param len_ext: the number of expected extension parts,
+        e.g. len_ext=2 for 'fq.gz'
     :returns: the file extension (str)
     """
     return "".join(PurePath(filename).suffixes[-len_ext:]).lstrip(".")


 def get_sample_name(filename: PathType) -> str:
-    """
-    Extract the sample name from a sample's filename.
+    """Extract the sample name from a sample's filename.
+
     The sample name is expected to be from the start
     of the filename until the first dot.

@@ -133,8 +131,7 @@ def group_input_reads(
     inputs: Sequence[PathType], input1_pattern: str, input2_pattern: str
 ) -> Dict[str, List[Path]]:
-    """
-    Group input files by read pairs and sample id
+    """Group input files by read pairs and sample id.

     :param inputs: list of input files
     :param input1_pattern: pattern to match read1 files
@@ -181,10 +178,9 @@ def group_fn(s):


 def gz_size(filename: str) -> int:
-    """
-    Extract the size of a gzip compressed file.
+    """Extract the size of a gzip compressed file.

-    :param fname: file name
+    :param filename: file name
     :returns: size of the file uncompressed (in bytes)
     """
     with gzip.open(filename, "rb") as f:
@@ -197,15 +193,13 @@ def log_step_start(
     output: Optional[str] = None,
     **kwargs,
 ) -> None:
-    """
-    Utility function to add information about the start of a
-    pixelator step to the logs
+    """Add information about the start of a pixelator step to the logs.

     :param step_name: name of the step that is starting
-    :param input_files: optional collection of input file paths
+    :param input_files: collection of input file paths
     :param output: optional path to output
-    :param kwargs: any additional parameters that you wish to log
-    :returns: None
+    :param **kwargs: any additional parameters that you wish to log
+    :rtype: None
     """
     logger.info("Start pixelator %s %s", step_name, __version__)

@@ -221,19 +215,13 @@ def log_step_start(


 def np_encoder(object: Any):
-    """
-    A very simple encoder to allow JSON serialization
-    of numpy data types
-    """
+    """Encoder for JSON serialization of numpy data types."""  # noqa: D401
     if isinstance(object, np.generic):
         return object.item()


 def remove_csv_whitespaces(df: pd.DataFrame) -> None:
-    """
-    Utility function to remove leading and trailing
-    blank spaces from csv files slurped by pandas
-    """
+    """Remove leading and trailing blank spaces from csv files slurped by pandas."""
     # fill NaNs as empty strings to be able to do `.str`
     df.fillna("", inplace=True)
     df.columns = df.columns.str.strip()
@@ -242,11 +230,11 @@ def remove_csv_whitespaces(df: pd.DataFrame) -> None:


 def reverse_complement(seq: str) -> str:
-    """
-    Helper function to compute the reverse complement of a DNA seq
+    """Compute the reverse complement of a DNA seq.

     :param seq: the DNA sequence
-    :returns: the reverse complement of the input sequence
+    :return: the reverse complement of the input sequence
+    :rtype: str
     """
     return seq.translate(_TRTABLE)[::-1]


 def sanity_check_inputs(
     input_files: Sequence[PathType],
     allowed_extensions: Union[Sequence[str], Optional[str]] = None,
 ) -> None:
-    """
-    Perform basic sanity checking of input files
+    """Perform basic sanity checking of input files.

     :param input_files: the files to sanity check
     :param allowed_extensions: the expected file extension of the files,
         e.g. 'fastq.gz' or a tuple of allowed types, e.g.
('fastq.gz', 'fq.gz')
-    :returns: None
     :raises AssertionError: when any of validation fails
+    :returns None:
     """
     for input_file in input_files:
         input_file = Path(input_file)
@@ -290,13 +277,14 @@ def sanity_check_inputs(
             )


-def single_value(xs: Union[List, Set]) -> Any:
-    """
-    Extract the first value in a List or Set if the
-    collection has a single value.
+T = typing.TypeVar("T")
+
+
+def single_value(xs: Union[List[T], Set[T]]) -> T:
+    """Extract the first value in a List or Set if the collection has a single value.

     :param xs: a collection of values
-    :returns: the first value in the collection
+    :returns T: the first value in the collection
     :raises AssertionError: if the collection is empty or has more than one value
     """
     if len(xs) == 0:
@@ -307,9 +295,7 @@ def single_value(xs: Union[List, Set]) -> Any:


 def timer(func):
-    """
-    Function decorator used to time the different steps
-    """
+    """Time the different steps of a function."""

     @wraps(func)
     def wrapper(*args, **kwds):
@@ -324,14 +310,12 @@ def wrapper(*args, **kwds):

 def write_parameters_file(
     click_context: click.Context, output_file: Path, command_path: Optional[str] = None
-):
-    """
-    Write the parameters used in for a command to a JSON file
+) -> None:
+    """Write the parameters used for a command to a JSON file.

     :param click_context: the click context object
     :param output_file: the output file
     :param command_path: the command to use as command name
-    :returns: None
     """
     command_path_fixed = command_path or click_context.command_path
     parameters = click_context.command.params
@@ -361,3 +345,18 @@ def write_parameters_file(

     with open(output_file, "w") as fh:
         json.dump(data, fh, indent=4)
+
+
+def get_process_pool_executor(ctx, **kwargs):
+    """Return a process pool with some default settings.
+
+    The number of workers will be set to the number of cores available from the click ctx.
+    The multiprocess logger will be initialized in each worker process.
+
+    :param ctx: click context object
+    :param kwargs: additional kwargs to pass to the ProcessPoolExecutor constructor
+    """
+    return futures.ProcessPoolExecutor(
+        max_workers=ctx.obj["CORES"],
+        **kwargs,
+    )

From a3cfe9b53f981b4da5a0bea2bf98234fbdaf45c1 Mon Sep 17 00:00:00 2001
From: fbdtemme
Date: Wed, 24 Jan 2024 18:41:45 +0100
Subject: [PATCH 02/13] fix multiprocess logging and improve cli output

- Properly handle log messages from process pool workers.
- Verbose mode now forwards debug log messages to console output.
- Normal mode now prints all INFO level output to the console.
- satisfy docstring linters --- src/pixelator/amplicon/process.py | 21 ++- src/pixelator/amplicon/statistics.py | 54 ++++--- src/pixelator/cli/adapterqc.py | 5 +- src/pixelator/cli/amplicon.py | 16 +- src/pixelator/cli/analysis.py | 12 +- src/pixelator/cli/annotate.py | 21 ++- src/pixelator/cli/collapse.py | 8 +- src/pixelator/cli/common.py | 67 +------- src/pixelator/cli/demux.py | 14 +- src/pixelator/cli/graph.py | 8 +- src/pixelator/cli/logging.py | 228 +++++++++++++++++++++++++++ src/pixelator/cli/main.py | 22 +-- src/pixelator/cli/preqc.py | 8 +- src/pixelator/cli/report.py | 4 +- src/pixelator/collapse/process.py | 21 ++- tests/utils/test_utils.py | 16 +- 16 files changed, 361 insertions(+), 164 deletions(-) create mode 100644 src/pixelator/cli/logging.py diff --git a/src/pixelator/amplicon/process.py b/src/pixelator/amplicon/process.py index 78e47691..57e873c7 100644 --- a/src/pixelator/amplicon/process.py +++ b/src/pixelator/amplicon/process.py @@ -150,10 +150,11 @@ def amplicon_fastq( mode = "single-end" if len(inputs) == 1 else "paired-end" stats = SequenceQualityStatsCollector(design) - start1_log_msg = "Starting the concatenation of %s to %s" - start2_log_msg = "Starting the concatenation of %s and %s to %s" - end1_log_msg = "Finished the concatenation of %s to %s" - end2_log_msg = "Finished the concatenation of %s and %s to %s" + start1_log_msg = "Building amplicon of %s to %s" + start2_log_msg = "Building amplicon of %s and %s to %s" + end1_log_msg = "Finished building amplicon of %s to %s" + end2_log_msg = "Finished building amplicon of %s and %s to %s" + progress_log_msg = "Generating amplicon for %s: %s reads processed" amplicon = assay.get_region_by_id("amplicon") if amplicon is None: @@ -173,14 +174,20 @@ def amplicon_fastq( logger.debug(start2_log_msg, inputs[0], inputs[1], output) with xopen(output, "wb") as f: - for record1, record2 in zip( - pyfastx.Fastq(str(inputs[0]), build_index=False), - pyfastx.Fastq(str(inputs[1]), build_index=False), + for idx, (record1, record2) in enumerate( + zip( + pyfastx.Fastq(str(inputs[0]), build_index=False), + pyfastx.Fastq(str(inputs[1]), build_index=False), + ) ): + if idx % 100000 == 0: + logger.debug(progress_log_msg, str(output), str(idx)) + name, new_seq, new_qual = generate_amplicon(record1, record2, amplicon) write_record(f, name, new_seq, new_qual) stats.update(new_qual) + logger.info(progress_log_msg, str(output), stats.read_count) # add metrics to JSON file avg_stats = stats.stats diff --git a/src/pixelator/amplicon/statistics.py b/src/pixelator/amplicon/statistics.py index 3602a72a..428bbf8b 100644 --- a/src/pixelator/amplicon/statistics.py +++ b/src/pixelator/amplicon/statistics.py @@ -1,10 +1,11 @@ -""" +"""Collect statistics for amplicon. + Copyright (c) 2023 Pixelgen Technologies AB. 
""" import collections import dataclasses from functools import cache -from typing import Any, Dict, Tuple, Union +from typing import Any, Union import numpy as np @@ -31,8 +32,10 @@ def _count_elem_in_array_where_greater_or_equal_than(arr, value): return result -@dataclasses.dataclass(frozen=True) +@dataclasses.dataclass(frozen=True, slots=True) class SequenceQualityStats: + """Container for sequence quality statistics.""" + fraction_q30_upia: float fraction_q30_upib: float fraction_q30_umi: float @@ -41,7 +44,8 @@ class SequenceQualityStats: fraction_q30_bc: float fraction_q30: float - def asdict(self) -> Dict[str, Any]: + def asdict(self) -> dict[str, Any]: + """Return a dictionary representation of this instance.""" return {k: v for k, v in dataclasses.asdict(self).items()} @@ -49,6 +53,10 @@ class SequenceQualityStatsCollector: """Accumulate read quality statistics for a given design.""" def __init__(self, design_name: str): + """Accumulate read quality statistics for a given design. + + :param design_name: The name of the design of the reads for which to statistics. + """ design = config.get_assay(design_name) if design is None: @@ -81,19 +89,25 @@ def __init__(self, design_name: str): raise ValueError("Assay does not contain a UMI region") @cache - def get_position(self, region_id: str) -> Tuple[int, int]: + def get_position(self, region_id: str) -> tuple[int, int]: + """Return the positions for a region. + + :param region_id: id of the region + :returns: a tuple with start and end positions + :raise ValueError: An unknown region id was given + """ r = self._positions.get(region_id) if r is None: raise ValueError(f"Unknown region: {region_id}") return r @staticmethod - def _read_stats(quali: np.ndarray) -> Tuple[int, int]: + def _read_stats(quali: np.ndarray) -> tuple[int, int]: bases_in_read = _count_elem_in_array_where_greater_than(quali, 2) q30_bases_in_read = _count_elem_in_array_where_greater_or_equal_than(quali, 30) return bases_in_read, q30_bases_in_read - def _umi_stats(self, quali: np.ndarray) -> Tuple[int, int]: + def _umi_stats(self, quali: np.ndarray) -> tuple[int, int]: # Q30 Bases in UMI umi_regions = self.design.get_regions_by_type(RegionType.UMI) umi_positions = [self.get_position(r.region_id) for r in umi_regions] @@ -109,7 +123,7 @@ def _umi_stats(self, quali: np.ndarray) -> Tuple[int, int]: return bases_in_umi, q30_bases_in_umi - def _get_stats_from_position(self, quali: np.ndarray, pos: str) -> Tuple[int, int]: + def _get_stats_from_position(self, quali: np.ndarray, pos: str) -> tuple[int, int]: upia_pos = self.get_position(pos) slice_obj = slice(*upia_pos) quali_subset = quali[slice_obj] @@ -117,26 +131,29 @@ def _get_stats_from_position(self, quali: np.ndarray, pos: str) -> Tuple[int, in q30 = _count_elem_in_array_where_greater_or_equal_than(quali_subset, 30) return bases, q30 - def _upia_stats(self, quali: np.ndarray) -> Tuple[int, int]: + def _upia_stats(self, quali: np.ndarray) -> tuple[int, int]: return self._get_stats_from_position(quali, "upi-a") - def _upib_stats(self, quali: np.ndarray) -> Tuple[int, int]: + def _upib_stats(self, quali: np.ndarray) -> tuple[int, int]: return self._get_stats_from_position(quali, "upi-b") - def _pbs1_stats(self, quali: np.ndarray) -> Tuple[int, int]: + def _pbs1_stats(self, quali: np.ndarray) -> tuple[int, int]: return self._get_stats_from_position(quali, "pbs-1") - def _pbs2_stats(self, quali: np.ndarray) -> Tuple[int, int]: + def _pbs2_stats(self, quali: np.ndarray) -> tuple[int, int]: return 
self._get_stats_from_position(quali, "pbs-2") - def _bc_stats(self, quali: np.ndarray) -> Tuple[int, int]: + def _bc_stats(self, quali: np.ndarray) -> tuple[int, int]: return self._get_stats_from_position(quali, "bc") + @property + def read_count(self) -> int: + """Return the number of reads processed.""" + return self._counter["read_count"] + @property def stats(self) -> SequenceQualityStats: - """ - Return the accumulated statistics as a SequenceQualityStats object. - """ + """Return the accumulated statistics as a SequenceQualityStats object.""" fraction_q30_upia = ( self._counter["q30_bases_in_upia"] / self._counter["bases_in_upia"] ) @@ -170,9 +187,7 @@ def stats(self) -> SequenceQualityStats: ) def update(self, qualities: Union[str, np.ndarray]) -> None: - """ - Update the statistics with the given read qualities. - """ + """Update the statistics with the given read qualities.""" # Use numpy for vectorized operations # Reinterpret cast to integers (same as ord) if isinstance(qualities, str): @@ -189,6 +204,7 @@ def update(self, qualities: Union[str, np.ndarray]) -> None: bases_in_bc, q30_bases_in_bc = self._bc_stats(quali) self._counter.update( + read_count=1, bases_in_read=bases_in_read, q30_bases_in_read=q30_bases_in_read, bases_in_umi=bases_in_umi, diff --git a/src/pixelator/cli/adapterqc.py b/src/pixelator/cli/adapterqc.py index b7fbcfd2..d332cb38 100644 --- a/src/pixelator/cli/adapterqc.py +++ b/src/pixelator/cli/adapterqc.py @@ -10,9 +10,9 @@ from pixelator.cli.common import design_option, logger, output_option from pixelator.qc import adapter_qc_fastq from pixelator.utils import ( - click_echo, create_output_stage_dir, get_extension, + get_process_pool_executor, get_sample_name, log_step_start, sanity_check_inputs, @@ -81,11 +81,10 @@ def adapterqc( adapterqc_output = create_output_stage_dir(output, "adapterqc") # run cutadapt (adapter mode) using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for fastq_file in input_files: msg = f"Processing {fastq_file} with cutadapt (adapter mode)" - click_echo(msg, multiline=False) logger.info(msg) clean_name = get_sample_name(fastq_file) diff --git a/src/pixelator/cli/amplicon.py b/src/pixelator/cli/amplicon.py index 76d28f64..bf34482a 100644 --- a/src/pixelator/cli/amplicon.py +++ b/src/pixelator/cli/amplicon.py @@ -8,12 +8,16 @@ import click -from pixelator.cli.common import design_option, logger, output_option from pixelator.amplicon import amplicon_fastq +from pixelator.cli.common import ( + design_option, + logger, + output_option, +) from pixelator.utils import ( - click_echo, create_output_stage_dir, get_extension, + get_process_pool_executor, group_input_reads, log_step_start, sanity_check_inputs, @@ -66,6 +70,7 @@ def amplicon( Process diverse raw pixel data (FASTQ) formats into common amplicon """ # log input parameters + log_step_start( "amplicon", input_files=input_files, @@ -87,7 +92,7 @@ def amplicon( ) # run amplicon using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for k, v in grouped_sorted_inputs.items(): extension = get_extension(v[0]) @@ -101,12 +106,11 @@ def amplicon( ) if len(v) > 2: - msg = "Found more files than needed for concatenating fastq files" + msg = "Found more files than needed for creating an amplicon" logger.error(msg) raise RuntimeError(msg) - msg = f"Concatenating 
{','.join(str(p) for p in v)}"
-            click_echo(msg, multiline=False)
+            msg = f"Creating amplicon for {','.join(str(p) for p in v)}"
             logger.info(msg)

             jobs.append(
diff --git a/src/pixelator/cli/analysis.py b/src/pixelator/cli/analysis.py
index e677086a..4eaf84ec 100644
--- a/src/pixelator/cli/analysis.py
+++ b/src/pixelator/cli/analysis.py
@@ -13,8 +13,8 @@
 from pixelator.analysis.colocalization.types import TransformationTypes
 from pixelator.cli.common import logger, output_option
 from pixelator.utils import (
-    click_echo,
     create_output_stage_dir,
+    get_process_pool_executor,
     get_sample_name,
     log_step_start,
     sanity_check_inputs,
@@ -164,20 +164,16 @@ def analysis(

     # sanity check on the analyses
     if not any([compute_polarization, compute_colocalization]):
-        msg = "All the analysis are disabled, no scores will be computed"
-        click_echo(msg)
-        logger.warning(msg)
+        logger.warning("All the analyses are disabled, no scores will be computed")

     # create output folder if it does not exist
     analysis_output = create_output_stage_dir(output, "analysis")

     # compute graph/clusters using parallel processing
-    with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor:
+    with get_process_pool_executor(ctx) as executor:
         jobs = []
         for zip_file in input_files:
-            msg = f"Computing analysis for file {zip_file}"
-            click_echo(msg)
-            logger.info(msg)
+            logger.info(f"Computing analysis for file {zip_file}")

             clean_name = get_sample_name(zip_file)
             metrics_file = analysis_output / f"{clean_name}.report.json"
diff --git a/src/pixelator/cli/annotate.py b/src/pixelator/cli/annotate.py
index f2a82ab9..7919606e 100644
--- a/src/pixelator/cli/annotate.py
+++ b/src/pixelator/cli/annotate.py
@@ -11,8 +11,8 @@
 from pixelator.cli.common import logger, output_option
 from pixelator.config import config, load_antibody_panel
 from pixelator.utils import (
-    click_echo,
     create_output_stage_dir,
+    get_process_pool_executor,
     get_sample_name,
     log_step_start,
     sanity_check_inputs,
@@ -120,13 +120,14 @@ def annotate(

     # warn if both --dynamic-filter and hard-coded sizes are input
     if min_size is not None and dynamic_filter in ["min", "both"]:
-        msg = "--dynamic-filter will overrule the value introduced in --min-size"
-        click_echo(msg, multiline=False)
-        logger.warning(msg)
+        logger.warning(
+            "--dynamic-filter will overrule the value introduced in --min-size"
+        )
+
     if max_size is not None and dynamic_filter in ["max", "both"]:
-        msg = "--dynamic-filter will overrule the value introduced in --max-size"
-        click_echo(msg, multiline=False)
-        logger.warning(msg)
+        logger.warning(
+            "--dynamic-filter will overrule the value introduced in --max-size"
+        )

     # create output folder if it does not exist
     annotate_output = create_output_stage_dir(output, "annotate")
@@ -135,12 +136,10 @@ def annotate(
     panel = load_antibody_panel(config, panel)

     # compute graph/components using parallel processing
-    with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor:
+    with get_process_pool_executor(ctx) as executor:
         jobs = []
         for ann_file in input_files:
-            msg = f"Computing annotation for file {ann_file}"
-            click_echo(msg, multiline=False)
-            logger.info(msg)
+            logger.info(f"Computing annotation for file {ann_file}")

             clean_name = get_sample_name(ann_file)
             metrics_file = annotate_output / f"{clean_name}.report.json"
diff --git a/src/pixelator/cli/collapse.py b/src/pixelator/cli/collapse.py
index b41d85d9..ee1b8dde 100644
--- a/src/pixelator/cli/collapse.py
+++ b/src/pixelator/cli/collapse.py
@@ -14,8 +14,8 @@
 from pixelator.collapse import
collapse_fastq from pixelator.config import config, get_position_in_parent, load_antibody_panel from pixelator.utils import ( - click_echo, create_output_stage_dir, + get_process_pool_executor, get_sample_name, log_step_start, sanity_check_inputs, @@ -165,11 +165,11 @@ def collapse( if umib_start is None and umib_end is not None: click.ClickException("You must specify both the start and end position in UMIB") if umia_start is None and umia_end is None: - click_echo("UMIA will be ignored in the collapsing", multiline=False) + logger.info("UMIA will be ignored in the collapsing") elif umia_end <= umia_start: click.ClickException("UMIA end or start positions seems to be incorrect") if umib_start is None and umib_end is None: - click_echo("UMIB will be ignored in the collapsing", multiline=False) + logger.info("UMIB will be ignored in the collapsing") elif umib_end <= umib_start: click.ClickException("UMIB end or start positions seems to be incorrect") @@ -198,7 +198,7 @@ def collapse( ) # run cutadapt (demux mode) using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for fastq_file in files: # get the marker from the file name diff --git a/src/pixelator/cli/common.py b/src/pixelator/cli/common.py index 6d0dc105..f5abc58c 100644 --- a/src/pixelator/cli/common.py +++ b/src/pixelator/cli/common.py @@ -3,80 +3,25 @@ Copyright (c) 2022 Pixelgen Technologies AB. """ -import traceback import collections import functools -import logging import logging.handlers -import os from pathlib import Path from typing import Dict, Mapping, Optional -import sys -import click -import warnings - -from numba.core.errors import NumbaDeprecationWarning +import click BASE_DIR = str(Path(__file__).parent) - -pixelator_root_logger = logging.getLogger("pixelator") logger = logging.getLogger("pixelator.cli") -# Silence deprecation warnings -warnings.simplefilter("ignore", category=NumbaDeprecationWarning) -warnings.filterwarnings( - "ignore", - message="geopandas not available. Some functionality will be disabled.", - category=UserWarning, -) - - -def init_logger(log_file: str, verbose: bool) -> None: - """ - Helper function to create and initialize a logging object - with the arguments given - - :param log_file: the path to the log file - :param verbose: True to enable verbose mode (DEBUG) - :returns: None - """ - mode = "a" if os.path.isfile(log_file) else "w" - handler = logging.handlers.WatchedFileHandler(log_file, mode=mode) - formatter = logging.Formatter("%(asctime)s %(levelname)s:%(name)s:%(message)s") - handler.setFormatter(formatter) - pixelator_root_logger.addHandler(handler) - pixelator_root_logger.setLevel(logging.DEBUG if verbose else logging.INFO) - - # Disable matplot lib debug logs (that will clog all debug logs) - logging.getLogger("matplotlib.font_manager").setLevel(logging.ERROR) - logging.getLogger("matplotlib.ticker").setLevel(logging.ERROR) - logging.getLogger("numba").setLevel(logging.ERROR) - - def handle_unhandled_exception(exc_type, exc_value, exc_traceback): - if issubclass(exc_type, KeyboardInterrupt): - # Will call default excepthook - sys.__excepthook__(exc_type, exc_value, exc_traceback) - return - - message = traceback.print_exception(exc_type, exc_value, exc_traceback) - click.echo(message) - - # Create a critical level log message with info from the except hook. 
- pixelator_root_logger.critical( - "Unhandled exception", exc_info=(exc_type, exc_value, exc_traceback) - ) - - # Assign the excepthook to the handler - sys.excepthook = handle_unhandled_exception - - # code snipped obtained from # https://stackoverflow.com/questions/47972638/how-can-i-define-the-order-of-click-sub-commands-in-help # the purpose is to order subcommands in order of addition class OrderedGroup(click.Group): - def __init__( + """Custom click.Group that keeps insertion order for subcommands.""" + + def __init__( # noqa: D107 self, name: Optional[str] = None, commands: Optional[Dict[str, click.Command]] = None, @@ -88,10 +33,13 @@ def __init__( def list_commands( # type: ignore self, ctx: click.Context ) -> Mapping[str, click.Command]: + """Return a list of subcommands.""" return self.commands def output_option(func): + """Wrap a Click entrypoint to add the --output option.""" + @click.option( "--output", required=True, @@ -109,6 +57,7 @@ def wrapper(*args, **kwargs): def design_option(func): + """Decorate a click command and add the --design option.""" from pixelator.config import config assay_options = list(config.assays.keys()) diff --git a/src/pixelator/cli/demux.py b/src/pixelator/cli/demux.py index b78c9807..fce50bda 100644 --- a/src/pixelator/cli/demux.py +++ b/src/pixelator/cli/demux.py @@ -8,15 +8,19 @@ import click -from pixelator.cli.common import design_option, logger, output_option +from pixelator.cli.common import ( + design_option, + logger, + output_option, +) from pixelator.config import config from pixelator.config.panel import load_antibody_panel from pixelator.demux import demux_fastq from pixelator.utils import ( build_barcodes_file, - click_echo, create_output_stage_dir, get_extension, + get_process_pool_executor, get_sample_name, log_step_start, sanity_check_inputs, @@ -145,12 +149,10 @@ def demux( ) # run cutadapt (demux mode) using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for fastq_file in input_files: - msg = f"Processing {fastq_file} with cutadapt (demux mode)" - click_echo(msg, multiline=False) - logger.info(msg) + logger.info(f"Processing {fastq_file} with cutadapt (demux mode)") name = get_sample_name(fastq_file) extension = get_extension(fastq_file) diff --git a/src/pixelator/cli/graph.py b/src/pixelator/cli/graph.py index 3ee50fb7..fd613f9b 100644 --- a/src/pixelator/cli/graph.py +++ b/src/pixelator/cli/graph.py @@ -10,8 +10,8 @@ from pixelator.cli.common import logger, output_option from pixelator.graph import connect_components from pixelator.utils import ( - click_echo, create_output_stage_dir, + get_process_pool_executor, get_sample_name, log_step_start, sanity_check_inputs, @@ -90,12 +90,10 @@ def graph( graph_output = create_output_stage_dir(output, "graph") # compute graph/components using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for pixelsf in input_files: - msg = f"Computing clusters for file {pixelsf}" - click_echo(msg) - logger.info(msg) + logger.info(f"Computing clusters for file {pixelsf}") clean_name = get_sample_name(pixelsf) metrics_file = graph_output / f"{clean_name}.report.json" diff --git a/src/pixelator/cli/logging.py b/src/pixelator/cli/logging.py new file mode 100644 index 00000000..f1455b39 --- /dev/null +++ b/src/pixelator/cli/logging.py @@ -0,0 +1,228 @@ +"""Logging setup for pixelator. 
+ +Copyright (c) 2022 Pixelgen Technologies AB. +""" +import atexit +import functools +import logging +import logging.handlers +import multiprocessing +import sys +import warnings +from pathlib import Path +from typing import Callable + +import click +from numba import NumbaDeprecationWarning + +from pixelator.utils import click_echo + +root_logger = logging.getLogger() +pixelator_root_logger = logging.getLogger("pixelator") + + +# Silence deprecation warnings +warnings.simplefilter("ignore", category=NumbaDeprecationWarning) +warnings.filterwarnings( + "ignore", + message="geopandas not available. Some functionality will be disabled.", + category=UserWarning, +) + + +# Disable matplot lib debug logs (that will clog all debug logs) +logging.getLogger("matplotlib.font_manager").setLevel(logging.ERROR) +logging.getLogger("matplotlib.ticker").setLevel(logging.ERROR) +logging.getLogger("numba").setLevel(logging.ERROR) + + +# ------------------------------------------------------------ +# Click logging +# ------------------------------------------------------------ + +LOGGER_KEY = __name__ + ".logger" +DEFAULT_LEVEL = logging.INFO + + +class ColorFormatter(logging.Formatter): + """Click formatter with colored levels""" + + colors = { + "debug": dict(fg="blue"), + "info": dict(fg="green"), + "warning": dict(fg="yellow"), + "error": dict(fg="orange"), + "exception": dict(fg="red"), + "critical": dict(fg="red"), + } + + def format(self, record: logging.LogRecord) -> str: + """Format a record with colored level. + + :param record: The record to format. + :returns: A formatted log record. + """ + if not record.exc_info: + level = record.levelname.lower() + msg = record.getMessage() + if level in self.colors: + timestamp = self.formatTime(record, self.datefmt) + colored_level = click.style( + f"{level.upper():<10}", **self.colors[level] + ) + prefix = f"{timestamp} [{colored_level}] " + msg = "\n".join(prefix + x for x in msg.splitlines()) + return msg + return logging.Formatter.format(self, record) + + +class DefaultCliFormatter(ColorFormatter): + """Click formatter with colored levels""" + + def format(self, record): + """Format a record for CLI output.""" + if not record.exc_info: + level = record.levelname.lower() + msg = record.getMessage() + + if level == "info": + return msg + + return f"{level.upper()}: {msg}" + return logging.Formatter.format(self, record) + + +class ClickHandler(logging.Handler): + """Click logging handler. + + Messages are forwarded to stdout using `click.echo`. + """ + + _use_stderr = True + + def emit(self, record): + """ + Do whatever it takes to actually log the specified logging record. + + This version is intended to be implemented by subclasses and so + raises a NotImplementedError. + """ + try: + msg = self.format(record) + click.echo(msg, err=self._use_stderr) + except Exception: + self.handleError(record) + + +class LoggingSetup: + """Logging setup for multiprocessing. + + This class is used to set up logging for multiprocessing. + All messages are passed to a separate process that handles the logging to a file. + + + """ + + def __init__(self, log_file: Path, verbose: bool, logger=None): + """Initialize the logging setup. 
+ + :param log_file: the filename of the log output + :param verbose: enable verbose logging and console output + :param logger: the logger to configure, default is the root logger + """ + self.log_file = log_file + self.verbose = verbose + self._root_logger = logger or logging.getLogger() + self._queue: multiprocessing.Queue = multiprocessing.Queue(-1) + + self._listener_process = multiprocessing.Process( + name="log-process", + target=self._listener_process_main, + args=( + self._queue, + functools.partial( + self._listener_logging_setup, self.log_file, self.verbose + ), + ), + ) + atexit.register(self._shutdown_listener, self._queue, self._listener_process) + + @staticmethod + def _shutdown_listener(queue, process): + """Send the stop token to the logging process and wait for it to finish.""" + queue.put(None) + process.join() + + def initialize(self): + """Configure logging and start the listener process.""" + self._listener_process.start() + + handler = logging.handlers.QueueHandler(self._queue) + self._root_logger.handlers = [handler] + self._root_logger.setLevel(logging.DEBUG if self.verbose else logging.INFO) + + console_handler = ClickHandler() + if not self.verbose: + console_handler.setFormatter(DefaultCliFormatter()) + self._root_logger.addHandler(console_handler) + else: + console_handler.setFormatter(ColorFormatter(datefmt="%Y-%m-%d %H:%M:%S")) + self._root_logger.addHandler(console_handler) + + @staticmethod + def _listener_logging_setup(log_file: str, verbose: bool): + """Initialize the logging in the listener process.""" + logger = logging.getLogger() + handler = logging.FileHandler(str(log_file), mode="w") + formatter = logging.Formatter( + "%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s" + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG if verbose else logging.INFO) + + @staticmethod + def _listener_process_main( + queue: multiprocessing.Queue, configure: Callable[[], None] + ): + """Entrypoint for the logging process. + + :param queue: the queue to read log messages from + :param configure: the function to call to configure the logging + """ + configure() + + logging.info("Logging process started.") + while True: + try: + record = queue.get() + if ( + record is None + ): # We send this as a sentinel to tell the listener to quit. + break + logger = logging.getLogger(record.name) + logger.handle(record) # No level or filter logic applied - just do it! + except Exception: + import traceback + + click_echo("An unexpected error occurred in the log handler") + click_echo(traceback.format_exc()) + + logging.info("Logging process stopped.") + + +def handle_unhandled_exception(exc_type, exc_value, exc_traceback): + """Handle "unhandled" exceptions.""" + if issubclass(exc_type, KeyboardInterrupt): + # Will call default excepthook + sys.__excepthook__(exc_type, exc_value, exc_traceback) + return + + # Create a critical level log message with info from the except hook. 
+ pixelator_root_logger.critical( + "Unhandled exception", exc_info=(exc_type, exc_value, exc_traceback) + ) + + +# Assign the excepthook to the handler +sys.excepthook = handle_unhandled_exception diff --git a/src/pixelator/cli/main.py b/src/pixelator/cli/main.py index 0aa4726c..2838ef8f 100644 --- a/src/pixelator/cli/main.py +++ b/src/pixelator/cli/main.py @@ -5,6 +5,7 @@ import atexit import multiprocessing import sys +from pathlib import Path import click import yappi @@ -15,9 +16,10 @@ from pixelator.cli.analysis import analysis from pixelator.cli.annotate import annotate from pixelator.cli.collapse import collapse -from pixelator.cli.common import OrderedGroup, init_logger +from pixelator.cli.common import OrderedGroup, logger from pixelator.cli.demux import demux from pixelator.cli.graph import graph +from pixelator.cli.logging import LoggingSetup from pixelator.cli.misc import list_single_cell_designs, list_single_cell_panels from pixelator.cli.plugin import add_cli_plugins from pixelator.cli.preqc import preqc @@ -57,20 +59,23 @@ help="The number of cpu cores to use for parallel processing", ) @click.pass_context -def main_cli(ctx, verbose, profile, log_file, cores): +def main_cli(ctx, verbose: bool, profile: bool, log_file: str, cores: int): """Run the main CLI entrypoint for pixelator.""" # early out if run in help mode if any(x in sys.argv for x in ["--help", "--version"]): return 0 + if verbose: + logger.info("Running in VERBOSE mode") + # activate profiling mode if profile: - click_echo("Running in profiling mode") + logger.info("Running in profiling mode") yappi.start() def exit(): yappi.stop() - click_echo("Profiling completed") + logger.info("Profiling completed") processes = yappi.get_thread_stats() # Make sure to get profile metrics for each thread for p in processes: @@ -79,18 +84,14 @@ def exit(): atexit.register(exit) - if verbose: - click_echo("Running in VERBOSE mode") - - if log_file is not None: - init_logger(log_file, verbose) - # Pass arguments to other commands ctx.ensure_object(dict) + ctx.obj["LOGGER"] = LoggingSetup(Path(log_file), verbose=verbose) ctx.obj["VERBOSE"] = verbose ctx.obj["CORES"] = max(1, cores) + ctx.obj["LOGGER"].initialize_worker() return 0 @@ -117,7 +118,6 @@ def exit(): ) def single_cell(): """Build the click group for single-cell commands.""" - pass # Add single-cell top level command to cli diff --git a/src/pixelator/cli/preqc.py b/src/pixelator/cli/preqc.py index c9380dbb..43cfaf3e 100644 --- a/src/pixelator/cli/preqc.py +++ b/src/pixelator/cli/preqc.py @@ -16,9 +16,9 @@ from pixelator.config import config from pixelator.qc import qc_fastq from pixelator.utils import ( - click_echo, create_output_stage_dir, get_extension, + get_process_pool_executor, get_sample_name, log_step_start, sanity_check_inputs, @@ -170,12 +170,10 @@ def preqc( preqc_output = create_output_stage_dir(output, "preqc") # run fastq (pre QC and filtering) using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for fastq_file in input_files: - msg = f"Processing {fastq_file} with fastp" - click_echo(msg, multiline=False) - logger.info(msg) + logger.info(f"Processing {fastq_file} with fastp") clean_name = get_sample_name(fastq_file) extension = get_extension(fastq_file) diff --git a/src/pixelator/cli/report.py b/src/pixelator/cli/report.py index 63e1a9c0..ce04662d 100644 --- a/src/pixelator/cli/report.py +++ b/src/pixelator/cli/report.py @@ -11,7 +11,7 @@ from 
pixelator.cli.common import output_option
 from pixelator.report import make_report
-from pixelator.utils import click_echo, create_output_stage_dir, log_step_start, timer
+from pixelator.utils import create_output_stage_dir, log_step_start, timer

 logger = logging.getLogger(__name__)

@@ -86,7 +86,7 @@ def report(
     )

     # create html reports
-    click_echo(f"Creating report for data present in {input_folder}")
+    logger.info(f"Creating report for data present in {input_folder}")

     make_report(
         input_path=input_folder,
diff --git a/src/pixelator/collapse/process.py b/src/pixelator/collapse/process.py
index 8b683cb4..94e09f14 100644
--- a/src/pixelator/collapse/process.py
+++ b/src/pixelator/collapse/process.py
@@ -34,7 +34,7 @@
 from pixelator.types import PathType
 from pixelator.utils import gz_size

-logger = logging.getLogger(__name__)
+logger = logging.getLogger("pixelator.collapse")

 np.random.seed(SEED)

@@ -111,7 +111,7 @@ def build_binary_data(seqs: list[str]) -> npt.NDArray[np.uint8]:
     return data


-def get_collapsed_fragments_for_component(
+def get_collapsed_fragments_for_component( # noqa: DOC402,DOC404
     components: list[set[UniqueFragment]], counts: dict[UniqueFragment, int]
 ) -> Generator[CollapsedFragment, None, None]:
     """Take the representative sequence from a component based on its counts.
@@ -411,15 +411,6 @@ def filter_by_minimum_upib_count(
     """
     unique_reads = {k: v for k, v in unique_reads.items() if len(v) >= min_count}
     # in case there are no reads after filtering
-    if not unique_reads:
-        logger.warning(
-            (
-                "The input file %s does not any contain"
-                "reads after filtering by count >= %i"
-            ),
-            input,
-            min_count,
-        )
     return unique_reads

@@ -658,6 +649,14 @@ def collapse_fastq(
     if min_count and min_count > 1:
         unique_reads = filter_by_minimum_upib_count(unique_reads, min_count)
         if not unique_reads:
+            logger.warning(
+                (
+                    "The input file %s does not contain any "
+                    "reads after filtering by count >= %i"
+                ),
+                input_file,
+                min_count,
+            )
             return None

     if algorithm == "adjacency":
diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py
index d7c3aeeb..beba63f4 100644
--- a/tests/utils/test_utils.py
+++ b/tests/utils/test_utils.py
@@ -4,13 +4,13 @@

 Copyright (c) 2022 Pixelgen Technologies AB.
""" import logging +import tempfile from gzip import BadGzipFile -from tempfile import NamedTemporaryFile import pytest from pixelator import __version__ -from pixelator.cli.common import init_logger +from pixelator.cli.logging import LoggingSetup from pixelator.utils import ( gz_size, log_step_start, @@ -96,11 +96,13 @@ def my_func(): def test_verbose_logging_is_activated(): - with NamedTemporaryFile() as test_log_file: - init_logger(log_file=test_log_file.name, verbose=True) - - root_logger = logging.getLogger("pixelator") + test_log_file = tempfile.TemporaryFile() + log_context = LoggingSetup(test_log_file, verbose=True) + with log_context: + root_logger = logging.getLogger() assert root_logger.getEffectiveLevel() == logging.DEBUG - init_logger(log_file=test_log_file.name, verbose=False) + log_context = LoggingSetup(test_log_file, verbose=True) + with log_context: + root_logger = logging.getLogger() assert root_logger.getEffectiveLevel() == logging.INFO From efaaaab45a40227a3f1c938d32d429ca1cb3114a Mon Sep 17 00:00:00 2001 From: fbdtemme Date: Thu, 25 Jan 2024 13:41:50 +0100 Subject: [PATCH 03/13] fix: update test and some linting --- src/pixelator/cli/logging.py | 93 ++++++++++++++++++++++++++---------- src/pixelator/cli/main.py | 5 +- tests/utils/test_utils.py | 11 +++-- 3 files changed, 77 insertions(+), 32 deletions(-) diff --git a/src/pixelator/cli/logging.py b/src/pixelator/cli/logging.py index f1455b39..4d6bee9b 100644 --- a/src/pixelator/cli/logging.py +++ b/src/pixelator/cli/logging.py @@ -8,6 +8,8 @@ import logging.handlers import multiprocessing import sys +import time +import typing import warnings from pathlib import Path from typing import Callable @@ -15,6 +17,7 @@ import click from numba import NumbaDeprecationWarning +from pixelator.types import PathType from pixelator.utils import click_echo root_logger = logging.getLogger() @@ -44,16 +47,22 @@ DEFAULT_LEVEL = logging.INFO +class StyleDict(typing.TypedDict): + """Style dictionary for kwargs to `click.style`.""" + + fg: str + + class ColorFormatter(logging.Formatter): """Click formatter with colored levels""" - colors = { - "debug": dict(fg="blue"), - "info": dict(fg="green"), - "warning": dict(fg="yellow"), - "error": dict(fg="orange"), - "exception": dict(fg="red"), - "critical": dict(fg="red"), + colors: dict[str, StyleDict] = { + "debug": StyleDict(fg="blue"), + "info": StyleDict(fg="green"), + "warning": StyleDict(fg="yellow"), + "error": StyleDict(fg="orange"), + "exception": StyleDict(fg="red"), + "critical": StyleDict(fg="red"), } def format(self, record: logging.LogRecord) -> str: @@ -123,14 +132,14 @@ class LoggingSetup: """ - def __init__(self, log_file: Path, verbose: bool, logger=None): + def __init__(self, log_file: PathType | None, verbose: bool, logger=None): """Initialize the logging setup. 
:param log_file: the filename of the log output :param verbose: enable verbose logging and console output :param logger: the logger to configure, default is the root logger """ - self.log_file = log_file + self.log_file = Path(log_file) if log_file is not None else None self.verbose = verbose self._root_logger = logger or logging.getLogger() self._queue: multiprocessing.Queue = multiprocessing.Queue(-1) @@ -145,17 +154,32 @@ def __init__(self, log_file: Path, verbose: bool, logger=None): ), ), ) - atexit.register(self._shutdown_listener, self._queue, self._listener_process) - @staticmethod - def _shutdown_listener(queue, process): - """Send the stop token to the logging process and wait for it to finish.""" - queue.put(None) - process.join() + def _shutdown_listener(self, timeout: int = 10) -> None: + """Send the stop token to the logging process and wait for it to finish. + + :param timeout: The number of seconds to wait for the process to finish. + """ + self._queue.put(None) + + start = time.time() + while time.time() - start <= timeout: + if self._listener_process.is_alive(): + time.sleep(0.1) + else: + # we cannot join a process that is not started + if self._listener_process.ident is not None: + self._listener_process.join() + break + else: + # We only enter this if we didn't 'break' above during the while loop! + self._listener_process.terminate() def initialize(self): """Configure logging and start the listener process.""" - self._listener_process.start() + # We do not need the listener process if there is no file to log to + if self.log_file: + self._listener_process.start() handler = logging.handlers.QueueHandler(self._queue) self._root_logger.handlers = [handler] @@ -169,17 +193,38 @@ def initialize(self): console_handler.setFormatter(ColorFormatter(datefmt="%Y-%m-%d %H:%M:%S")) self._root_logger.addHandler(console_handler) + atexit.register(self._shutdown_listener) + + def __enter__(self): + """Enter the context manager. + + This will initialize the logging setup. + """ + self.initialize() + + def __exit__(self, exc_type, exc_value, traceback): + """Exit the context manager. + + This will shut down the logging process if needed. 
+ """ + self._shutdown_listener() + atexit.unregister(self._shutdown_listener) + # Reraise exception higher up the stack + return False + @staticmethod - def _listener_logging_setup(log_file: str, verbose: bool): + def _listener_logging_setup(log_file: Path | None, verbose: bool): """Initialize the logging in the listener process.""" logger = logging.getLogger() - handler = logging.FileHandler(str(log_file), mode="w") - formatter = logging.Formatter( - "%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s" - ) - handler.setFormatter(formatter) - logger.addHandler(handler) - logger.setLevel(logging.DEBUG if verbose else logging.INFO) + + if log_file: + handler = logging.FileHandler(str(log_file), mode="w") + formatter = logging.Formatter( + "%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s" + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG if verbose else logging.INFO) @staticmethod def _listener_process_main( diff --git a/src/pixelator/cli/main.py b/src/pixelator/cli/main.py index 2838ef8f..37b1f98e 100644 --- a/src/pixelator/cli/main.py +++ b/src/pixelator/cli/main.py @@ -5,7 +5,6 @@ import atexit import multiprocessing import sys -from pathlib import Path import click import yappi @@ -87,11 +86,11 @@ def exit(): # Pass arguments to other commands ctx.ensure_object(dict) - ctx.obj["LOGGER"] = LoggingSetup(Path(log_file), verbose=verbose) + ctx.obj["LOGGER"] = LoggingSetup(log_file, verbose=verbose) ctx.obj["VERBOSE"] = verbose ctx.obj["CORES"] = max(1, cores) - ctx.obj["LOGGER"].initialize_worker() + ctx.obj["LOGGER"].initialize() return 0 diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py index beba63f4..90a6e7ab 100644 --- a/tests/utils/test_utils.py +++ b/tests/utils/test_utils.py @@ -96,13 +96,14 @@ def my_func(): def test_verbose_logging_is_activated(): - test_log_file = tempfile.TemporaryFile() - log_context = LoggingSetup(test_log_file, verbose=True) - with log_context: + test_log_file = tempfile.NamedTemporaryFile() + with LoggingSetup(test_log_file.name, verbose=True): root_logger = logging.getLogger() assert root_logger.getEffectiveLevel() == logging.DEBUG - log_context = LoggingSetup(test_log_file, verbose=True) - with log_context: + +def test_verbose_logging_is_deactivated(): + test_log_file = tempfile.NamedTemporaryFile() + with LoggingSetup(test_log_file.name, verbose=False): root_logger = logging.getLogger() assert root_logger.getEffectiveLevel() == logging.INFO From 9f12d4d4bcd81e01103cf34c312d2bcfbdfe602a Mon Sep 17 00:00:00 2001 From: fbdtemme Date: Thu, 25 Jan 2024 13:45:45 +0100 Subject: [PATCH 04/13] chore: update CHANGELOG.md --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51f6cab7..e2bf433e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Performance improvements and reduced bundle size in QC report. * Fix deflated counts in the edgelist after collapse. +* Improved console output in verbose mode. +* Improved logging from multiprocessing jobs. 
## [0.16.1] - 2024-01-12 From 8258fffe688a63ec19a6bbf17a775df6957da4b3 Mon Sep 17 00:00:00 2001 From: fbdtemme Date: Thu, 25 Jan 2024 15:18:55 +0100 Subject: [PATCH 05/13] test: add multiprocess logging test case --- src/pixelator/cli/logging.py | 24 +++++++++++----------- tests/utils/test_utils.py | 39 ++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 12 deletions(-) diff --git a/src/pixelator/cli/logging.py b/src/pixelator/cli/logging.py index 4d6bee9b..43cff11b 100644 --- a/src/pixelator/cli/logging.py +++ b/src/pixelator/cli/logging.py @@ -60,7 +60,7 @@ class ColorFormatter(logging.Formatter): "debug": StyleDict(fg="blue"), "info": StyleDict(fg="green"), "warning": StyleDict(fg="yellow"), - "error": StyleDict(fg="orange"), + "error": StyleDict(fg="red"), "exception": StyleDict(fg="red"), "critical": StyleDict(fg="red"), } @@ -85,10 +85,10 @@ def format(self, record: logging.LogRecord) -> str: return logging.Formatter.format(self, record) -class DefaultCliFormatter(ColorFormatter): +class DefaultCliFormatter(logging.Formatter): """Click formatter with colored levels""" - def format(self, record): + def format(self, record: logging.LogRecord) -> str: """Format a record for CLI output.""" if not record.exc_info: level = record.levelname.lower() @@ -107,7 +107,7 @@ class ClickHandler(logging.Handler): Messages are forwarded to stdout using `click.echo`. """ - _use_stderr = True + _use_stderr = False def emit(self, record): """ @@ -118,7 +118,7 @@ def emit(self, record): """ try: msg = self.format(record) - click.echo(msg, err=self._use_stderr) + click.echo(msg, file=sys.stdout, err=self._use_stderr) except Exception: self.handleError(record) @@ -181,18 +181,18 @@ def initialize(self): if self.log_file: self._listener_process.start() - handler = logging.handlers.QueueHandler(self._queue) - self._root_logger.handlers = [handler] + handlers = [] + handlers.append(logging.handlers.QueueHandler(self._queue)) self._root_logger.setLevel(logging.DEBUG if self.verbose else logging.INFO) console_handler = ClickHandler() - if not self.verbose: - console_handler.setFormatter(DefaultCliFormatter()) - self._root_logger.addHandler(console_handler) - else: + if self.verbose: console_handler.setFormatter(ColorFormatter(datefmt="%Y-%m-%d %H:%M:%S")) - self._root_logger.addHandler(console_handler) + else: + console_handler.setFormatter(DefaultCliFormatter()) + handlers.append(console_handler) + self._root_logger.handlers = handlers atexit.register(self._shutdown_listener) def __enter__(self): diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py index 90a6e7ab..da6fda6a 100644 --- a/tests/utils/test_utils.py +++ b/tests/utils/test_utils.py @@ -5,6 +5,7 @@ """ import logging import tempfile +from concurrent.futures import ProcessPoolExecutor from gzip import BadGzipFile import pytest @@ -107,3 +108,41 @@ def test_verbose_logging_is_deactivated(): with LoggingSetup(test_log_file.name, verbose=False): root_logger = logging.getLogger() assert root_logger.getEffectiveLevel() == logging.INFO + + +def helper_log_fn(args): + import logging + + root_logger = logging.getLogger() + root_logger.log(*args) + + +@pytest.mark.parametrize("verbose", [True, False]) +def test_multiprocess_logging(verbose): + """Test that logging works in a multiprocess environment.""" + test_log_file = tempfile.NamedTemporaryFile() + + with LoggingSetup(test_log_file.name, verbose=verbose): + tasks = [ + (logging.DEBUG, "This is a debug message"), + (logging.INFO, "This is an info message"), + 
(logging.WARNING, "This is a warning message"), + (logging.ERROR, "This is an error message"), + (logging.CRITICAL, "This is a critical message"), + ] + + with ProcessPoolExecutor(max_workers=4) as executor: + for r in executor.map(helper_log_fn, tasks): + pass + + # Test that the console output has the expected messages + with open(test_log_file.name, "r") as f: + log_content = f.read() + + if verbose: + assert "This is a debug message" in log_content + + assert "This is an info message" in log_content + assert "This is a warning message" in log_content + assert "This is an error message" in log_content + assert "This is a critical message" in log_content From dcedbc1cc9e8768544e74b7e8eaa0f0644e1c2c4 Mon Sep 17 00:00:00 2001 From: fbdtemme Date: Thu, 25 Jan 2024 16:34:20 +0100 Subject: [PATCH 06/13] style: reformat --- src/pixelator/collapse/process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pixelator/collapse/process.py b/src/pixelator/collapse/process.py index 94e09f14..ed8697dd 100644 --- a/src/pixelator/collapse/process.py +++ b/src/pixelator/collapse/process.py @@ -111,7 +111,7 @@ def build_binary_data(seqs: list[str]) -> npt.NDArray[np.uint8]: return data -def get_collapsed_fragments_for_component( # noqa: DOC402,DOC404 +def get_collapsed_fragments_for_component( # noqa: DOC402,DOC404 components: list[set[UniqueFragment]], counts: dict[UniqueFragment, int] ) -> Generator[CollapsedFragment, None, None]: """Take the representative sequence from a component based on its counts. From fc7c5e9b424992a0d0dd4169850470a354e6979e Mon Sep 17 00:00:00 2001 From: fbdtemme Date: Thu, 25 Jan 2024 17:05:08 +0100 Subject: [PATCH 07/13] tests: add SequenceQualityStatsCollector.read_count test --- tests/amplicon/test_statistics.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/amplicon/test_statistics.py b/tests/amplicon/test_statistics.py index 8784c29b..0fa1c1bc 100644 --- a/tests/amplicon/test_statistics.py +++ b/tests/amplicon/test_statistics.py @@ -22,6 +22,7 @@ def test_sequence_quality_stats_collector_update(): collector.update(rng.normal(30, 5, size=132)) collector.update(rng.normal(30, 5, size=132)) + assert collector.read_count == 2 assert collector.stats == SequenceQualityStats( fraction_q30_upia=0.42, fraction_q30_upib=0.46, From b690aedc3606a0d2fa149d1bc5c865e3472648da Mon Sep 17 00:00:00 2001 From: fbdtemme Date: Fri, 26 Jan 2024 11:32:41 +0100 Subject: [PATCH 08/13] style: inline log message format strings --- src/pixelator/amplicon/process.py | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/src/pixelator/amplicon/process.py b/src/pixelator/amplicon/process.py index 57e873c7..7445d5f9 100644 --- a/src/pixelator/amplicon/process.py +++ b/src/pixelator/amplicon/process.py @@ -150,19 +150,13 @@ def amplicon_fastq( mode = "single-end" if len(inputs) == 1 else "paired-end" stats = SequenceQualityStatsCollector(design) - start1_log_msg = "Building amplicon of %s to %s" - start2_log_msg = "Building amplicon of %s and %s to %s" - end1_log_msg = "Finished building amplicon of %s to %s" - end2_log_msg = "Finished building amplicon of %s and %s to %s" - progress_log_msg = "Generating amplicon for %s: %s reads processed" - amplicon = assay.get_region_by_id("amplicon") if amplicon is None: raise RuntimeError("Design does not have a region with id: amplicon") # Single end mode if mode == "single-end": - logger.debug(start1_log_msg, inputs[0], output) + logger.debug("Building amplicon of %s to %s", inputs[0], 
 
         with xopen(output, "wb") as f:
             for record in pyfastx.Fastq(str(inputs[0]), build_index=False):
@@ -171,7 +165,9 @@
         stats.update(new_qual)
 
     if mode == "paired-end":
-        logger.debug(start2_log_msg, inputs[0], inputs[1], output)
+        logger.debug(
+            "Building amplicon of %s and %s to %s", inputs[0], inputs[1], output
+        )
 
         with xopen(output, "wb") as f:
             for idx, (record1, record2) in enumerate(
@@ -181,13 +177,19 @@
                 )
             ):
                 if idx % 100000 == 0:
-                    logger.debug(progress_log_msg, str(output), str(idx))
+                    logger.debug(
+                        "Generating amplicon for %s: %s reads processed",
+                        str(output),
+                        str(idx),
+                    )
 
                 name, new_seq, new_qual = generate_amplicon(record1, record2, amplicon)
                 write_record(f, name, new_seq, new_qual)
                 stats.update(new_qual)
 
-    logger.info(progress_log_msg, str(output), stats.read_count)
+    logger.info(
+        "Generating amplicon for %s: %s reads processed", str(output), stats.read_count
+    )
 
     # add metrics to JSON file
     avg_stats = stats.stats
@@ -196,6 +198,11 @@
         json.dump(data, json_file, sort_keys=True, indent=4)
 
     if mode == "single-end":
-        logger.debug(end1_log_msg, inputs[0], output)
+        logger.debug("Finished building amplicon of %s to %s", inputs[0], output)
     else:
-        logger.debug(end2_log_msg, inputs[0], inputs[1], output)
+        logger.debug(
+            "Finished building amplicon of %s and %s to %s",
+            inputs[0],
+            inputs[1],
+            output,
+        )

From 40201af72b6bac7e3d1d2874ada8f43886cd8139 Mon Sep 17 00:00:00 2001
From: fbdtemme
Date: Fri, 26 Jan 2024 11:33:11 +0100
Subject: [PATCH 09/13] style: remove DOC203 from ignored checks

---
 .flake8 | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.flake8 b/.flake8
index 8362dc81..5cda42b8 100644
--- a/.flake8
+++ b/.flake8
@@ -1,6 +1,6 @@
 [flake8]
 
-extend-ignore = E501,E402,W503,E203,D213,D203,DOC301,DOC502,DOC203
+extend-ignore = E501,E402,W503,E203,D213,D203,DOC301,DOC502
 exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,cue.mod
 docstring-convention = all
 style = sphinx

From e556f6fef4102252d4c881aea8c257647f061cb4 Mon Sep 17 00:00:00 2001
From: fbdtemme
Date: Fri, 26 Jan 2024 11:45:31 +0100
Subject: [PATCH 10/13] feat: use stderr for cli logging by default

---
 src/pixelator/cli/logging.py | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/src/pixelator/cli/logging.py b/src/pixelator/cli/logging.py
index 43cff11b..7a911826 100644
--- a/src/pixelator/cli/logging.py
+++ b/src/pixelator/cli/logging.py
@@ -69,7 +69,7 @@ def format(self, record: logging.LogRecord) -> str:
         """Format a record with colored level.
 
         :param record: The record to format.
-        :returns: A formatted log record.
+        :returns str: A formatted log record.
         """
         if not record.exc_info:
             level = record.levelname.lower()
@@ -105,17 +105,16 @@ class ClickHandler(logging.Handler):
     """Click logging handler.
 
     Messages are forwarded to stdout using `click.echo`.
-    """
 
-    _use_stderr = False
+    :param use_stderr: Log to sys.stderr instead of sys.stdout.
+    """
 
-    def emit(self, record):
-        """
-        Do whatever it takes to actually log the specified logging record.
+    def __init__(self, use_stderr: bool = True):
+        """Initialize the click handler."""
+        self._use_stderr = use_stderr
 
-        This version is intended to be implemented by subclasses and so
-        raises a NotImplementedError.
-        """
+    def emit(self, record: logging.LogRecord) -> None:
+        """Do whatever it takes to actually log the specified logging record."""
         try:
             msg = self.format(record)
             click.echo(msg, file=sys.stdout, err=self._use_stderr)
         except Exception:
             self.handleError(record)
@@ -128,8 +127,6 @@ class LoggingSetup:
 
     This class is used to set up logging for multiprocessing.
     All messages are passed to a separate process that handles
    the logging to a file.
-
-
     """
 
     def __init__(self, log_file: PathType | None, verbose: bool, logger=None):

From 8d1d9fdf2ea17423db2ae252b635ba69e5731386 Mon Sep 17 00:00:00 2001
From: fbdtemme
Date: Fri, 26 Jan 2024 11:59:34 +0100
Subject: [PATCH 11/13] fix: reraise exceptions in logging process error logic

---
 src/pixelator/cli/logging.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/pixelator/cli/logging.py b/src/pixelator/cli/logging.py
index 7a911826..184eccde 100644
--- a/src/pixelator/cli/logging.py
+++ b/src/pixelator/cli/logging.py
@@ -18,7 +18,6 @@
 from numba import NumbaDeprecationWarning
 
 from pixelator.types import PathType
-from pixelator.utils import click_echo
 
 root_logger = logging.getLogger()
 pixelator_root_logger = logging.getLogger("pixelator")
@@ -231,6 +230,8 @@ def _listener_process_main(
 
     :param queue: the queue to read log messages from
     :param configure: the function to call to configure the logging
+    :raises Exception: if an unexpected error occurs
+        when handling log messages
     """
     configure()
 
@@ -247,8 +248,9 @@ def _listener_process_main(
         except Exception:
             import traceback
 
-            click_echo("An unexpected error occurred in the log handler")
-            click_echo(traceback.format_exc())
+            print("An unexpected error occurred in the log handler")
+            print(traceback.format_exc())
+            raise
 
     logging.info("Logging process stopped.")

From 88ea7d6016df09683dae8ca5f636b1ccc501470b Mon Sep 17 00:00:00 2001
From: fbdtemme
Date: Fri, 26 Jan 2024 12:01:44 +0100
Subject: [PATCH 12/13] fix: add missing super().__init__ call in ClickHandler

---
 src/pixelator/cli/logging.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/src/pixelator/cli/logging.py b/src/pixelator/cli/logging.py
index 184eccde..082337ba 100644
--- a/src/pixelator/cli/logging.py
+++ b/src/pixelator/cli/logging.py
@@ -104,12 +104,16 @@ class ClickHandler(logging.Handler):
     """Click logging handler.
 
     Messages are forwarded to stdout using `click.echo`.
-
-    :param use_stderr: Log to sys.stderr instead of sys.stdout.
     """
 
-    def __init__(self, use_stderr: bool = True):
-        """Initialize the click handler."""
+    def __init__(self, *args, use_stderr: bool = True, **kwargs):
+        """Initialize the click handler.
+
+        :param use_stderr: Log to sys.stderr instead of sys.stdout.
+        :param args: The arguments to pass to the base class.
+        :param kwargs: The keyword arguments to pass to base class.
+ """ + super().__init__(*args, **kwargs) self._use_stderr = use_stderr def emit(self, record: logging.LogRecord) -> None: From 208bccbbe48a9b3bc9406b3251867fc0dd604d17 Mon Sep 17 00:00:00 2001 From: fbdtemme Date: Tue, 30 Jan 2024 12:06:52 +0100 Subject: [PATCH 13/13] refactor: small tweaks --- src/pixelator/cli/logging.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/pixelator/cli/logging.py b/src/pixelator/cli/logging.py index 082337ba..17a11680 100644 --- a/src/pixelator/cli/logging.py +++ b/src/pixelator/cli/logging.py @@ -19,9 +19,6 @@ from pixelator.types import PathType -root_logger = logging.getLogger() -pixelator_root_logger = logging.getLogger("pixelator") - # Silence deprecation warnings warnings.simplefilter("ignore", category=NumbaDeprecationWarning) @@ -42,9 +39,6 @@ # Click logging # ------------------------------------------------------------ -LOGGER_KEY = __name__ + ".logger" -DEFAULT_LEVEL = logging.INFO - class StyleDict(typing.TypedDict): """Style dictionary for kwargs to `click.style`.""" @@ -106,21 +100,23 @@ class ClickHandler(logging.Handler): Messages are forwarded to stdout using `click.echo`. """ - def __init__(self, *args, use_stderr: bool = True, **kwargs): + def __init__(self, level: int = 0, use_stderr: bool = True): """Initialize the click handler. + :param level: The logging level. :param use_stderr: Log to sys.stderr instead of sys.stdout. - :param args: The arguments to pass to the base class. - :param kwargs: The keyword arguments to pass to base class. """ - super().__init__(*args, **kwargs) + super().__init__(level=level) self._use_stderr = use_stderr def emit(self, record: logging.LogRecord) -> None: - """Do whatever it takes to actually log the specified logging record.""" + """Do whatever it takes to actually log the specified logging record. + + :param record: The record to log. + """ try: msg = self.format(record) - click.echo(msg, file=sys.stdout, err=self._use_stderr) + click.echo(msg, err=self._use_stderr) except Exception: self.handleError(record) @@ -261,6 +257,8 @@ def _listener_process_main( def handle_unhandled_exception(exc_type, exc_value, exc_traceback): """Handle "unhandled" exceptions.""" + pixelator_root_logger = logging.getLogger("pixelator") + if issubclass(exc_type, KeyboardInterrupt): # Will call default excepthook sys.__excepthook__(exc_type, exc_value, exc_traceback)
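
Note on the pattern behind this series: LoggingSetup follows the standard
library's queue-based recipe for logging from multiple processes, as exercised
by the test_multiprocess_logging test in patch 05: each worker process attaches
a logging.handlers.QueueHandler to its root logger, and a single listener
drains the queue into the real handlers. The sketch below is a minimal,
stdlib-only illustration of that recipe for reference; it is not pixelator
code. The file name, worker function, and pool size are invented, and it uses
a QueueListener thread where pixelator runs a dedicated listener process.

    import logging
    import logging.handlers
    import multiprocessing
    from concurrent.futures import ProcessPoolExecutor


    def worker_init(queue) -> None:
        # Route every record from this worker process into the shared queue;
        # no formatting or I/O happens in the workers themselves.
        root = logging.getLogger()
        root.handlers = [logging.handlers.QueueHandler(queue)]
        root.setLevel(logging.DEBUG)


    def do_work(n: int) -> None:
        logging.getLogger(__name__).info("processed item %d", n)


    if __name__ == "__main__":
        # A manager queue can be handed safely to pool workers via initargs.
        queue = multiprocessing.Manager().Queue(-1)

        file_handler = logging.FileHandler("pipeline.log")
        file_handler.setFormatter(
            logging.Formatter("%(asctime)s %(processName)s %(levelname)s %(message)s")
        )

        # The listener drains the queue into the real handler, playing the
        # role of the separate listener process in LoggingSetup.
        listener = logging.handlers.QueueListener(queue, file_handler)
        listener.start()
        try:
            with ProcessPoolExecutor(
                max_workers=4, initializer=worker_init, initargs=(queue,)
            ) as executor:
                list(executor.map(do_work, range(8)))
        finally:
            listener.stop()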