forked from haesleinhuepf/napari-skimage-regionprops
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_all_frames.py
145 lines (119 loc) · 5.16 KB
/
_all_frames.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from toolz import curry
from typing import Callable
from functools import wraps
import inspect
import numpy as np
import pandas as pd
from ._utilities import isimage
try:
import napari
except Exception as e:
import warnings
warnings.warn(str(e))
@curry
def analyze_all_frames(function: Callable) -> Callable:
    """Decorate a per-frame measurement function so it is run on every frame.

    The decorated function is called once per timepoint. Each call must return
    a table-like object (anything accepted by ``pd.DataFrame``) that contains a
    ``'label'`` column; a ``'frame'`` column holding the frame index is added
    to every per-frame table and all tables are concatenated.

    Two modes are supported, selected by the bound arguments:

    * A ``napari.Viewer`` is among the arguments: the viewer must be 4D. Its
      first dimension is stepped through all frames, the data of the detected
      labels/image layers is re-read for every frame and passed through
      ``_break_down_4d_to_2d_kwargs``, and the viewer is set back to the
      originally selected timepoint afterwards.
    * No viewer: every argument recognised by ``isimage`` with a 4D shape is
      indexed along its first axis; 3D arguments are passed through unchanged.

    Parameters
    ----------
    function : Callable
        The per-frame analysis function to wrap.

    Returns
    -------
    Callable
        The wrapper. It returns ``None`` after writing the combined table to
        the ``properties`` of the labels layer (and calling ``add_table``) when
        a labels layer was found; otherwise it returns the combined table via
        ``DataFrame.to_dict()``.

    Raises
    ------
    TypeError
        From ``inspect.Signature.bind`` if the call does not match the
        signature of ``function``.
    NotImplementedError
        If the viewer is not 4D, or (without viewer) an image argument is
        neither 3D nor 4D.
    """
    from napari_workflows._workflow import _get_layer_from_data

    @wraps(function)
    def worker_function(*args, **kwargs):
        sig = inspect.signature(function)
        # create mapping from position and keyword arguments to parameters
        # will raise a TypeError if the provided arguments do not match the signature
        # https://docs.python.org/3/library/inspect.html#inspect.Signature.bind
        bound = sig.bind(*args, **kwargs)
        # set default values for missing arguments
        # https://docs.python.org/3/library/inspect.html#inspect.BoundArguments.apply_defaults
        bound.apply_defaults()

        # Retrieve the viewer parameter so that we can know which current
        # timepoint is selected. The keys stay None when nothing was found so
        # that later code can test for them instead of hitting unbound names.
        viewer = None
        viewer_key = None
        for key, value in bound.arguments.items():
            if isinstance(value, napari.Viewer):
                viewer = value
                viewer_key = key

        labels_layer = None
        labels_layer_key = None
        image_layer = None
        image_layer_key = None

        original_args = copy_dict(bound.arguments)

        if viewer is not None:
            # Check the dimensionality before indexing dims.range[-4] below.
            if len(viewer.dims.current_step) != 4:
                raise NotImplementedError("Analyzing all frames only supports 4D-data")

            from napari_workflows._workflow import _break_down_4d_to_2d_kwargs

            variable_timepoint = list(viewer.dims.current_step)
            current_timepoint = variable_timepoint[0]
            max_time = int(viewer.dims.range[-4][1])

            # find a labels layer to attach result (and the image layer, so
            # that both can be re-read for every frame)
            for key, value in original_args.items():
                if isimage(value):
                    layer = _get_layer_from_data(viewer, value)
                    if isinstance(layer, napari.layers.Labels):
                        labels_layer = layer
                        labels_layer_key = key
                    if isinstance(layer, napari.layers.Image):
                        image_layer = layer
                        image_layer_key = key
        else:
            # Without a viewer the number of frames is the longest first axis
            # among all 4D image arguments.
            max_time = 0
            for value in original_args.values():
                if isimage(value) and len(value.shape) == 4:
                    max_time = max(max_time, value.shape[0])

        frame_tables = []

        for f in range(max_time):
            print("analyzing frame", f)
            frame_args = copy_dict(original_args)

            if viewer is None:
                for key, value in original_args.items():
                    if not isimage(value):
                        continue
                    ndim = len(value.shape)
                    if ndim == 4:
                        new_value = value[f]
                        # drop a leading singleton axis of the cropped frame
                        if new_value.shape[0] == 1:
                            new_value = new_value[0]
                        frame_args[key] = new_value
                    elif ndim != 3:
                        # 3D data (e.g. a 3D label image) is kept as it is
                        raise NotImplementedError("Analyzing all frames only supports combination of 3D and 4D-data")
            else:
                # in case of 4D-data (timelapse) crop out the current 3D timepoint
                variable_timepoint[0] = f
                viewer.dims.current_step = variable_timepoint
                _refresh_viewer(viewer)

                # Only layers that were actually found can be re-read.
                if labels_layer is not None:
                    frame_args[labels_layer_key] = labels_layer.data
                if image_layer is not None:
                    frame_args[image_layer_key] = image_layer.data
                # NOTE(review): the return value is not used, so this helper
                # presumably crops the arguments in place - confirm.
                _break_down_4d_to_2d_kwargs(frame_args, f, viewer)

            # The decorated function must not receive the viewer.
            if viewer_key is not None:
                frame_args[viewer_key] = None
            bound.arguments = frame_args

            # call the decorated function
            result_single_frame = function(*bound.args, **bound.kwargs)
            result_single_frame['frame'] = [f] * len(result_single_frame['label'])
            frame_tables.append(pd.DataFrame(result_single_frame))

        # Combine the per-frame tables once instead of re-concatenating the
        # growing table in every iteration.
        if not frame_tables:
            result = pd.DataFrame()
        elif len(frame_tables) == 1:
            result = frame_tables[0]
        else:
            result = pd.concat(frame_tables, ignore_index=True)

        if viewer is not None:
            # reset viewer
            variable_timepoint[0] = current_timepoint
            viewer.dims.current_step = variable_timepoint
            _refresh_viewer(viewer)

        if labels_layer is not None:
            labels_layer.properties = result.to_dict(orient='list')

            from ._table import add_table
            add_table(labels_layer, viewer)
        else:
            return result.to_dict()

    return worker_function
def copy_dict(source, result=None):
    """Shallow-copy all items of ``source`` into ``result`` and return it.

    Parameters
    ----------
    source : mapping
        Object providing ``items()``. Its values are referenced, not
        duplicated.
    result : dict, optional
        Target that receives the items; existing keys are overwritten. A new
        dict is created when ``None`` is given.

    Returns
    -------
    dict
        ``result`` itself (or the newly created dict).
    """
    if result is None:
        result = {}
    result.update(source.items())
    return result
def _refresh_viewer(viewer):
if viewer is None:
return
from napari_workflows import WorkflowManager
wm = WorkflowManager.install(viewer)
w = wm.workflow
while(wm._search_first_invalid_layer (w.roots()) is not None):
wm._update_invalid_layer()