diff --git a/.flake8 b/.flake8 index a5bfc1c7..5cda42b8 100644 --- a/.flake8 +++ b/.flake8 @@ -1,4 +1,5 @@ [flake8] + extend-ignore = E501,E402,W503,E203,D213,D203,DOC301,DOC502 exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,cue.mod docstring-convention = all diff --git a/CHANGELOG.md b/CHANGELOG.md index e6baf52e..3dbde026 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Change name and description of `Avg. Reads per Cell` and `Avg. Reads Usable per Cell` in QC report. * Fix a bug in how discarded UMIs are calculated and reported. * Fix deflated counts in the edgelist after collapse. +* Improved console output in verbose mode. +* Improved logging from multiprocessing jobs. ## [0.16.1] - 2024-01-12 diff --git a/src/pixelator/amplicon/process.py b/src/pixelator/amplicon/process.py index 78e47691..7445d5f9 100644 --- a/src/pixelator/amplicon/process.py +++ b/src/pixelator/amplicon/process.py @@ -150,18 +150,13 @@ def amplicon_fastq( mode = "single-end" if len(inputs) == 1 else "paired-end" stats = SequenceQualityStatsCollector(design) - start1_log_msg = "Starting the concatenation of %s to %s" - start2_log_msg = "Starting the concatenation of %s and %s to %s" - end1_log_msg = "Finished the concatenation of %s to %s" - end2_log_msg = "Finished the concatenation of %s and %s to %s" - amplicon = assay.get_region_by_id("amplicon") if amplicon is None: raise RuntimeError("Design does not have a region with id: amplicon") # Single end mode if mode == "single-end": - logger.debug(start1_log_msg, inputs[0], output) + logger.debug("Building amplicon of %s to %s", inputs[0], output) with xopen(output, "wb") as f: for record in pyfastx.Fastq(str(inputs[0]), build_index=False): @@ -170,17 +165,31 @@ def amplicon_fastq( stats.update(new_qual) if mode == "paired-end": - logger.debug(start2_log_msg, inputs[0], inputs[1], output) + logger.debug( + "Building amplicon of %s and %s to %s", inputs[0], inputs[1], output + ) with xopen(output, "wb") as f: - for record1, record2 in zip( - pyfastx.Fastq(str(inputs[0]), build_index=False), - pyfastx.Fastq(str(inputs[1]), build_index=False), + for idx, (record1, record2) in enumerate( + zip( + pyfastx.Fastq(str(inputs[0]), build_index=False), + pyfastx.Fastq(str(inputs[1]), build_index=False), + ) ): + if idx % 100000 == 0: + logger.debug( + "Generating amplicon for %s: %s reads processed", + str(output), + str(idx), + ) + name, new_seq, new_qual = generate_amplicon(record1, record2, amplicon) write_record(f, name, new_seq, new_qual) stats.update(new_qual) + logger.info( + "Generating amplicon for %s: %s reads processed", str(output), stats.read_count + ) # add metrics to JSON file avg_stats = stats.stats @@ -189,6 +198,11 @@ def amplicon_fastq( json.dump(data, json_file, sort_keys=True, indent=4) if mode == "single-end": - logger.debug(end1_log_msg, inputs[0], output) + logger.debug("Finished building amplicon of %s to %s", inputs[0], output) else: - logger.debug(end2_log_msg, inputs[0], inputs[1], output) + logger.debug( + "Finished building amplicon of %s and %s to %s", + inputs[0], + inputs[1], + output, + ) diff --git a/src/pixelator/amplicon/statistics.py b/src/pixelator/amplicon/statistics.py index 3602a72a..428bbf8b 100644 --- a/src/pixelator/amplicon/statistics.py +++ b/src/pixelator/amplicon/statistics.py @@ -1,10 +1,11 @@ -""" +"""Collect statistics for amplicon. + Copyright (c) 2023 Pixelgen Technologies AB. 
""" import collections import dataclasses from functools import cache -from typing import Any, Dict, Tuple, Union +from typing import Any, Union import numpy as np @@ -31,8 +32,10 @@ def _count_elem_in_array_where_greater_or_equal_than(arr, value): return result -@dataclasses.dataclass(frozen=True) +@dataclasses.dataclass(frozen=True, slots=True) class SequenceQualityStats: + """Container for sequence quality statistics.""" + fraction_q30_upia: float fraction_q30_upib: float fraction_q30_umi: float @@ -41,7 +44,8 @@ class SequenceQualityStats: fraction_q30_bc: float fraction_q30: float - def asdict(self) -> Dict[str, Any]: + def asdict(self) -> dict[str, Any]: + """Return a dictionary representation of this instance.""" return {k: v for k, v in dataclasses.asdict(self).items()} @@ -49,6 +53,10 @@ class SequenceQualityStatsCollector: """Accumulate read quality statistics for a given design.""" def __init__(self, design_name: str): + """Accumulate read quality statistics for a given design. + + :param design_name: The name of the design of the reads for which to statistics. + """ design = config.get_assay(design_name) if design is None: @@ -81,19 +89,25 @@ def __init__(self, design_name: str): raise ValueError("Assay does not contain a UMI region") @cache - def get_position(self, region_id: str) -> Tuple[int, int]: + def get_position(self, region_id: str) -> tuple[int, int]: + """Return the positions for a region. + + :param region_id: id of the region + :returns: a tuple with start and end positions + :raise ValueError: An unknown region id was given + """ r = self._positions.get(region_id) if r is None: raise ValueError(f"Unknown region: {region_id}") return r @staticmethod - def _read_stats(quali: np.ndarray) -> Tuple[int, int]: + def _read_stats(quali: np.ndarray) -> tuple[int, int]: bases_in_read = _count_elem_in_array_where_greater_than(quali, 2) q30_bases_in_read = _count_elem_in_array_where_greater_or_equal_than(quali, 30) return bases_in_read, q30_bases_in_read - def _umi_stats(self, quali: np.ndarray) -> Tuple[int, int]: + def _umi_stats(self, quali: np.ndarray) -> tuple[int, int]: # Q30 Bases in UMI umi_regions = self.design.get_regions_by_type(RegionType.UMI) umi_positions = [self.get_position(r.region_id) for r in umi_regions] @@ -109,7 +123,7 @@ def _umi_stats(self, quali: np.ndarray) -> Tuple[int, int]: return bases_in_umi, q30_bases_in_umi - def _get_stats_from_position(self, quali: np.ndarray, pos: str) -> Tuple[int, int]: + def _get_stats_from_position(self, quali: np.ndarray, pos: str) -> tuple[int, int]: upia_pos = self.get_position(pos) slice_obj = slice(*upia_pos) quali_subset = quali[slice_obj] @@ -117,26 +131,29 @@ def _get_stats_from_position(self, quali: np.ndarray, pos: str) -> Tuple[int, in q30 = _count_elem_in_array_where_greater_or_equal_than(quali_subset, 30) return bases, q30 - def _upia_stats(self, quali: np.ndarray) -> Tuple[int, int]: + def _upia_stats(self, quali: np.ndarray) -> tuple[int, int]: return self._get_stats_from_position(quali, "upi-a") - def _upib_stats(self, quali: np.ndarray) -> Tuple[int, int]: + def _upib_stats(self, quali: np.ndarray) -> tuple[int, int]: return self._get_stats_from_position(quali, "upi-b") - def _pbs1_stats(self, quali: np.ndarray) -> Tuple[int, int]: + def _pbs1_stats(self, quali: np.ndarray) -> tuple[int, int]: return self._get_stats_from_position(quali, "pbs-1") - def _pbs2_stats(self, quali: np.ndarray) -> Tuple[int, int]: + def _pbs2_stats(self, quali: np.ndarray) -> tuple[int, int]: return 
self._get_stats_from_position(quali, "pbs-2") - def _bc_stats(self, quali: np.ndarray) -> Tuple[int, int]: + def _bc_stats(self, quali: np.ndarray) -> tuple[int, int]: return self._get_stats_from_position(quali, "bc") + @property + def read_count(self) -> int: + """Return the number of reads processed.""" + return self._counter["read_count"] + @property def stats(self) -> SequenceQualityStats: - """ - Return the accumulated statistics as a SequenceQualityStats object. - """ + """Return the accumulated statistics as a SequenceQualityStats object.""" fraction_q30_upia = ( self._counter["q30_bases_in_upia"] / self._counter["bases_in_upia"] ) @@ -170,9 +187,7 @@ def stats(self) -> SequenceQualityStats: ) def update(self, qualities: Union[str, np.ndarray]) -> None: - """ - Update the statistics with the given read qualities. - """ + """Update the statistics with the given read qualities.""" # Use numpy for vectorized operations # Reinterpret cast to integers (same as ord) if isinstance(qualities, str): @@ -189,6 +204,7 @@ def update(self, qualities: Union[str, np.ndarray]) -> None: bases_in_bc, q30_bases_in_bc = self._bc_stats(quali) self._counter.update( + read_count=1, bases_in_read=bases_in_read, q30_bases_in_read=q30_bases_in_read, bases_in_umi=bases_in_umi, diff --git a/src/pixelator/cli/adapterqc.py b/src/pixelator/cli/adapterqc.py index b7fbcfd2..d332cb38 100644 --- a/src/pixelator/cli/adapterqc.py +++ b/src/pixelator/cli/adapterqc.py @@ -10,9 +10,9 @@ from pixelator.cli.common import design_option, logger, output_option from pixelator.qc import adapter_qc_fastq from pixelator.utils import ( - click_echo, create_output_stage_dir, get_extension, + get_process_pool_executor, get_sample_name, log_step_start, sanity_check_inputs, @@ -81,11 +81,10 @@ def adapterqc( adapterqc_output = create_output_stage_dir(output, "adapterqc") # run cutadapt (adapter mode) using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for fastq_file in input_files: msg = f"Processing {fastq_file} with cutadapt (adapter mode)" - click_echo(msg, multiline=False) logger.info(msg) clean_name = get_sample_name(fastq_file) diff --git a/src/pixelator/cli/amplicon.py b/src/pixelator/cli/amplicon.py index 76d28f64..bf34482a 100644 --- a/src/pixelator/cli/amplicon.py +++ b/src/pixelator/cli/amplicon.py @@ -8,12 +8,16 @@ import click -from pixelator.cli.common import design_option, logger, output_option from pixelator.amplicon import amplicon_fastq +from pixelator.cli.common import ( + design_option, + logger, + output_option, +) from pixelator.utils import ( - click_echo, create_output_stage_dir, get_extension, + get_process_pool_executor, group_input_reads, log_step_start, sanity_check_inputs, @@ -66,6 +70,7 @@ def amplicon( Process diverse raw pixel data (FASTQ) formats into common amplicon """ # log input parameters + log_step_start( "amplicon", input_files=input_files, @@ -87,7 +92,7 @@ def amplicon( ) # run amplicon using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for k, v in grouped_sorted_inputs.items(): extension = get_extension(v[0]) @@ -101,12 +106,11 @@ def amplicon( ) if len(v) > 2: - msg = "Found more files than needed for concatenating fastq files" + msg = "Found more files than needed for creating an amplicon" logger.error(msg) raise RuntimeError(msg) - msg = f"Concatenating 
{','.join(str(p) for p in v)}" - click_echo(msg, multiline=False) + msg = f"Creating amplicon for {','.join(str(p) for p in v)}" logger.info(msg) jobs.append( diff --git a/src/pixelator/cli/analysis.py b/src/pixelator/cli/analysis.py index e677086a..4eaf84ec 100644 --- a/src/pixelator/cli/analysis.py +++ b/src/pixelator/cli/analysis.py @@ -13,8 +13,8 @@ from pixelator.analysis.colocalization.types import TransformationTypes from pixelator.cli.common import logger, output_option from pixelator.utils import ( - click_echo, create_output_stage_dir, + get_process_pool_executor, get_sample_name, log_step_start, sanity_check_inputs, @@ -164,20 +164,16 @@ def analysis( # sanity check on the anlyses if not any([compute_polarization, compute_colocalization]): - msg = "All the analysis are disabled, no scores will be computed" - click_echo(msg) - logger.warning(msg) + logger.warning("All the analysis are disabled, no scores will be computed") # create output folder if it does not exist analysis_output = create_output_stage_dir(output, "analysis") # compute graph/clusters using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for zip_file in input_files: - msg = f"Computing analysis for file {zip_file}" - click_echo(msg) - logger.info(msg) + logger.info(f"Computing analysis for file {zip_file}") clean_name = get_sample_name(zip_file) metrics_file = analysis_output / f"{clean_name}.report.json" diff --git a/src/pixelator/cli/annotate.py b/src/pixelator/cli/annotate.py index f2a82ab9..7919606e 100644 --- a/src/pixelator/cli/annotate.py +++ b/src/pixelator/cli/annotate.py @@ -11,8 +11,8 @@ from pixelator.cli.common import logger, output_option from pixelator.config import config, load_antibody_panel from pixelator.utils import ( - click_echo, create_output_stage_dir, + get_process_pool_executor, get_sample_name, log_step_start, sanity_check_inputs, @@ -120,13 +120,14 @@ def annotate( # warn if both --dynamic-filter and hard-coded sizes are input if min_size is not None and dynamic_filter in ["min", "both"]: - msg = "--dynamic-filter will overrule the value introduced in --min-size" - click_echo(msg, multiline=False) - logger.warning(msg) + logger.warning( + "--dynamic-filter will overrule the value introduced in --min-size" + ) + if max_size is not None and dynamic_filter in ["max", "both"]: - msg = "--dynamic-filter will overrule the value introduced in --max-size" - click_echo(msg, multiline=False) - logger.warning(msg) + logger.warning( + "--dynamic-filter will overrule the value introduced in --max-size" + ) # create output folder if it does not exist annotate_output = create_output_stage_dir(output, "annotate") @@ -135,12 +136,10 @@ def annotate( panel = load_antibody_panel(config, panel) # compute graph/components using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for ann_file in input_files: - msg = f"Computing annotation for file {ann_file}" - click_echo(msg, multiline=False) - logger.info(msg) + logger.info(f"Computing annotation for file {ann_file}") clean_name = get_sample_name(ann_file) metrics_file = annotate_output / f"{clean_name}.report.json" diff --git a/src/pixelator/cli/collapse.py b/src/pixelator/cli/collapse.py index b41d85d9..ee1b8dde 100644 --- a/src/pixelator/cli/collapse.py +++ b/src/pixelator/cli/collapse.py @@ -14,8 +14,8 @@ from pixelator.collapse import 
collapse_fastq from pixelator.config import config, get_position_in_parent, load_antibody_panel from pixelator.utils import ( - click_echo, create_output_stage_dir, + get_process_pool_executor, get_sample_name, log_step_start, sanity_check_inputs, @@ -165,11 +165,11 @@ def collapse( if umib_start is None and umib_end is not None: click.ClickException("You must specify both the start and end position in UMIB") if umia_start is None and umia_end is None: - click_echo("UMIA will be ignored in the collapsing", multiline=False) + logger.info("UMIA will be ignored in the collapsing") elif umia_end <= umia_start: click.ClickException("UMIA end or start positions seems to be incorrect") if umib_start is None and umib_end is None: - click_echo("UMIB will be ignored in the collapsing", multiline=False) + logger.info("UMIB will be ignored in the collapsing") elif umib_end <= umib_start: click.ClickException("UMIB end or start positions seems to be incorrect") @@ -198,7 +198,7 @@ def collapse( ) # run cutadapt (demux mode) using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for fastq_file in files: # get the marker from the file name diff --git a/src/pixelator/cli/common.py b/src/pixelator/cli/common.py index 6d0dc105..f5abc58c 100644 --- a/src/pixelator/cli/common.py +++ b/src/pixelator/cli/common.py @@ -3,80 +3,25 @@ Copyright (c) 2022 Pixelgen Technologies AB. """ -import traceback import collections import functools -import logging import logging.handlers -import os from pathlib import Path from typing import Dict, Mapping, Optional -import sys -import click -import warnings - -from numba.core.errors import NumbaDeprecationWarning +import click BASE_DIR = str(Path(__file__).parent) - -pixelator_root_logger = logging.getLogger("pixelator") logger = logging.getLogger("pixelator.cli") -# Silence deprecation warnings -warnings.simplefilter("ignore", category=NumbaDeprecationWarning) -warnings.filterwarnings( - "ignore", - message="geopandas not available. Some functionality will be disabled.", - category=UserWarning, -) - - -def init_logger(log_file: str, verbose: bool) -> None: - """ - Helper function to create and initialize a logging object - with the arguments given - - :param log_file: the path to the log file - :param verbose: True to enable verbose mode (DEBUG) - :returns: None - """ - mode = "a" if os.path.isfile(log_file) else "w" - handler = logging.handlers.WatchedFileHandler(log_file, mode=mode) - formatter = logging.Formatter("%(asctime)s %(levelname)s:%(name)s:%(message)s") - handler.setFormatter(formatter) - pixelator_root_logger.addHandler(handler) - pixelator_root_logger.setLevel(logging.DEBUG if verbose else logging.INFO) - - # Disable matplot lib debug logs (that will clog all debug logs) - logging.getLogger("matplotlib.font_manager").setLevel(logging.ERROR) - logging.getLogger("matplotlib.ticker").setLevel(logging.ERROR) - logging.getLogger("numba").setLevel(logging.ERROR) - - def handle_unhandled_exception(exc_type, exc_value, exc_traceback): - if issubclass(exc_type, KeyboardInterrupt): - # Will call default excepthook - sys.__excepthook__(exc_type, exc_value, exc_traceback) - return - - message = traceback.print_exception(exc_type, exc_value, exc_traceback) - click.echo(message) - - # Create a critical level log message with info from the except hook. 
- pixelator_root_logger.critical( - "Unhandled exception", exc_info=(exc_type, exc_value, exc_traceback) - ) - - # Assign the excepthook to the handler - sys.excepthook = handle_unhandled_exception - - # code snipped obtained from # https://stackoverflow.com/questions/47972638/how-can-i-define-the-order-of-click-sub-commands-in-help # the purpose is to order subcommands in order of addition class OrderedGroup(click.Group): - def __init__( + """Custom click.Group that keeps insertion order for subcommands.""" + + def __init__( # noqa: D107 self, name: Optional[str] = None, commands: Optional[Dict[str, click.Command]] = None, @@ -88,10 +33,13 @@ def __init__( def list_commands( # type: ignore self, ctx: click.Context ) -> Mapping[str, click.Command]: + """Return a list of subcommands.""" return self.commands def output_option(func): + """Wrap a Click entrypoint to add the --output option.""" + @click.option( "--output", required=True, @@ -109,6 +57,7 @@ def wrapper(*args, **kwargs): def design_option(func): + """Decorate a click command and add the --design option.""" from pixelator.config import config assay_options = list(config.assays.keys()) diff --git a/src/pixelator/cli/demux.py b/src/pixelator/cli/demux.py index b78c9807..fce50bda 100644 --- a/src/pixelator/cli/demux.py +++ b/src/pixelator/cli/demux.py @@ -8,15 +8,19 @@ import click -from pixelator.cli.common import design_option, logger, output_option +from pixelator.cli.common import ( + design_option, + logger, + output_option, +) from pixelator.config import config from pixelator.config.panel import load_antibody_panel from pixelator.demux import demux_fastq from pixelator.utils import ( build_barcodes_file, - click_echo, create_output_stage_dir, get_extension, + get_process_pool_executor, get_sample_name, log_step_start, sanity_check_inputs, @@ -145,12 +149,10 @@ def demux( ) # run cutadapt (demux mode) using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for fastq_file in input_files: - msg = f"Processing {fastq_file} with cutadapt (demux mode)" - click_echo(msg, multiline=False) - logger.info(msg) + logger.info(f"Processing {fastq_file} with cutadapt (demux mode)") name = get_sample_name(fastq_file) extension = get_extension(fastq_file) diff --git a/src/pixelator/cli/graph.py b/src/pixelator/cli/graph.py index 3ee50fb7..fd613f9b 100644 --- a/src/pixelator/cli/graph.py +++ b/src/pixelator/cli/graph.py @@ -10,8 +10,8 @@ from pixelator.cli.common import logger, output_option from pixelator.graph import connect_components from pixelator.utils import ( - click_echo, create_output_stage_dir, + get_process_pool_executor, get_sample_name, log_step_start, sanity_check_inputs, @@ -90,12 +90,10 @@ def graph( graph_output = create_output_stage_dir(output, "graph") # compute graph/components using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for pixelsf in input_files: - msg = f"Computing clusters for file {pixelsf}" - click_echo(msg) - logger.info(msg) + logger.info(f"Computing clusters for file {pixelsf}") clean_name = get_sample_name(pixelsf) metrics_file = graph_output / f"{clean_name}.report.json" diff --git a/src/pixelator/cli/logging.py b/src/pixelator/cli/logging.py new file mode 100644 index 00000000..17a11680 --- /dev/null +++ b/src/pixelator/cli/logging.py @@ -0,0 +1,274 @@ +"""Logging setup for pixelator. 
+ +Copyright (c) 2022 Pixelgen Technologies AB. +""" +import atexit +import functools +import logging +import logging.handlers +import multiprocessing +import sys +import time +import typing +import warnings +from pathlib import Path +from typing import Callable + +import click +from numba import NumbaDeprecationWarning + +from pixelator.types import PathType + + +# Silence deprecation warnings +warnings.simplefilter("ignore", category=NumbaDeprecationWarning) +warnings.filterwarnings( + "ignore", + message="geopandas not available. Some functionality will be disabled.", + category=UserWarning, +) + + +# Disable matplot lib debug logs (that will clog all debug logs) +logging.getLogger("matplotlib.font_manager").setLevel(logging.ERROR) +logging.getLogger("matplotlib.ticker").setLevel(logging.ERROR) +logging.getLogger("numba").setLevel(logging.ERROR) + + +# ------------------------------------------------------------ +# Click logging +# ------------------------------------------------------------ + + +class StyleDict(typing.TypedDict): + """Style dictionary for kwargs to `click.style`.""" + + fg: str + + +class ColorFormatter(logging.Formatter): + """Click formatter with colored levels.""" + + colors: dict[str, StyleDict] = { + "debug": StyleDict(fg="blue"), + "info": StyleDict(fg="green"), + "warning": StyleDict(fg="yellow"), + "error": StyleDict(fg="red"), + "exception": StyleDict(fg="red"), + "critical": StyleDict(fg="red"), + } + + def format(self, record: logging.LogRecord) -> str: + """Format a record with colored level. + + :param record: The record to format. + :returns str: A formatted log record. + """ + if not record.exc_info: + level = record.levelname.lower() + msg = record.getMessage() + if level in self.colors: + timestamp = self.formatTime(record, self.datefmt) + colored_level = click.style( + f"{level.upper():<10}", **self.colors[level] + ) + prefix = f"{timestamp} [{colored_level}] " + msg = "\n".join(prefix + x for x in msg.splitlines()) + return msg + return logging.Formatter.format(self, record) + + +class DefaultCliFormatter(logging.Formatter): + """Plain CLI formatter without colored levels.""" + + def format(self, record: logging.LogRecord) -> str: + """Format a record for CLI output.""" + if not record.exc_info: + level = record.levelname.lower() + msg = record.getMessage() + + if level == "info": + return msg + + return f"{level.upper()}: {msg}" + return logging.Formatter.format(self, record) + + +class ClickHandler(logging.Handler): + """Click logging handler. + + Messages are forwarded to the console using `click.echo`. + """ + + def __init__(self, level: int = 0, use_stderr: bool = True): + """Initialize the click handler. + + :param level: The logging level. + :param use_stderr: Log to sys.stderr instead of sys.stdout. + """ + super().__init__(level=level) + self._use_stderr = use_stderr + + def emit(self, record: logging.LogRecord) -> None: + """Do whatever it takes to actually log the specified logging record. + + :param record: The record to log. + """ + try: + msg = self.format(record) + click.echo(msg, err=self._use_stderr) + except Exception: + self.handleError(record) + + +class LoggingSetup: + """Logging setup for multiprocessing. + + This class is used to set up logging for multiprocessing. + All messages are passed to a separate process that handles the logging to a file. + """ + + def __init__(self, log_file: PathType | None, verbose: bool, logger=None): + """Initialize the logging setup. 
+ + :param log_file: the filename of the log output + :param verbose: enable verbose logging and console output + :param logger: the logger to configure, default is the root logger + """ + self.log_file = Path(log_file) if log_file is not None else None + self.verbose = verbose + self._root_logger = logger or logging.getLogger() + self._queue: multiprocessing.Queue = multiprocessing.Queue(-1) + + self._listener_process = multiprocessing.Process( + name="log-process", + target=self._listener_process_main, + args=( + self._queue, + functools.partial( + self._listener_logging_setup, self.log_file, self.verbose + ), + ), + ) + + def _shutdown_listener(self, timeout: int = 10) -> None: + """Send the stop token to the logging process and wait for it to finish. + + :param timeout: The number of seconds to wait for the process to finish. + """ + self._queue.put(None) + + start = time.time() + while time.time() - start <= timeout: + if self._listener_process.is_alive(): + time.sleep(0.1) + else: + # we cannot join a process that is not started + if self._listener_process.ident is not None: + self._listener_process.join() + break + else: + # We only enter this if we didn't 'break' above during the while loop! + self._listener_process.terminate() + + def initialize(self): + """Configure logging and start the listener process.""" + # We do not need the listener process if there is no file to log to + if self.log_file: + self._listener_process.start() + + handlers = [] + handlers.append(logging.handlers.QueueHandler(self._queue)) + self._root_logger.setLevel(logging.DEBUG if self.verbose else logging.INFO) + + console_handler = ClickHandler() + if self.verbose: + console_handler.setFormatter(ColorFormatter(datefmt="%Y-%m-%d %H:%M:%S")) + else: + console_handler.setFormatter(DefaultCliFormatter()) + + handlers.append(console_handler) + self._root_logger.handlers = handlers + atexit.register(self._shutdown_listener) + + def __enter__(self): + """Enter the context manager. + + This will initialize the logging setup. + """ + self.initialize() + + def __exit__(self, exc_type, exc_value, traceback): + """Exit the context manager. + + This will shut down the logging process if needed. + """ + self._shutdown_listener() + atexit.unregister(self._shutdown_listener) + # Reraise exception higher up the stack + return False + + @staticmethod + def _listener_logging_setup(log_file: Path | None, verbose: bool): + """Initialize the logging in the listener process.""" + logger = logging.getLogger() + + if log_file: + handler = logging.FileHandler(str(log_file), mode="w") + formatter = logging.Formatter( + "%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s" + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG if verbose else logging.INFO) + + @staticmethod + def _listener_process_main( + queue: multiprocessing.Queue, configure: Callable[[], None] + ): + """Entrypoint for the logging process. + + :param queue: the queue to read log messages from + :param configure: the function to call to configure the logging + :raises Exception: if an unexpected error occurs + when handling log messages + """ + configure() + + logging.info("Logging process started.") + while True: + try: + record = queue.get() + if ( + record is None + ): # We send this as a sentinel to tell the listener to quit. + break + logger = logging.getLogger(record.name) + logger.handle(record) # No level or filter logic applied - just do it! 
+ except Exception: + import traceback + + print("An unexpected error occurred in the log handler") + print(traceback.format_exc()) + raise + + logging.info("Logging process stopped.") + + +def handle_unhandled_exception(exc_type, exc_value, exc_traceback): + """Handle "unhandled" exceptions.""" + pixelator_root_logger = logging.getLogger("pixelator") + + if issubclass(exc_type, KeyboardInterrupt): + # Will call default excepthook + sys.__excepthook__(exc_type, exc_value, exc_traceback) + return + + # Create a critical level log message with info from the except hook. + pixelator_root_logger.critical( + "Unhandled exception", exc_info=(exc_type, exc_value, exc_traceback) + ) + + +# Assign the excepthook to the handler +sys.excepthook = handle_unhandled_exception diff --git a/src/pixelator/cli/main.py b/src/pixelator/cli/main.py index 0aa4726c..37b1f98e 100644 --- a/src/pixelator/cli/main.py +++ b/src/pixelator/cli/main.py @@ -15,9 +15,10 @@ from pixelator.cli.analysis import analysis from pixelator.cli.annotate import annotate from pixelator.cli.collapse import collapse -from pixelator.cli.common import OrderedGroup, init_logger +from pixelator.cli.common import OrderedGroup, logger from pixelator.cli.demux import demux from pixelator.cli.graph import graph +from pixelator.cli.logging import LoggingSetup from pixelator.cli.misc import list_single_cell_designs, list_single_cell_panels from pixelator.cli.plugin import add_cli_plugins from pixelator.cli.preqc import preqc @@ -57,20 +58,23 @@ help="The number of cpu cores to use for parallel processing", ) @click.pass_context -def main_cli(ctx, verbose, profile, log_file, cores): +def main_cli(ctx, verbose: bool, profile: bool, log_file: str, cores: int): """Run the main CLI entrypoint for pixelator.""" # early out if run in help mode if any(x in sys.argv for x in ["--help", "--version"]): return 0 + if verbose: + logger.info("Running in VERBOSE mode") + # activate profiling mode if profile: - click_echo("Running in profiling mode") + logger.info("Running in profiling mode") yappi.start() def exit(): yappi.stop() - click_echo("Profiling completed") + logger.info("Profiling completed") processes = yappi.get_thread_stats() # Make sure to get profile metrics for each thread for p in processes: @@ -79,18 +83,14 @@ def exit(): atexit.register(exit) - if verbose: - click_echo("Running in VERBOSE mode") - - if log_file is not None: - init_logger(log_file, verbose) - # Pass arguments to other commands ctx.ensure_object(dict) + ctx.obj["LOGGER"] = LoggingSetup(log_file, verbose=verbose) ctx.obj["VERBOSE"] = verbose ctx.obj["CORES"] = max(1, cores) + ctx.obj["LOGGER"].initialize() return 0 @@ -117,7 +117,6 @@ def exit(): ) def single_cell(): """Build the click group for single-cell commands.""" - pass # Add single-cell top level command to cli diff --git a/src/pixelator/cli/preqc.py b/src/pixelator/cli/preqc.py index c9380dbb..43cfaf3e 100644 --- a/src/pixelator/cli/preqc.py +++ b/src/pixelator/cli/preqc.py @@ -16,9 +16,9 @@ from pixelator.config import config from pixelator.qc import qc_fastq from pixelator.utils import ( - click_echo, create_output_stage_dir, get_extension, + get_process_pool_executor, get_sample_name, log_step_start, sanity_check_inputs, @@ -170,12 +170,10 @@ def preqc( preqc_output = create_output_stage_dir(output, "preqc") # run fastq (pre QC and filtering) using parallel processing - with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor: + with get_process_pool_executor(ctx) as executor: jobs = [] for 
fastq_file in input_files: - msg = f"Processing {fastq_file} with fastp" - click_echo(msg, multiline=False) - logger.info(msg) + logger.info(f"Processing {fastq_file} with fastp") clean_name = get_sample_name(fastq_file) extension = get_extension(fastq_file) diff --git a/src/pixelator/cli/report.py b/src/pixelator/cli/report.py index 63e1a9c0..ce04662d 100644 --- a/src/pixelator/cli/report.py +++ b/src/pixelator/cli/report.py @@ -11,7 +11,7 @@ from pixelator.cli.common import output_option from pixelator.report import make_report -from pixelator.utils import click_echo, create_output_stage_dir, log_step_start, timer +from pixelator.utils import create_output_stage_dir, log_step_start, timer logger = logging.getLogger(__name__) @@ -86,7 +86,7 @@ def report( ) # create html reports - click_echo(f"Creating report for data present in {input_folder}") + logger.info(f"Creating report for data present in {input_folder}") make_report( input_path=input_folder, diff --git a/src/pixelator/collapse/process.py b/src/pixelator/collapse/process.py index 8b683cb4..ed8697dd 100644 --- a/src/pixelator/collapse/process.py +++ b/src/pixelator/collapse/process.py @@ -34,7 +34,7 @@ from pixelator.types import PathType from pixelator.utils import gz_size -logger = logging.getLogger(__name__) +logger = logging.getLogger("pixelator.collapse") np.random.seed(SEED) @@ -111,7 +111,7 @@ def build_binary_data(seqs: list[str]) -> npt.NDArray[np.uint8]: return data -def get_collapsed_fragments_for_component( +def get_collapsed_fragments_for_component( # noqa: DOC402,DOC404 components: list[set[UniqueFragment]], counts: dict[UniqueFragment, int] ) -> Generator[CollapsedFragment, None, None]: """Take the representative sequence from a component based on its counts. @@ -411,15 +411,6 @@ def filter_by_minimum_upib_count( """ unique_reads = {k: v for k, v in unique_reads.items() if len(v) >= min_count} # in case there are no reads after filtering - if not unique_reads: - logger.warning( - ( - "The input file %s does not any contain" - "reads after filtering by count >= %i" - ), - input, - min_count, - ) return unique_reads @@ -658,6 +649,14 @@ def collapse_fastq( if min_count and min_count > 1: unique_reads = filter_by_minimum_upib_count(unique_reads, min_count) if not unique_reads: + logger.warning( + ( + "The input file %s does not contain any " + "reads after filtering by count >= %i" + ), + input_file, + min_count, + ) return None if algorithm == "adjacency": diff --git a/src/pixelator/types.py b/src/pixelator/types.py index 3be37584..8d983162 100644 --- a/src/pixelator/types.py +++ b/src/pixelator/types.py @@ -1,5 +1,4 @@ -""" -This module contains helper typehints for the pixelator package. +"""Helper typehints for the pixelator package. Copyright (c) 2023 Pixelgen Technologies AB. """ diff --git a/src/pixelator/utils.py b/src/pixelator/utils.py index 441b7470..f514100a 100644 --- a/src/pixelator/utils.py +++ b/src/pixelator/utils.py @@ -1,5 +1,4 @@ -""" -Common functions and utilities for Pixelator +"""Common functions and utilities for Pixelator. Copyright (c) 2022 Pixelgen Technologies AB. 
""" @@ -13,6 +12,8 @@ import tempfile import textwrap import time +import typing +from concurrent import futures from functools import wraps from pathlib import Path, PurePath from typing import Any, Dict, List, Optional, Sequence, Set, TYPE_CHECKING, Union @@ -37,9 +38,9 @@ def build_barcodes_file( panel: AntibodyPanel, anchored: bool, rev_complement: bool ) -> str: - """ - Utility function to create a FASTA file of barcodes from a - panel dataframe. The FASTA file will have the marker id as + """Create a FASTA file of barcodes from a panel dataframe. + + The FASTA file will have the marker id as name and the barcode sequence as sequence. The parameter rev_complement control if sequence needs to be in reverse complement form or not. When anchored is true a dollar sign @@ -71,9 +72,7 @@ def build_barcodes_file( def click_echo(msg: str, multiline: bool = False): - """ - Helper function that print a line to the console - with long-line wrapping. + """Print a line to the console with optional long-line wrapping. :param msg: the message to print :param multiline: True to use text wrapping or False otherwise (default) @@ -85,10 +84,10 @@ def click_echo(msg: str, multiline: bool = False): def create_output_stage_dir(root: PathType, name: str) -> Path: - """ - Create a new subfolder with `name` under the given `root` directory. + """Create a new subfolder with `name` under the given `root` directory. - :param root: the root directory + :param root: the parent directory + :param name: the name of the directory to create :returns: the created folder (Path) """ output = Path(root) / name @@ -98,8 +97,7 @@ def create_output_stage_dir(root: PathType, name: str) -> Path: def flatten(list_of_collections: List[Union[List, Set]]) -> List: - """ - Flattens a list of lists or list of sets. + """Flattens a list of lists or list of sets. :param list_of_collections: list of lists or list of sets :returns: list containing flattened items @@ -108,19 +106,19 @@ def flatten(list_of_collections: List[Union[List, Set]]) -> List: def get_extension(filename: PathType, len_ext: int = 2) -> str: - """ - Utility function to extract file extensions. + """Extract file extensions from a filename. :param filename: the file name - :param len: the extension length + :param len_ext: the number of expected extensions parts + e.g.: fq.gz gives len_ext=2 :returns: the file extension (str) """ return "".join(PurePath(filename).suffixes[-len_ext:]).lstrip(".") def get_sample_name(filename: PathType) -> str: - """ - Extract the sample name from a sample's filename. + """Extract the sample name from a sample's filename. + The sample name is expected to be from the start of the filename until the first dot. @@ -133,8 +131,7 @@ def get_sample_name(filename: PathType) -> str: def group_input_reads( inputs: Sequence[PathType], input1_pattern: str, input2_pattern: str ) -> Dict[str, List[Path]]: - """ - Group input files by read pairs and sample id + """Group input files by read pairs and sample id. :param inputs: list of input files :param input1_pattern: pattern to match read1 files @@ -181,10 +178,9 @@ def group_fn(s): def gz_size(filename: str) -> int: - """ - Extract the size of a gzip compressed file. + """Extract the size of a gzip compressed file. 
- :param fname: file name + :param filename: file name :returns: size of the file uncompressed (in bits) """ with gzip.open(filename, "rb") as f: @@ -197,15 +193,13 @@ def log_step_start( output: Optional[str] = None, **kwargs, ) -> None: - """ - Utility function to add information about the start of a - pixelator step to the logs + """Add information about the start of a pixelator step to the logs. :param step_name: name of the step that is starting - :param input_files: optional collection of input file paths + :param input_files: collection of input file paths :param output: optional path to output - :param kwargs: any additional parameters that you wish to log - :returns: None + :param **kwargs: any additional parameters that you wish to log + :rtype: None """ logger.info("Start pixelator %s %s", step_name, __version__) @@ -221,19 +215,13 @@ def log_step_start( def np_encoder(object: Any): - """ - A very simple encoder to allow JSON serialization - of numpy data types - """ + """Encoder for JSON serialization of numpy data types.""" # noqa: D401 if isinstance(object, np.generic): return object.item() def remove_csv_whitespaces(df: pd.DataFrame) -> None: - """ - Utility function to remove leading and trailing - blank spaces from csv files slurped by pandas - """ + """Remove leading and trailing blank spaces from csv files slurped by pandas.""" # fill NaNs as empty strings to be able to do `.str` df.fillna("", inplace=True) df.columns = df.columns.str.strip() @@ -242,11 +230,11 @@ def remove_csv_whitespaces(df: pd.DataFrame) -> None: def reverse_complement(seq: str) -> str: - """ - Helper function to compute the reverse complement of a DNA seq + """Compute the reverse complement of a DNA seq. :param seq: the DNA sequence - :returns: the reverse complement of the input sequence + :return: the reverse complement of the input sequence + :rtype: str """ return seq.translate(_TRTABLE)[::-1] @@ -255,14 +243,13 @@ def sanity_check_inputs( input_files: Sequence[PathType], allowed_extensions: Union[Sequence[str], Optional[str]] = None, ) -> None: - """ - Perform basic sanity checking of input files + """Perform basic sanity checking of input files. :param input_files: the files to sanity check :param allowed_extensions: the expected file extension of the files, e.g. 'fastq.gz' or a tuple of allowed types eg. ('fastq.gz', 'fq.gz') - :returns: None :raises AssertionError: when any of validation fails + :returns None: """ for input_file in input_files: input_file = Path(input_file) @@ -290,13 +277,14 @@ def sanity_check_inputs( ) -def single_value(xs: Union[List, Set]) -> Any: - """ - Extract the first value in a List or Set if the - collection has a single value. +T = typing.TypeVar("T") + + +def single_value(xs: Union[List[T], Set[T]]) -> T: + """Extract the first value in a List or Set if the collection has a single value. 
:param xs: a collection of values - :returns: the first value in the collection + :returns T: the first value in the collection :raises AssertionError: if the collection is empty or has more than one value """ if len(xs) == 0: @@ -307,9 +295,7 @@ def single_value(xs: Union[List, Set]) -> Any: def timer(func): - """ - Function decorator used to time the different steps - """ + """Time the different steps of a function.""" @wraps(func) def wrapper(*args, **kwds): @@ -324,14 +310,12 @@ def wrapper(*args, **kwds): def write_parameters_file( click_context: click.Context, output_file: Path, command_path: Optional[str] = None -): - """ - Write the parameters used in for a command to a JSON file +) -> None: + """Write the parameters used for a command to a JSON file. :param click_context: the click context object :param output_file: the output file :param command_path: the command to use as command name - :returns: None """ command_path_fixed = command_path or click_context.command_path parameters = click_context.command.params @@ -361,3 +345,18 @@ def write_parameters_file( with open(output_file, "w") as fh: json.dump(data, fh, indent=4) + + +def get_process_pool_executor(ctx, **kwargs): + """Return a process pool with some default settings. + + The number of cores will be set to the number of cores available from the click ctx. + The multiprocess logger will be initialized in each worker process. + + :param ctx: click context object + :param kwargs: additional kwargs to pass to the ProcessPoolExecutor constructor + """ + return futures.ProcessPoolExecutor( + max_workers=ctx.obj["CORES"], + **kwargs, + ) diff --git a/tests/amplicon/test_statistics.py b/tests/amplicon/test_statistics.py index 8784c29b..0fa1c1bc 100644 --- a/tests/amplicon/test_statistics.py +++ b/tests/amplicon/test_statistics.py @@ -22,6 +22,7 @@ def test_sequence_quality_stats_collector_update(): collector.update(rng.normal(30, 5, size=132)) collector.update(rng.normal(30, 5, size=132)) + assert collector.read_count == 2 assert collector.stats == SequenceQualityStats( fraction_q30_upia=0.42, fraction_q30_upib=0.46, diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py index d7c3aeeb..da6fda6a 100644 --- a/tests/utils/test_utils.py +++ b/tests/utils/test_utils.py @@ -4,13 +4,14 @@ Copyright (c) 2022 Pixelgen Technologies AB. 
""" import logging +import tempfile +from concurrent.futures import ProcessPoolExecutor from gzip import BadGzipFile -from tempfile import NamedTemporaryFile import pytest from pixelator import __version__ -from pixelator.cli.common import init_logger +from pixelator.cli.logging import LoggingSetup from pixelator.utils import ( gz_size, log_step_start, @@ -96,11 +97,52 @@ def my_func(): def test_verbose_logging_is_activated(): - with NamedTemporaryFile() as test_log_file: - init_logger(log_file=test_log_file.name, verbose=True) - - root_logger = logging.getLogger("pixelator") + test_log_file = tempfile.NamedTemporaryFile() + with LoggingSetup(test_log_file.name, verbose=True): + root_logger = logging.getLogger() assert root_logger.getEffectiveLevel() == logging.DEBUG - init_logger(log_file=test_log_file.name, verbose=False) + +def test_verbose_logging_is_deactivated(): + test_log_file = tempfile.NamedTemporaryFile() + with LoggingSetup(test_log_file.name, verbose=False): + root_logger = logging.getLogger() assert root_logger.getEffectiveLevel() == logging.INFO + + +def helper_log_fn(args): + import logging + + root_logger = logging.getLogger() + root_logger.log(*args) + + +@pytest.mark.parametrize("verbose", [True, False]) +def test_multiprocess_logging(verbose): + """Test that logging works in a multiprocess environment.""" + test_log_file = tempfile.NamedTemporaryFile() + + with LoggingSetup(test_log_file.name, verbose=verbose): + tasks = [ + (logging.DEBUG, "This is a debug message"), + (logging.INFO, "This is an info message"), + (logging.WARNING, "This is a warning message"), + (logging.ERROR, "This is an error message"), + (logging.CRITICAL, "This is a critical message"), + ] + + with ProcessPoolExecutor(max_workers=4) as executor: + for r in executor.map(helper_log_fn, tasks): + pass + + # Test that the console output has the expected messages + with open(test_log_file.name, "r") as f: + log_content = f.read() + + if verbose: + assert "This is a debug message" in log_content + + assert "This is an info message" in log_content + assert "This is a warning message" in log_content + assert "This is an error message" in log_content + assert "This is a critical message" in log_content