From 695741c84568be41d4e59b45af42cc7d39491583 Mon Sep 17 00:00:00 2001
From: JoeStrout
Date: Thu, 22 Aug 2024 09:45:17 -0800
Subject: [PATCH] Add precomp_annotations file… (#768)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Add precomp_annotations file, for writing annotations in precomputed file format.

* Added unit test for new precomp_annotations file.

* Refactor precomp_annotations to use cloudfiles.

* Push test coverage of precomp_annotations.py to 100%.

* Generalize handling of GS paths to any non-local path.

* Test and handle case of line that spans chunk boundary.

* Yet another tweak to ensure codecov hits 100% on every test.
---
 .../test_precomp_annotations.py           |  75 +++
 .../db_annotations/precomp_annotations.py | 551 ++++++++++++++++++
 2 files changed, 626 insertions(+)
 create mode 100644 tests/unit/db_annotations/test_precomp_annotations.py
 create mode 100644 zetta_utils/db_annotations/precomp_annotations.py

diff --git a/tests/unit/db_annotations/test_precomp_annotations.py b/tests/unit/db_annotations/test_precomp_annotations.py
new file mode 100644
index 000000000..69d0f5db1
--- /dev/null
+++ b/tests/unit/db_annotations/test_precomp_annotations.py
@@ -0,0 +1,75 @@
+import os
+import shutil
+
+import pytest
+
+from zetta_utils.db_annotations import precomp_annotations
+from zetta_utils.db_annotations.precomp_annotations import LineAnnotation, SpatialFile
+from zetta_utils.geometry.vec import Vec3D
+from zetta_utils.layer.volumetric.index import VolumetricIndex
+
+
+def test_round_trip():
+    temp_dir = os.path.expanduser("~/temp/test_precomp_anno")
+    os.makedirs(temp_dir, exist_ok=True)
+    file_dir = os.path.join(temp_dir, "round_trip")
+
+    lines = [
+        LineAnnotation(line_id=1, start=(1640.0, 1308.0, 61.0), end=(1644.0, 1304.0, 57.0)),
+        LineAnnotation(line_id=2, start=(1502.0, 1709.0, 589.0), end=(1498.0, 1701.0, 589.0)),
+        LineAnnotation(line_id=3, start=(254.0, 68.0, 575.0), end=(258.0, 62.0, 575.0)),
+        LineAnnotation(line_id=4, start=(1061.0, 657.0, 507.0), end=(1063.0, 653.0, 502.0)),
+        LineAnnotation(line_id=5, start=(1298.0, 889.0, 315.0), end=(1295.0, 887.0, 314.0)),
+    ]
+    # Note: line 2 above, with the chunk_sizes below, will span 2 chunks, and so will
+    # be written out to both of them.
+
+    index = VolumetricIndex.from_coords([0, 0, 0], [2000, 2000, 600], Vec3D(10, 10, 40))
+
+    sf = SpatialFile(file_dir, index)
+    assert sf.chunk_sizes == [(2000, 2000, 600)]
+
+    chunk_sizes = [[2000, 2000, 600], [1000, 1000, 600], [500, 500, 300]]
+    sf = SpatialFile(file_dir, index, chunk_sizes)
+    os.makedirs(os.path.join(file_dir, "spatial0", "junkforcodecoverage"))
+    sf.clear()
+    sf.write_annotations([])  # (does nothing)
+    sf.write_annotations(lines)
+    sf.post_process()
+
+    # Now create a *new* SpatialFile, given just the directory.
+    sf = SpatialFile(file_dir)
+    assert sf.index == index
+    assert sf.chunk_sizes == chunk_sizes
+
+    lines_read = sf.read_all()
+    assert len(lines_read) == len(lines)
+    for line in lines:
+        assert line in lines_read
+
+    # Above is typical usage.  Below, we do some odd things
+    # to trigger other code paths we want to test.
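+    # (With filter_duplicates=False, line 2 -- which spans a chunk boundary and
+    # was written to two chunks -- comes back twice; hence the "+ 1" below.)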
+    lines_read = sf.read_all(-1, False)  # allow duplicates
+    assert len(lines_read) == len(lines) + 1
+
+    shutil.rmtree(os.path.join(file_dir, "spatial0"))
+    entries = precomp_annotations.subdivide([], sf.index, sf.chunk_sizes, file_dir)
+    assert len(entries) == 3
+    precomp_annotations.read_data(file_dir, entries[0])
+
+    shutil.rmtree(file_dir)  # clean up when done
+
+
+def test_edge_cases():
+    with pytest.raises(ValueError):
+        precomp_annotations.path_join()
+
+    # pylint: disable=use-implicit-booleaness-not-comparison
+    assert precomp_annotations.read_lines("/dev/null") == []
+
+    assert precomp_annotations.path_join("gs://foo/", "bar") == "gs://foo/bar"
+
+
+if __name__ == "__main__":
+    test_round_trip()
+    test_edge_cases()

diff --git a/zetta_utils/db_annotations/precomp_annotations.py b/zetta_utils/db_annotations/precomp_annotations.py
new file mode 100644
index 000000000..d45a9e570
--- /dev/null
+++ b/zetta_utils/db_annotations/precomp_annotations.py
@@ -0,0 +1,551 @@
+"""
+Module to support writing of annotations in precomputed format.
+
+(Note: unlike the other files in this directory, this one has
+nothing to do with storage of annotations in a database.  It's
+all about writing -- and to some extent, reading -- precomputed
+files as used by Neuroglancer.)
+
+References:
+  https://github.com/google/neuroglancer/issues/227
+  https://github.com/google/neuroglancer/blob/master/src/datasource/precomputed/annotations.md
+"""
+
+import io
+import json
+import os
+import struct
+from itertools import product
+from math import ceil
+from random import shuffle
+from typing import IO, Optional, Sequence
+
+from cloudfiles import CloudFile, CloudFiles
+
+from zetta_utils import builder, log
+from zetta_utils.geometry.vec import Vec3D
+from zetta_utils.layer.volumetric.index import VolumetricIndex
+
+logger = log.get_logger("zetta_utils")
+
+
+def point_in_bounds(point: Vec3D | Sequence[float], bounds: VolumetricIndex):
+    bounds_start = bounds.start
+    bounds_end = bounds.stop
+    for d in (0, 1, 2):
+        if point[d] < bounds_start[d] or point[d] > bounds_end[d]:
+            return False
+    return True
+
+
+def is_local_filesystem(path: str) -> bool:
+    return path.startswith("file://") or "://" not in path
+
+
+def path_join(*paths: str):
+    if not paths:
+        raise ValueError("At least one path is required")
+
+    if not is_local_filesystem(paths[0]):
+        # Join paths using "/" for GCS or other URL-like paths
+        cleaned_paths = [path.strip("/") for path in paths]
+        return "/".join(cleaned_paths)
+    else:
+        # Use os.path.join for local file paths
+        return os.path.join(*paths)
+
+
+class LineAnnotation:
+    def __init__(self, line_id: int, start: Sequence[float], end: Sequence[float]):
+        """
+        Initialize a LineAnnotation instance.
+
+        :param line_id: An integer representing the ID of the annotation.
+        :param start: A tuple of three floats representing the start coordinate (x, y, z).
+        :param end: A tuple of three floats representing the end coordinate (x, y, z).
+
+        Coordinates are in units defined by "dimensions" in the info file.
+        """
+        self.start = start
+        self.end = end
+        self.id = line_id
+
+    def __repr__(self):
+        """
+        Return a string representation of the LineAnnotation instance.
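+
+        Example (with hypothetical values):
+          LineAnnotation(line_id=1, start=(1.0, 2.0, 3.0), end=(4.0, 5.0, 6.0))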
+ """ + return f"LineAnnotation(line_id={self.id}, start={self.start}, end={self.end})" + + def __eq__(self, other): + return ( + isinstance(other, LineAnnotation) + and self.id == other.id + and self.start == other.start + and self.end == other.end + ) + + def write(self, output: IO[bytes]): + """ + Write this annotation in binary format to the given output writer. + """ + output.write(struct.pack("<3f", *self.start)) + output.write(struct.pack("<3f", *self.end)) + + @staticmethod + def read(in_stream: IO[bytes]): + """ + Read an annotation in binary format from the given input reader. + """ + return LineAnnotation( + 0, # (ID will be filled in later) + struct.unpack("<3f", in_stream.read(12)), + struct.unpack("<3f", in_stream.read(12)), + ) + + def in_bounds(self, bounds: VolumetricIndex): + """ + Return whether either end of this line is in the given bounds. + (NOTE: better might be to check whether the line intersects the bounds, even + if neither end is within. But doing the simple thing for now.)""" + return point_in_bounds(self.start, bounds) or point_in_bounds(self.end, bounds) + + +class SpatialEntry: + """ + This is a helper class, mainly used internally, to define each level of subdivision + in a spatial (precomputed annotation) file. A level is defined by: + + chunk_size: 3-element list or tuple defining the size of a chunk in X, Y, and Z (in voxels). + grid_shape: 3-element list/tuple defining how many chunks there are in each dimension. + key: a string e.g. "spatial1" used as a prefix for the chunk file on disk. + limit: affects how the data is subsampled for display; should generally be the number + of annotations in this chunk, or 1 for no subsampling. It's confusing, but see: + https://github.com/google/neuroglancer/issues/227#issuecomment-2246350747 + """ + + def __init__(self, chunk_size: Sequence[int], grid_shape: Sequence[int], key: str, limit: int): + self.chunk_size = chunk_size + self.grid_shape = grid_shape + self.key = key + self.limit = limit + + def __repr__(self): + return ( + f"SpatialEntry(chunk_size={self.chunk_size}, grid_shape={self.grid_shape}, " + f'key="{self.key}", limit={self.limit})' + ) + + def to_json(self): + return f"""{{ + "chunk_size" : {list(self.chunk_size)}, + "grid_shape" : {list(self.grid_shape)}, + "key" : "{self.key}", + "limit": {self.limit} + }}""" + + +def write_bytes(file_or_gs_path: str, data: bytes): + """ + Write bytes to a local file or Google Cloud Storage. + + :param file_or_gs_path: path to file to write (local or GCS path) + :param data: bytes to write + """ + if not "//" in file_or_gs_path: + file_or_gs_path = "file://" + file_or_gs_path + cf = CloudFile(file_or_gs_path) + cf.put(data) + + +def write_lines(file_or_gs_path: str, lines: Sequence[LineAnnotation], randomize: bool = True): + """ + Write a set of lines to the given file, in 'multiple annotation encoding' format: + 1. Line count (uint64le) + 2. Data for each line (excluding ID), one after the other + 3. The line IDs (also as uint64le) + + :param file_path: local file or GS path of file to write + :param lines: iterable of LineAnnotation objects + :param randomize: if True, the lines will be written in random + order (without mutating the lines parameter) + """ + lines = list(lines) + if randomize: + lines = lines[:] + shuffle(lines) + + buffer = io.BytesIO() + # first write the count + buffer.write(struct.pack(" list[LineAnnotation]: + """ + Read a set of lines from the given file, which should be in + 'multiple annotation encoding' as defined in write_lines above. 
+ """ + data = read_bytes(file_or_gs_path) + lines: list[LineAnnotation] = [] + if data is None or len(data) == 0: + return lines + with io.BytesIO(data) as buffer: + # first read the count + line_count = struct.unpack("= 0 else len(self.chunk_sizes) + spatial_level + result = [] + bounds_size = self.index.shape + chunk_size = Vec3D(*self.chunk_sizes[level]) + grid_shape = ceil(bounds_size / chunk_size) + level_key = f"spatial{level}" + level_dir = path_join(self.path, level_key) + for x in range(0, grid_shape[0]): + for y in range(0, grid_shape[1]): + for z in range(0, grid_shape[2]): + anno_file_path = path_join(level_dir, f"{x}_{y}_{z}") + result += read_lines(anno_file_path) + if filter_duplicates: + result_dict = {line.id: line for line in result} + result = list(result_dict.values()) + return result + + def post_process(self): + """ + Read all our data from the lowest-level chunks on disk, then rewrite: + 1. The higher-level chunks, if any; and + 2. The info file, with correct limits for each level. + This is useful after writing out a bunch of data with + write_annotations(data, False), which writes to only the lowest-level chunks. + """ + # read data (from lowest level chunks) + all_data = self.read_all() + + # write data to all levels EXCEPT the last one + levels_to_write = range(0, len(self.chunk_sizes) - 1) + spatial_entries = subdivide( + all_data, self.index, self.chunk_sizes, self.path, levels_to_write + ) + + # rewrite the info file, with the updated spatial entries + self.write_info_file(spatial_entries)