ref: improved testing and fixed pipeline issues
paulmueller committed Nov 27, 2023
1 parent f7016df commit b5b1704
Showing 8 changed files with 183 additions and 40 deletions.
CHANGELOG (2 additions, 0 deletions)
@@ -2,11 +2,13 @@
 - BREAKING CHANGE: Remove preselection capabilities, because it is not
   well integrated into the pipeline. For more information, please see
   issue #15.
 - feat: introduce logic submodule for running pipelines
 - feat: implement HDF5Writer.store_log
 - enh: add Segmenter.hardware_processor property
 - enh: introduce pipeline identifier for data pixel size
 - enh: reduce pixel_size accuracy to 8 digits after the decimal point
   for pipeline reproducibility
+- enh: warn the user when creating a basin-based file without basin paths
 - ref: deprecate pixel size correction in HDF5Data
 - ref: increment DCNUM_PPID_GENERATION to 7
 - ref: several changes and deprecations in the PPID helper functions
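Note: the "warn the user" entry corresponds to the new
CreatingFileWithoutBasinWarning introduced in src/dcnum/write/writer.py
below. A minimal sketch of how the new warning surfaces, assuming only the
public API shown in this commit (the output file name is made up for
illustration):

    import warnings

    from dcnum.write import create_with_basins
    from dcnum.write.writer import CreatingFileWithoutBasinWarning

    with warnings.catch_warnings(record=True) as captured:
        warnings.simplefilter("always")
        # No basin paths given: the file is still created, but dcnum
        # now issues a CreatingFileWithoutBasinWarning.
        create_with_basins(path_out="empty_bb.rtdc", basin_paths=[])

    assert any(issubclass(w.category, CreatingFileWithoutBasinWarning)
               for w in captured)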
src/dcnum/logic/ctrl.py (52 additions, 33 deletions)
@@ -35,7 +35,8 @@ def __init__(self, job: DCNumPipelineJob, *args, **kwargs):
                               ret_dict=True)
         self.event_count = 0

-        self._data = None
+        self._data_raw = None
+        self._data_temp_in = None
         # current job state
         self._state = "init"
         # overall progress [0, 1]
@@ -77,25 +78,42 @@ def __exit__(self, exc_type, exc_val, exc_tb):
         self.close()

     @property
-    def data(self):
-        if self._data is None:
+    def draw(self) -> HDF5Data:
+        """Raw input data"""
+        if self._data_raw is None:
             # Initialize with the proper kwargs (pixel_size)
-            self._data = HDF5Data(self.job["path_in"],
-                                  **self.job["data_kwargs"])
-        return self._data
+            self._data_raw = HDF5Data(self.job["path_in"],
+                                      **self.job["data_kwargs"])
+        return self._data_raw

+    @property
+    def dtin(self) -> HDF5Data:
+        """Input data with (corrected) background image"""
+        if self._data_temp_in is None:
+            if not self.path_temp_in.exists():
+                # create basin-based input file
+                create_with_basins(path_out=self.path_temp_in,
+                                   basin_paths=[self.draw.path])
+            # Initialize with the proper kwargs (pixel_size)
+            self._data_temp_in = HDF5Data(self.path_temp_in,
+                                          **self.job["data_kwargs"])
+            assert len(self._data_temp_in) > 0
+            assert "image_bg" in self._data_temp_in
+        return self._data_temp_in
+
     @property
     def path_temp_in(self):
         po = pathlib.Path(self.job["path_out"])
-        return po.with_name(po.stem + "_dcn_input_basin.rtdc~")
+        return po.with_name(po.stem + "_input_bb.rtdc~")

     @property
     def path_temp_out(self):
         po = pathlib.Path(self.job["path_out"])
-        return po.with_name(po.stem + "_dcn_output_temp.rtdc~")
+        return po.with_name(po.stem + "_output.rtdc~")

     def close(self):
-        self.data.close()
+        self.draw.close()
+        self.dtin.close()
         # clean up logging
         self.log_queue.cancel_join_thread()
         self.log_queue.close()
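Note: the former `data` property is split into `draw` (the raw input file)
and `dtin` (a basin-based temporary file that is guaranteed to contain the
`image_bg` feature). Both use the same lazily-initialized, cached-handle
idiom: create on first access, reuse afterwards, and reset the cache on
close so the next access re-opens the file. A stripped-down sketch of that
idiom (the class and names below are made up for illustration, not part of
dcnum):

    import pathlib


    class LazyHandle:
        """Sketch of the caching used by the `draw`/`dtin` properties"""

        def __init__(self, path):
            self.path = pathlib.Path(path)
            self._handle = None  # nothing is opened until first access

        @property
        def handle(self):
            # Open on first access and reuse afterwards. After `close()`,
            # the next access transparently re-opens the file; dcnum uses
            # this in `task_background` to release the temporary input
            # file before writing to it.
            if self._handle is None:
                self._handle = self.path.open("rb")
            return self._handle

        def close(self):
            if self._handle is not None:
                self._handle.close()
                self._handle = None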
@@ -121,22 +139,22 @@ def run(self):
         # "pipeline:dcnum hash" in case individual steps of the pipeline
         # have been run by a rogue data analyst.
         datdict = {
-            "gen_id": self.data.h5.attrs.get("pipeline:dcnum generation", "0"),
-            "dat_id": self.data.h5.attrs.get("pipeline:dcnum data", "0"),
-            "bg_id": self.data.h5.attrs.get("pipeline:dcnum background", "0"),
-            "seg_id": self.data.h5.attrs.get("pipeline:dcnum segmenter", "0"),
-            "feat_id": self.data.h5.attrs.get("pipeline:dcnum feature", "0"),
-            "gate_id": self.data.h5.attrs.get("pipeline:dcnum gate", "0"),
+            "gen_id": self.draw.h5.attrs.get("pipeline:dcnum generation", "0"),
+            "dat_id": self.draw.h5.attrs.get("pipeline:dcnum data", "0"),
+            "bg_id": self.draw.h5.attrs.get("pipeline:dcnum background", "0"),
+            "seg_id": self.draw.h5.attrs.get("pipeline:dcnum segmenter", "0"),
+            "feat_id": self.draw.h5.attrs.get("pipeline:dcnum feature", "0"),
+            "gate_id": self.draw.h5.attrs.get("pipeline:dcnum gate", "0"),
         }
         # The hash of a potential previous pipeline run.
-        dathash = self.data.h5.attrs.get("pipeline:dcnum hash", "0")
+        dathash = self.draw.h5.attrs.get("pipeline:dcnum hash", "0")
         # The number of events extracted in a potential previous pipeline run.
-        evyield = self.data.h5.attrs.get("pipeline:dcnum yield", -1),
+        evyield = self.draw.h5.attrs.get("pipeline:dcnum yield", -1),
         redo_sanity = (
             # Whether pipeline hash is invalid.
             ppid.compute_pipeline_hash(**datdict) != dathash
             # Whether the input file is the original output of the pipeline.
-            or len(self.data) != evyield
+            or len(self.draw) != evyield
         )
         # Do we have to recompute the background data? In addition to the
         # hash sanity check above, check the generation, input data,
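Note: the sanity check re-derives the pipeline hash from the six step
identifiers stored in the HDF5 attributes and compares it with the stored
"pipeline:dcnum hash" attribute; any mismatch forces the affected steps to
be recomputed. A hedged sketch of that decision logic (the hash helper
below is a hypothetical stand-in for dcnum's ppid.compute_pipeline_hash,
whose implementation is not part of this diff):

    import hashlib


    def compute_pipeline_hash(**step_ids):
        # Hypothetical stand-in: reduce all step identifiers to a single
        # fingerprint so a change in any step changes the hash.
        parts = "|".join(f"{key}={step_ids[key]}"
                         for key in sorted(step_ids))
        return hashlib.md5(parts.encode("utf-8")).hexdigest()


    # Step identifiers as read from the HDF5 attributes; like the
    # `attrs.get(..., "0")` calls above, unset attributes default to "0".
    datdict = {"gen_id": "7", "dat_id": "0", "bg_id": "0",
               "seg_id": "0", "feat_id": "0", "gate_id": "0"}
    dathash = "0"  # stored "pipeline:dcnum hash" attribute

    # A mismatch means the previous run is stale or was tampered with.
    redo_sanity = compute_pipeline_hash(**datdict) != dathash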
Expand All @@ -156,17 +174,13 @@ def run(self):
or (datdict["feat_id"] != self.ppdict["feat_id"])
or (datdict["gate_id"] != self.ppdict["gate_id"]))

# Create a basin-based temporary input file
create_with_basins(path_out=self.path_temp_in,
basin_paths=[self.data.path])

self._state = "background"

if redo_bg:
# The 'image_bg' feature is written to `self.path_temp_in`.
# If `self.data.path` already has the correct 'image_bg'
# feature, then this one is used automatically (because
# `self.path_temp_in` is basin-based).
# If `job["path_in"]` already has the correct 'image_bg'
# feature, then we never reach this case here
# (note that `self.path_temp_in` is basin-based).
self.task_background()

self._progress = 0.1
@@ -229,10 +243,15 @@ def task_background(self):
         feature.
         """
         self.logger.info("Starting background computation")
+        if self._data_temp_in is not None:
+            # Close the temporary input data file, so we can write to it.
+            self._data_temp_in.close()
+            self._data_temp_in = None
         # Start background computation
         bg_code = self.job["background_code"]
         bg_cls = get_available_background_methods()[bg_code]
         with bg_cls(
-                input_data=self.data.image.h5ds,
+                input_data=self.job["path_in"],
                 output_path=self.path_temp_in,
                 # always compress, the disk is usually the bottleneck
                 compress=True,
@@ -260,9 +279,9 @@ def task_segment_extract(self):
         # Start segmentation thread
         seg_cls = get_available_segmenters()[self.job["segmenter_code"]]
         if seg_cls.requires_background_correction:
-            imdat = self.data.image_corr
+            imdat = self.dtin.image_corr
         else:
-            imdat = self.data.image
+            imdat = self.dtin.image

         if self.job["debug"]:
             num_slots = 1
@@ -289,8 +308,8 @@

         # Start feature extractor thread
         fe_kwargs = QueueEventExtractor.get_init_kwargs(
-            data=self.data,
-            gate=gate.Gate(self.data, **self.job["gate_kwargs"]),
+            data=self.dtin,
+            gate=gate.Gate(self.dtin, **self.job["gate_kwargs"]),
             log_queue=self.log_queue)
         fe_kwargs["extract_kwargs"] = self.job["feature_kwargs"]
@@ -305,15 +324,15 @@

         # Start the data collection thread
         thr_coll = QueueCollectorThread(
-            data=self.data,
+            data=self.dtin,
             event_queue=fe_kwargs["event_queue"],
             writer_dq=writer_dq,
             feat_nevents=fe_kwargs["feat_nevents"],
             write_threshold=500,
         )
         thr_coll.start()

-        data_size = len(self.data)
+        data_size = len(self.dtin)
         t0 = time.monotonic()

         # So in principle we are done here. We do not have to do anything
@@ -364,7 +383,7 @@ def task_segment_extract(self):
         self.event_count = thr_coll.written_events
         if self.event_count == 0:
             self.logger.error(
-                f"No events found in {self.data.path}! Please check the "
+                f"No events found in {self.draw.path}! Please check the "
                 f"input file or revise your pipeline.")

         self.logger.info("Finished segmentation and feature extraction")
src/dcnum/write/writer.py (11 additions, 1 deletion)
@@ -2,6 +2,7 @@
 import json
 import pathlib
 from typing import List
+import warnings

 import h5py
 import hdf5plugin
@@ -10,6 +11,11 @@
 from .._version import version


+class CreatingFileWithoutBasinWarning(UserWarning):
+    """Issued when creating a basin-based dataset without basins"""
+    pass
+
+
 class HDF5Writer:
     def __init__(self, path, mode="a", ds_kwds=None):
         """Write deformability cytometry HDF5 data"""
@@ -181,6 +187,10 @@ def create_with_basins(
         commonly used for relative and absolute paths).
     """
     path_out = pathlib.Path(path_out)
+    if not basin_paths:
+        warnings.warn(f"Creating basin-based file '{path_out}' without any "
+                      f"basins, since the list `basin_paths' is empty!",
+                      CreatingFileWithoutBasinWarning)
     with HDF5Writer(path_out, mode="w") as hw:
         # Get the metadata from the first available basin path
@@ -211,7 +221,7 @@
         # Copy the metadata from the representative path.
         if prep is not None:
             # copy metadata
-            with h5py.File(prep) as h5:
+            with h5py.File(prep, libver="latest") as h5:
                 copy_metadata(h5_src=h5, h5_dst=hw.h5)
                 # extract features
                 features = sorted(h5["events"].keys())
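Note: besides the new warning class, this file now opens the
representative basin file with libver="latest", which allows h5py/HDF5 to
use the newest file-format features when reading. A minimal illustration,
assuming an existing basin file (the file name is made up):

    import h5py

    with h5py.File("representative_basin.rtdc", "r", libver="latest") as h5:
        # Mirrors the feature listing in `create_with_basins` above.
        features = sorted(h5["events"].keys())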
tests/test_feat_background_base.py (4 additions, 0 deletions)
@@ -3,6 +3,8 @@
 import h5py
 import numpy as np

+import pytest
+
 from dcnum.feat.feat_background import bg_sparse_median
 from dcnum.read import HDF5Data
 from dcnum.write import create_with_basins
@@ -77,6 +79,8 @@ def test_base_background_input_is_output(tmp_path):
     assert "image_bg" in hd


+@pytest.mark.filterwarnings(
+    "ignore::dcnum.write.writer.CreatingFileWithoutBasinWarning")
 def test_base_background_output_basin_none(
         tmp_path):
     """In dcnum 0.13.0, we introduced `create_with_basins`"""
tests/test_feat_background_bg_roll_median.py (6 additions, 0 deletions)
@@ -37,6 +37,8 @@ def test_compute_median_for_slice():
     assert not np.all(comp_in_b == comp_out_b)


+@pytest.mark.filterwarnings(
+    "ignore::dcnum.write.writer.CreatingFileWithoutBasinWarning")
 @pytest.mark.parametrize("event_count", [720, 730])  # should be independent
 def test_median_map_iterator(tmp_path, event_count):
     output_path = tmp_path / "test.h5"
@@ -63,6 +65,8 @@ def test_median_map_iterator(tmp_path, event_count):
     assert jobs[6].stop == 7 * 5


+@pytest.mark.filterwarnings(
+    "ignore::dcnum.write.writer.CreatingFileWithoutBasinWarning")
 @pytest.mark.parametrize("event_count", [720, 730])
 def test_median_process_next_batch(tmp_path, event_count):
     output_path = tmp_path / "test.h5"
@@ -96,6 +100,8 @@ def test_median_process_next_batch(tmp_path, event_count):
     assert np.all(ds[:90, 1, 0] == 7)


+@pytest.mark.filterwarnings(
+    "ignore::dcnum.write.writer.CreatingFileWithoutBasinWarning")
 @pytest.mark.parametrize("event_count, chunk_count", [[720, 8], [730, 9]])
 def test_median_process_full(tmp_path, event_count, chunk_count):
     output_path = tmp_path / "test.h5"
tests/test_feat_background_bg_sparsemed.py (2 additions, 0 deletions)
@@ -7,6 +7,8 @@
 from helper_methods import retrieve_data


+@pytest.mark.filterwarnings(
+    "ignore::dcnum.write.writer.CreatingFileWithoutBasinWarning")
 @pytest.mark.parametrize("event_count,kernel_size,split_time",
                          [(720, 10, 0.01),
                           (730, 10, 0.01),
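Note: all tests that create basin-based files without basin paths now opt
out of the new warning via pytest's filterwarnings marker. The
complementary check, asserting that the warning actually fires, could look
like this (a hypothetical test sketch, not part of this commit):

    import pytest

    from dcnum.write import create_with_basins
    from dcnum.write.writer import CreatingFileWithoutBasinWarning


    def test_warns_without_basin_paths(tmp_path):
        # The warning must be issued when `basin_paths` is empty.
        with pytest.warns(CreatingFileWithoutBasinWarning):
            create_with_basins(path_out=tmp_path / "empty_bb.rtdc",
                               basin_paths=[])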
