Merge pull request #71 from PixelgenTechnologies/fix-multiprocess-logging

Fix multiprocess logging
fbdtemme authored Jan 30, 2024
2 parents 8ec5317 + 208bccb commit e3d895c
Showing 21 changed files with 517 additions and 226 deletions.
1 change: 1 addition & 0 deletions .flake8
@@ -1,4 +1,5 @@
[flake8]

extend-ignore = E501,E402,W503,E203,D213,D203,DOC301,DOC502
exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,cue.mod
docstring-convention = all
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Change name and description of `Avg. Reads per Cell` and `Avg. Reads Usable per Cell` in QC report.
* Fix a bug in how discarded UMIs are calculated and reported.
* Fix deflated counts in the edgelist after collapse.
* Improved console output in verbose mode.
* Improved logging from multiprocessing jobs.

## [0.16.1] - 2024-01-12

38 changes: 26 additions & 12 deletions src/pixelator/amplicon/process.py
@@ -150,18 +150,13 @@ def amplicon_fastq(
mode = "single-end" if len(inputs) == 1 else "paired-end"
stats = SequenceQualityStatsCollector(design)

start1_log_msg = "Starting the concatenation of %s to %s"
start2_log_msg = "Starting the concatenation of %s and %s to %s"
end1_log_msg = "Finished the concatenation of %s to %s"
end2_log_msg = "Finished the concatenation of %s and %s to %s"

amplicon = assay.get_region_by_id("amplicon")
if amplicon is None:
raise RuntimeError("Design does not have a region with id: amplicon")

# Single end mode
if mode == "single-end":
logger.debug(start1_log_msg, inputs[0], output)
logger.debug("Building amplicon of %s to %s", inputs[0], output)

with xopen(output, "wb") as f:
for record in pyfastx.Fastq(str(inputs[0]), build_index=False):
@@ -170,17 +165,31 @@
stats.update(new_qual)

if mode == "paired-end":
logger.debug(start2_log_msg, inputs[0], inputs[1], output)
logger.debug(
"Building amplicon of %s and %s to %s", inputs[0], inputs[1], output
)

with xopen(output, "wb") as f:
for record1, record2 in zip(
pyfastx.Fastq(str(inputs[0]), build_index=False),
pyfastx.Fastq(str(inputs[1]), build_index=False),
for idx, (record1, record2) in enumerate(
zip(
pyfastx.Fastq(str(inputs[0]), build_index=False),
pyfastx.Fastq(str(inputs[1]), build_index=False),
)
):
if idx % 100000 == 0:
logger.debug(
"Generating amplicon for %s: %s reads processed",
str(output),
str(idx),
)

name, new_seq, new_qual = generate_amplicon(record1, record2, amplicon)
write_record(f, name, new_seq, new_qual)
stats.update(new_qual)

logger.info(
"Generating amplicon for %s: %s reads processed", str(output), stats.read_count
)
# add metrics to JSON file
avg_stats = stats.stats

@@ -189,6 +198,11 @@
json.dump(data, json_file, sort_keys=True, indent=4)

if mode == "single-end":
logger.debug(end1_log_msg, inputs[0], output)
logger.debug("Finished building amplicon of %s to %s", inputs[0], output)
else:
logger.debug(end2_log_msg, inputs[0], inputs[1], output)
logger.debug(
"Finished building amplicon of %s and %s to %s",
inputs[0],
inputs[1],
output,
)
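
The progress messages added here lean on two standard logging idioms: %-style lazy arguments, so the string is only formatted when a handler actually emits the record, and a modulo gate, so per-read messages stay bounded on large FASTQ files. A minimal self-contained sketch of the same pattern, where the reads iterable and the interval constant are illustrative stand-ins rather than pixelator API:

import logging

logger = logging.getLogger("pixelator")
LOG_INTERVAL = 100_000  # assumption: mirrors the 100000-read interval above

def count_reads(reads, output_name):
    """Consume reads, logging progress at a bounded rate."""
    total = 0
    for idx, _read in enumerate(reads):
        if idx % LOG_INTERVAL == 0:
            # Lazy %-args: formatting is deferred until the record is emitted.
            logger.debug(
                "Generating amplicon for %s: %s reads processed", output_name, idx
            )
        total += 1
    logger.info(
        "Generating amplicon for %s: %s reads processed", output_name, total
    )
    return total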
54 changes: 35 additions & 19 deletions src/pixelator/amplicon/statistics.py
@@ -1,10 +1,11 @@
"""
"""Collect statistics for amplicon.
Copyright (c) 2023 Pixelgen Technologies AB.
"""
import collections
import dataclasses
from functools import cache
from typing import Any, Dict, Tuple, Union
from typing import Any, Union

import numpy as np

@@ -31,8 +32,10 @@ def _count_elem_in_array_where_greater_or_equal_than(arr, value):
return result


@dataclasses.dataclass(frozen=True)
@dataclasses.dataclass(frozen=True, slots=True)
class SequenceQualityStats:
"""Container for sequence quality statistics."""

fraction_q30_upia: float
fraction_q30_upib: float
fraction_q30_umi: float
@@ -41,14 +44,19 @@ class SequenceQualityStats:
fraction_q30_bc: float
fraction_q30: float

def asdict(self) -> Dict[str, Any]:
def asdict(self) -> dict[str, Any]:
"""Return a dictionary representation of this instance."""
return {k: v for k, v in dataclasses.asdict(self).items()}


class SequenceQualityStatsCollector:
"""Accumulate read quality statistics for a given design."""

def __init__(self, design_name: str):
"""Accumulate read quality statistics for a given design.
:param design_name: The name of the design of the reads for which to collect statistics.
"""
design = config.get_assay(design_name)

if design is None:
@@ -81,19 +89,25 @@ def __init__(self, design_name: str):
raise ValueError("Assay does not contain a UMI region")

@cache
def get_position(self, region_id: str) -> Tuple[int, int]:
def get_position(self, region_id: str) -> tuple[int, int]:
"""Return the positions for a region.
:param region_id: id of the region
:returns: a tuple with start and end positions
:raise ValueError: An unknown region id was given
"""
r = self._positions.get(region_id)
if r is None:
raise ValueError(f"Unknown region: {region_id}")
return r

@staticmethod
def _read_stats(quali: np.ndarray) -> Tuple[int, int]:
def _read_stats(quali: np.ndarray) -> tuple[int, int]:
bases_in_read = _count_elem_in_array_where_greater_than(quali, 2)
q30_bases_in_read = _count_elem_in_array_where_greater_or_equal_than(quali, 30)
return bases_in_read, q30_bases_in_read

def _umi_stats(self, quali: np.ndarray) -> Tuple[int, int]:
def _umi_stats(self, quali: np.ndarray) -> tuple[int, int]:
# Q30 Bases in UMI
umi_regions = self.design.get_regions_by_type(RegionType.UMI)
umi_positions = [self.get_position(r.region_id) for r in umi_regions]
@@ -109,34 +123,37 @@ def _umi_stats(self, quali: np.ndarray) -> Tuple[int, int]:

return bases_in_umi, q30_bases_in_umi

def _get_stats_from_position(self, quali: np.ndarray, pos: str) -> Tuple[int, int]:
def _get_stats_from_position(self, quali: np.ndarray, pos: str) -> tuple[int, int]:
upia_pos = self.get_position(pos)
slice_obj = slice(*upia_pos)
quali_subset = quali[slice_obj]
bases = _count_elem_in_array_where_greater_than(quali_subset, 2)
q30 = _count_elem_in_array_where_greater_or_equal_than(quali_subset, 30)
return bases, q30

def _upia_stats(self, quali: np.ndarray) -> Tuple[int, int]:
def _upia_stats(self, quali: np.ndarray) -> tuple[int, int]:
return self._get_stats_from_position(quali, "upi-a")

def _upib_stats(self, quali: np.ndarray) -> Tuple[int, int]:
def _upib_stats(self, quali: np.ndarray) -> tuple[int, int]:
return self._get_stats_from_position(quali, "upi-b")

def _pbs1_stats(self, quali: np.ndarray) -> Tuple[int, int]:
def _pbs1_stats(self, quali: np.ndarray) -> tuple[int, int]:
return self._get_stats_from_position(quali, "pbs-1")

def _pbs2_stats(self, quali: np.ndarray) -> Tuple[int, int]:
def _pbs2_stats(self, quali: np.ndarray) -> tuple[int, int]:
return self._get_stats_from_position(quali, "pbs-2")

def _bc_stats(self, quali: np.ndarray) -> Tuple[int, int]:
def _bc_stats(self, quali: np.ndarray) -> tuple[int, int]:
return self._get_stats_from_position(quali, "bc")

@property
def read_count(self) -> int:
"""Return the number of reads processed."""
return self._counter["read_count"]

@property
def stats(self) -> SequenceQualityStats:
"""
Return the accumulated statistics as a SequenceQualityStats object.
"""
"""Return the accumulated statistics as a SequenceQualityStats object."""
fraction_q30_upia = (
self._counter["q30_bases_in_upia"] / self._counter["bases_in_upia"]
)
@@ -170,9 +187,7 @@ def stats(self) -> SequenceQualityStats:
)

def update(self, qualities: Union[str, np.ndarray]) -> None:
"""
Update the statistics with the given read qualities.
"""
"""Update the statistics with the given read qualities."""
# Use numpy for vectorized operations
# Reinterpret cast to integers (same as ord)
if isinstance(qualities, str):
@@ -189,6 +204,7 @@ def update(self, qualities: Union[str, np.ndarray]) -> None:
bases_in_bc, q30_bases_in_bc = self._bc_stats(quali)

self._counter.update(
read_count=1,
bases_in_read=bases_in_read,
q30_bases_in_read=q30_bases_in_read,
bases_in_umi=bases_in_umi,
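
The `read_count=1` keyword added to the `Counter.update()` call is what backs the new `read_count` property: `collections.Counter.update()` with keyword arguments adds each value to the existing tally, so contributing 1 per processed read keeps a running read total alongside the base counts. (`slots=True` on the frozen dataclass, available since Python 3.10, drops the per-instance `__dict__` for a smaller, attribute-locked stats object.) A small illustrative example with made-up numbers:

import collections

counter = collections.Counter()
# update() with keyword arguments adds to the running totals.
counter.update(read_count=1, bases_in_read=120, q30_bases_in_read=110)
counter.update(read_count=1, bases_in_read=118, q30_bases_in_read=96)
print(counter["read_count"])         # 2
print(counter["q30_bases_in_read"])  # 206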
5 changes: 2 additions & 3 deletions src/pixelator/cli/adapterqc.py
@@ -10,9 +10,9 @@
from pixelator.cli.common import design_option, logger, output_option
from pixelator.qc import adapter_qc_fastq
from pixelator.utils import (
click_echo,
create_output_stage_dir,
get_extension,
get_process_pool_executor,
get_sample_name,
log_step_start,
sanity_check_inputs,
@@ -81,11 +81,10 @@ def adapterqc(
adapterqc_output = create_output_stage_dir(output, "adapterqc")

# run cutadapt (adapter mode) using parallel processing
with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor:
with get_process_pool_executor(ctx) as executor:
jobs = []
for fastq_file in input_files:
msg = f"Processing {fastq_file} with cutadapt (adapter mode)"
click_echo(msg, multiline=False)
logger.info(msg)

clean_name = get_sample_name(fastq_file)
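
Each CLI stage now obtains its pool from `get_process_pool_executor(ctx)` (imported from `pixelator.utils`) instead of constructing `futures.ProcessPoolExecutor` directly. The helper's body is not shown in this diff; a common shape for such a wrapper, sketched below as an assumption rather than pixelator's actual implementation, routes worker-process log records back to the parent through a queue so that messages logged inside pool jobs reach the main process's handlers:

import logging
import logging.handlers
import multiprocessing
from concurrent import futures

def _init_worker_logging(queue, level):
    # Runs once in each worker: replace handlers with a QueueHandler
    # that forwards every record to the parent process.
    root = logging.getLogger()
    root.handlers = [logging.handlers.QueueHandler(queue)]
    root.setLevel(level)

def get_process_pool_executor(ctx):
    """Hypothetical sketch; the real helper lives in pixelator.utils."""
    # A Manager queue is picklable, so it can be passed via initargs.
    queue = multiprocessing.Manager().Queue(-1)
    # Replay worker records on the parent's handlers. A production
    # version would also stop the listener when the pool shuts down.
    listener = logging.handlers.QueueListener(
        queue, *logging.getLogger().handlers, respect_handler_level=True
    )
    listener.start()
    return futures.ProcessPoolExecutor(
        max_workers=ctx.obj["CORES"],
        initializer=_init_worker_logging,
        initargs=(queue, logging.getLogger().level),
    )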
16 changes: 10 additions & 6 deletions src/pixelator/cli/amplicon.py
@@ -8,12 +8,16 @@

import click

from pixelator.cli.common import design_option, logger, output_option
from pixelator.amplicon import amplicon_fastq
from pixelator.cli.common import (
design_option,
logger,
output_option,
)
from pixelator.utils import (
click_echo,
create_output_stage_dir,
get_extension,
get_process_pool_executor,
group_input_reads,
log_step_start,
sanity_check_inputs,
@@ -66,6 +70,7 @@ def amplicon(
Process diverse raw pixel data (FASTQ) formats into common amplicon
"""
# log input parameters

log_step_start(
"amplicon",
input_files=input_files,
@@ -87,7 +92,7 @@
)

# run amplicon using parallel processing
with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor:
with get_process_pool_executor(ctx) as executor:
jobs = []
for k, v in grouped_sorted_inputs.items():
extension = get_extension(v[0])
@@ -101,12 +106,11 @@
)

if len(v) > 2:
msg = "Found more files than needed for concatenating fastq files"
msg = "Found more files than needed for creating an amplicon"
logger.error(msg)
raise RuntimeError(msg)

msg = f"Concatenating {','.join(str(p) for p in v)}"
click_echo(msg, multiline=False)
msg = f"Creating amplicon for {','.join(str(p) for p in v)}"
logger.info(msg)

jobs.append(
12 changes: 4 additions & 8 deletions src/pixelator/cli/analysis.py
@@ -13,8 +13,8 @@
from pixelator.analysis.colocalization.types import TransformationTypes
from pixelator.cli.common import logger, output_option
from pixelator.utils import (
click_echo,
create_output_stage_dir,
get_process_pool_executor,
get_sample_name,
log_step_start,
sanity_check_inputs,
@@ -164,20 +164,16 @@ def analysis(

# sanity check on the analyses
if not any([compute_polarization, compute_colocalization]):
msg = "All the analysis are disabled, no scores will be computed"
click_echo(msg)
logger.warning(msg)
logger.warning("All the analysis are disabled, no scores will be computed")

# create output folder if it does not exist
analysis_output = create_output_stage_dir(output, "analysis")

# compute graph/clusters using parallel processing
with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor:
with get_process_pool_executor(ctx) as executor:
jobs = []
for zip_file in input_files:
msg = f"Computing analysis for file {zip_file}"
click_echo(msg)
logger.info(msg)
logger.info(f"Computing analysis for file {zip_file}")

clean_name = get_sample_name(zip_file)
metrics_file = analysis_output / f"{clean_name}.report.json"
21 changes: 10 additions & 11 deletions src/pixelator/cli/annotate.py
@@ -11,8 +11,8 @@
from pixelator.cli.common import logger, output_option
from pixelator.config import config, load_antibody_panel
from pixelator.utils import (
click_echo,
create_output_stage_dir,
get_process_pool_executor,
get_sample_name,
log_step_start,
sanity_check_inputs,
@@ -120,13 +120,14 @@ def annotate(

# warn if both --dynamic-filter and hard-coded sizes are input
if min_size is not None and dynamic_filter in ["min", "both"]:
msg = "--dynamic-filter will overrule the value introduced in --min-size"
click_echo(msg, multiline=False)
logger.warning(msg)
logger.warning(
"--dynamic-filter will overrule the value introduced in --min-size"
)

if max_size is not None and dynamic_filter in ["max", "both"]:
msg = "--dynamic-filter will overrule the value introduced in --max-size"
click_echo(msg, multiline=False)
logger.warning(msg)
logger.warning(
"--dynamic-filter will overrule the value introduced in --max-size"
)

# create output folder if it does not exist
annotate_output = create_output_stage_dir(output, "annotate")
@@ -135,12 +136,10 @@
panel = load_antibody_panel(config, panel)

# compute graph/components using parallel processing
with futures.ProcessPoolExecutor(max_workers=ctx.obj["CORES"]) as executor:
with get_process_pool_executor(ctx) as executor:
jobs = []
for ann_file in input_files:
msg = f"Computing annotation for file {ann_file}"
click_echo(msg, multiline=False)
logger.info(msg)
logger.info(f"Computing annotation for file {ann_file}")

clean_name = get_sample_name(ann_file)
metrics_file = annotate_output / f"{clean_name}.report.json"