Performance optimization in precomp_annotations.subdivide
(substantially speeds up finalization of large annotation files)
JoeStrout authored and nkemnitz committed Dec 20, 2024
1 parent c40fc57 commit 58801ff
Showing 1 changed file with 45 additions and 19 deletions.
zetta_utils/db_annotations/precomp_annotations.py
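The core change: instead of one flat loop in which every (x, y, z) chunk filters the entire annotation list, the data is pre-filtered once per x slab and again per (x, y) column, so the innermost z loop only scans annotations already known to lie in that column. Below is a minimal sketch of the staging idea, using plain (x, y, z) points and interval tests as stand-ins for the library's LineAnnotation and VolumetricIndex types (all names in the sketch are hypothetical):

from itertools import product

# Toy stand-ins (not the zetta_utils API): an "annotation" is just an
# (x, y, z) point, and a bounds check is a half-open interval test.
def in_chunk(pt, axis, lo, hi):
    return lo <= pt[axis] < hi

def subdivide_naive(points, grid, cs):
    # Old approach: every chunk scans the full list, ~N * X*Y*Z checks.
    out = {}
    for x, y, z in product(range(grid[0]), range(grid[1]), range(grid[2])):
        out[(x, y, z)] = [
            p for p in points
            if in_chunk(p, 0, x * cs, (x + 1) * cs)
            and in_chunk(p, 1, y * cs, (y + 1) * cs)
            and in_chunk(p, 2, z * cs, (z + 1) * cs)
        ]
    return out

def subdivide_staged(points, grid, cs):
    # New approach: narrow the candidates per x slab, then per (x, y)
    # column, so the innermost loop scans only that column's annotations.
    out = {}
    for x in range(grid[0]):
        in_x = [p for p in points if in_chunk(p, 0, x * cs, (x + 1) * cs)]
        for y in range(grid[1]):
            in_xy = [p for p in in_x if in_chunk(p, 1, y * cs, (y + 1) * cs)]
            for z in range(grid[2]):
                out[(x, y, z)] = [
                    p for p in in_xy if in_chunk(p, 2, z * cs, (z + 1) * cs)
                ]
    return out

# Both orderings produce identical chunk assignments:
pts = [(5, 12, 7), (25, 3, 14), (9, 9, 29)]
assert subdivide_naive(pts, (3, 3, 3), 10) == subdivide_staged(pts, (3, 3, 3), 10)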
@@ -308,6 +308,7 @@ def read_data(dir_path, spatial_entry):
     return result
 
 
+# pylint: disable=too-many-locals,too-many-nested-blocks,cell-var-from-loop
 def subdivide(data, bounds: VolumetricIndex, chunk_sizes, write_to_dir=None, levels_to_write=None):
     """
     Subdivide the given data and bounds into chunks and subchunks of
@@ -328,22 +329,45 @@ def subdivide(data, bounds: VolumetricIndex, chunk_sizes, write_to_dir=None, levels_to_write=None):
         grid_shape = ceil(bounds_size / chunk_size)
         logger.info(f"subdividing {bounds} by {chunk_size}, for grid_shape {grid_shape}")
         level_key = f"spatial{level}"
-        for x, y, z in product(range(grid_shape[0]), range(grid_shape[1]), range(grid_shape[2])):
-            chunk_start = bounds.start + Vec3D(x, y, z) * chunk_size
-            chunk_end = chunk_start + chunk_size
-            chunk_bounds = VolumetricIndex.from_coords(chunk_start, chunk_end, bounds.resolution)
-            # pylint: disable=cell-var-from-loop
-            chunk_data: Sequence[LineAnnotation] = list(
-                filter(lambda d: d.in_bounds(chunk_bounds), data)
+        # total_qty = grid_shape[0] * grid_shape[1] * grid_shape[2]
+        qty_done = 0
+        for x in range(grid_shape[0]):
+            x_start = bounds.start[0] + x * chunk_size[0]
+            x_end = x_start + chunk_size[0]
+            x_idx = VolumetricIndex.from_coords(
+                (x_start, bounds.start[1], bounds.start[2]),
+                (x_end, bounds.stop[1], bounds.stop[2]),
+                bounds.resolution,
             )
-            # logger.info(f'spatial{level}/{x}_{y}_{z} contains {len(chunk_data)} lines')
-            limit = max(limit, len(chunk_data))
-            if write_to_dir is not None and level in levels_to_write:
-                level_dir = path_join(write_to_dir, level_key)
-                if not os.path.exists(level_dir):
-                    os.makedirs(level_dir)
-                anno_file_path = path_join(level_dir, f"{x}_{y}_{z}")
-                write_lines(anno_file_path, chunk_data)
+            data_within_x = list(filter(lambda d: d.in_bounds(x_idx), data))
+            for y in range(grid_shape[1]):
+                y_start = bounds.start[1] + y * chunk_size[1]
+                y_end = y_start + chunk_size[1]
+                y_idx = VolumetricIndex.from_coords(
+                    (x_start, y_start, bounds.start[2]),
+                    (x_end, y_end, bounds.stop[2]),
+                    bounds.resolution,
+                )
+                data_within_xy = list(filter(lambda d: d.in_bounds(y_idx), data_within_x))
+                for z in range(grid_shape[2]):
+                    qty_done += 1
+                    chunk_start = bounds.start + Vec3D(x, y, z) * chunk_size
+                    chunk_end = chunk_start + chunk_size
+                    chunk_bounds = VolumetricIndex.from_coords(
+                        chunk_start, chunk_end, bounds.resolution
+                    )
+                    # pylint: disable=cell-var-from-loop
+                    chunk_data: Sequence[LineAnnotation] = list(
+                        filter(lambda d: d.in_bounds(chunk_bounds), data_within_xy)
+                    )
+                    # logger.info(f'spatial{level}/{x}_{y}_{z} contains {len(chunk_data)} lines')
+                    limit = max(limit, len(chunk_data))
+                    if write_to_dir is not None and level in levels_to_write:
+                        level_dir = path_join(write_to_dir, level_key)
+                        if not os.path.exists(level_dir):
+                            os.makedirs(level_dir)
+                        anno_file_path = path_join(level_dir, f"{x}_{y}_{z}")
+                        write_lines(anno_file_path, chunk_data)
         spatial_entries.append(SpatialEntry(chunk_size, grid_shape, level_key, limit))
 
     return spatial_entries
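Why this helps: in the old flat loop, each of the grid_shape[0] * grid_shape[1] * grid_shape[2] chunks filtered the entire annotation list, so a level with an X by Y by Z grid cost roughly N * X * Y * Z bounds checks for N annotations. The staged version costs about N * X checks at the slab stage, about N * Y in aggregate at the column stage, and about N * Z at the chunk stage (assuming annotations are spread roughly evenly), on the order of N * (X + Y + Z) in total. For a 10 x 10 x 10 grid that is about 30N checks instead of 1000N, consistent with the commit's note that finalization of large annotation files speeds up substantially.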
@@ -545,9 +569,12 @@ def read_all(self, spatial_level: int = -1, filter_duplicates: bool = True):
         grid_shape = ceil(bounds_size / chunk_size)
         level_key = f"spatial{level}"
         level_dir = path_join(self.path, level_key)
+        # total_chunks = grid_shape[0] * grid_shape[1] * grid_shape[2]
+        chunks_read = 0
         for x in range(0, grid_shape[0]):
             for y in range(0, grid_shape[1]):
                 for z in range(0, grid_shape[2]):
+                    chunks_read += 1
                     anno_file_path = path_join(level_dir, f"{x}_{y}_{z}")
                     result += read_lines(anno_file_path)
         if filter_duplicates:
@@ -571,9 +598,9 @@ def read_in_bounds(self, index: VolumetricIndex, strict: bool = False):
         level_dir = path_join(self.path, level_key)
         start_chunk = (index.start - self.index.start) // chunk_size
         end_chunk = (index.stop - self.index.start) // chunk_size
-        for x in range(max(0, start_chunk[0]), min(grid_shape[0], end_chunk[0])):
-            for y in range(max(0, start_chunk[1]), min(grid_shape[1], end_chunk[1])):
-                for z in range(max(0, start_chunk[2]), min(grid_shape[2], end_chunk[2])):
+        for x in range(max(0, start_chunk[0]), min(grid_shape[0], end_chunk[0] + 1)):
+            for y in range(max(0, start_chunk[1]), min(grid_shape[1], end_chunk[1] + 1)):
+                for z in range(max(0, start_chunk[2]), min(grid_shape[2], end_chunk[2] + 1)):
                     anno_file_path = path_join(level_dir, f"{x}_{y}_{z}")
                     result += read_lines(anno_file_path)
         if strict:
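The + 1 adjustments in read_in_bounds fix an off-by-one: end_chunk is computed with floor division, so whenever the query's stop coordinate fell strictly inside a chunk, the exclusive stop of range() dropped that final partial chunk and its annotations were never read. A toy illustration of the boundary arithmetic with made-up numbers, assuming the same floor-division convention (the min/max clamping to the grid is omitted here):

chunk_size = 10
start, stop = 0, 25                  # query covers chunks 0 and 1, plus part of 2
start_chunk = start // chunk_size    # 0
end_chunk = stop // chunk_size       # 2 (floor division)

old_range = range(start_chunk, end_chunk)       # chunks 0, 1: misses data in 20..25
new_range = range(start_chunk, end_chunk + 1)   # chunks 0, 1, 2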
@@ -699,7 +726,6 @@ def build_annotation_layer( # pylint: disable=too-many-locals, too-many-branche

 @mazepa.taskable_operation
 def post_process_annotation_layer_op(target: AnnotationLayer):  # pragma: no cover
-    print(f"Post-processing: {target.path}")
     target.post_process()
 
 