Commit
feat: better scale inheritance
supersergiy committed Feb 4, 2025
1 parent 6646edf commit 7f3b538
Showing 7 changed files with 660 additions and 10 deletions.
19 changes: 19 additions & 0 deletions zetta_utils/layer/precomputed.py
@@ -36,6 +36,16 @@ def get_info(path: str) -> dict[str, Any]:
PrecomputedAnnotationDType = Literal["POINT", "LINE", "AXIS_ALIGNED_BOUNDING_BOX", "ELLIPSOID"]
PrecomputedDType = Union[PrecomputedVolumeDType, PrecomputedAnnotationDType]

NON_INHERITABLE_SCALE_KEYS = [
    "sharding",
    "encoding",
    "voxel_offset",
    "size",
    "chunk_sizes",
    "resolution",
    "key",
]


@attrs.mutable
class InfoSpecParams:
@@ -212,6 +222,15 @@ def from_reference(
                encoding = reference_scale["encoding"]
            if chunk_size is None:
                chunk_size = reference_scale["chunk_sizes"][0]

            if extra_scale_data is None:
                extra_scale_data = {}
            extra_scale_data = {
                **{
                    k: v for k, v in reference_scale.items() if k not in NON_INHERITABLE_SCALE_KEYS
                },
                **extra_scale_data,
            }
        else:
            missing_params = []
            if type is None:
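A minimal sketch of the inheritance semantics introduced above (the custom key below is hypothetical): every key of the reference scale that is not listed in NON_INHERITABLE_SCALE_KEYS is carried over into the new scale, and caller-supplied extra_scale_data always wins on conflicts.

    reference_scale = {
        "resolution": [4, 4, 40],   # per-scale key: not inherited
        "encoding": "raw",          # per-scale key: not inherited
        "semantic_tag": "em",       # hypothetical custom key: inherited
    }
    extra_scale_data = {"semantic_tag": "seg"}
    merged = {
        **{k: v for k, v in reference_scale.items() if k not in NON_INHERITABLE_SCALE_KEYS},
        **extra_scale_data,
    }
    assert merged == {"semantic_tag": "seg"}  # explicit extra_scale_data overrides
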
2 changes: 2 additions & 0 deletions zetta_utils/layer/volumetric/cloudvol/deprecated/__init__.py
@@ -0,0 +1,2 @@
from . import backend
from . import build
314 changes: 314 additions & 0 deletions zetta_utils/layer/volumetric/cloudvol/deprecated/backend.py
@@ -0,0 +1,314 @@
# pylint: disable=missing-docstring
from __future__ import annotations

from copy import deepcopy
from typing import Any, Dict, Optional, Union

import attrs
import cachetools
import cloudvolume as cv
import numpy as np
from cloudvolume import CloudVolume
from cloudvolume.exceptions import ScaleUnavailableError
from numpy import typing as npt

from zetta_utils.common import abspath, is_local
from zetta_utils.geometry import Vec3D

from ....deprecated.precomputed import InfoExistsModes, PrecomputedInfoSpec, get_info
from ... import VolumetricBackend, VolumetricIndex

_cv_cache: cachetools.LRUCache = cachetools.LRUCache(maxsize=16)
_cv_cached: Dict[str, set] = {}

IN_MEM_CACHE_NUM_BYTES_PER_CV = 128 * 1024 ** 2

# To avoid reloading the info file - note that an empty provenance is passed,
# since otherwise CloudVolume's __new__ will download the provenance.
# TODO: Use `assume_metadata` off of the cached info, using `get_info`.
# Cannot use a regular hashkey, as the resolutions used need to be tracked.
def _get_cv_cached(
    path: str,
    resolution: Optional[Vec3D] = None,
    cache_bytes_limit: Optional[int] = None,
    **kwargs,
) -> cv.frontends.precomputed.CloudVolumePrecomputed:
    path_ = abspath(path)
    if cache_bytes_limit is None:
        cache_bytes_limit = IN_MEM_CACHE_NUM_BYTES_PER_CV
    if (path_, resolution) in _cv_cache:
        return _cv_cache[(path_, resolution)]
    if resolution is not None:
        try:
            result = CloudVolume(
                path_,
                info=get_info(path_),
                provenance={},
                mip=tuple(resolution),
                lru_bytes=cache_bytes_limit,
                **kwargs,
            )
        except ScaleUnavailableError as e:
            raise ScaleUnavailableError(f"{path_} - {e}") from e
    else:
        result = CloudVolume(
            path_,
            info=get_info(path_),
            provenance={},
            lru_bytes=cache_bytes_limit,
            **kwargs,
        )
    _cv_cache[(path_, resolution)] = result
    if path_ not in _cv_cached:
        _cv_cached[path_] = set()
    _cv_cached[path_].add(resolution)
    return result


def _clear_cv_cache(path: str | None = None) -> None:  # pragma: no cover
    if path is None:
        _cv_cached.clear()
        _cv_cache.clear()
        return
    path_ = abspath(path)
    resolutions = _cv_cached.pop(path_, None)
    if resolutions is not None:
        for resolution in resolutions:
            _cv_cache.pop((path_, resolution), None)
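
# A hypothetical usage sketch of the two cache helpers above: repeated lookups
# with the same (path, resolution) pair return the same cached CloudVolume
# instance, and clearing by path evicts every resolution cached for that path.
# The path below is made up.
#     vol_a = _get_cv_cached("gs://bucket/layer", resolution=Vec3D[float](4, 4, 40))
#     vol_b = _get_cv_cached("gs://bucket/layer", resolution=Vec3D[float](4, 4, 40))
#     assert vol_a is vol_b
#     _clear_cv_cache("gs://bucket/layer")  # drops all cached resolutions for the path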


@attrs.mutable
class CVBackend(VolumetricBackend):  # pylint: disable=too-few-public-methods
    """
    Backend for performing IO on Neuroglancer datasets using the CloudVolume library.
    Read data will be a ``npt.NDArray`` in ``CXYZ`` dimension order.
    Write data is expected to be a ``npt.NDArray`` or ``np.ndarray`` in ``CXYZ``
    dimension order.

    :param path: CloudVolume path. Can be given as relative or absolute.
    :param cv_kwargs: Parameters that will be passed to the CloudVolume constructor.
        The ``mip`` keyword must not be present in ``cv_kwargs``, as the read
        resolution is passed in as part of the index to the backend.
    :param info_spec: Specification for the info file for the layer. If None, the
        info is assumed to exist.
    :param on_info_exists: Behavior mode for when both ``info_spec`` is given and
        the layer info already exists.
    """

    path: str
    cv_kwargs: Dict[str, Any] = attrs.field(factory=dict)
    info_spec: Optional[PrecomputedInfoSpec] = None
    on_info_exists: InfoExistsModes = "expect_same"

    def __attrs_post_init__(self):
        if "mip" in self.cv_kwargs:
            raise ValueError(  # pragma: no cover
                "Attempting to initialize CVBackend with a static MIP is not supported. "
                "Please provide the intended resolution through the index"
            )

        self._set_cv_defaults()
        if self.info_spec is None:
            self.info_spec = PrecomputedInfoSpec()
        overwritten = self.info_spec.update_info(self.path, self.on_info_exists)
        if overwritten:
            _clear_cv_cache(self.path)

    def _set_cv_defaults(self):
        self.cv_kwargs.setdefault("bounded", False)
        self.cv_kwargs.setdefault("progress", False)
        self.cv_kwargs.setdefault("autocrop", False)
        self.cv_kwargs.setdefault("non_aligned_writes", False)
        self.cv_kwargs.setdefault("cache", False)
        self.cv_kwargs.setdefault("compress_cache", False)
        self.cv_kwargs.setdefault("compress", None)
        self.cv_kwargs.setdefault("cdn_cache", False)
        self.cv_kwargs.setdefault("fill_missing", True)
        self.cv_kwargs.setdefault("delete_black_uploads", True)
        self.cv_kwargs.setdefault("agglomerate", True)
        self.cv_kwargs.setdefault("lru_encoding", "raw")

    @property
    def name(self) -> str:  # pragma: no cover
        return self.path

    @name.setter
    def name(self, name: str) -> None:  # pragma: no cover
        raise NotImplementedError(
            "cannot set `name` for CVBackend directly;"
            " use `backend.with_changes(name='name')` instead."
        )

    @property
    def dtype(self) -> np.dtype:
        result = _get_cv_cached(self.path, **self.cv_kwargs)
        return np.dtype(result.data_type)

    @property
    def num_channels(self) -> int:  # pragma: no cover
        result = _get_cv_cached(self.path, **self.cv_kwargs)
        return result.num_channels

    @property
    def is_local(self) -> bool:  # pragma: no cover
        return is_local(self.path)

    @property
    def enforce_chunk_aligned_writes(self) -> bool:  # pragma: no cover
        return not self.cv_kwargs["non_aligned_writes"]

    @enforce_chunk_aligned_writes.setter
    def enforce_chunk_aligned_writes(self, value: bool) -> None:  # pragma: no cover
        raise NotImplementedError(
            "cannot set `enforce_chunk_aligned_writes` for CVBackend directly;"
            " use `backend.with_changes(non_aligned_writes=value:bool)` instead."
        )

    @property
    def allow_cache(self) -> bool:  # pragma: no cover
        return self.cv_kwargs["cache"]

    @allow_cache.setter
    def allow_cache(self, value: Union[bool, str]) -> None:  # pragma: no cover
        raise NotImplementedError(
            "cannot set `allow_cache` for CVBackend directly;"
            " use `backend.with_changes(allow_cache=value:Union[bool, str])` instead."
        )

    @property
    def use_compression(self) -> bool:  # pragma: no cover
        return self.cv_kwargs["compress"]

    @use_compression.setter
    def use_compression(self, value: bool) -> None:  # pragma: no cover
        raise NotImplementedError(
            "cannot set `use_compression` for CVBackend directly;"
            " use `backend.with_changes(use_compression=value:bool)` instead."
        )

    def clear_disk_cache(self) -> None:  # pragma: no cover
        info = get_info(self.path)
        for scale in info["scales"]:
            res = Vec3D[float](*scale["resolution"])
            _get_cv_cached(self.path, resolution=res, **self.cv_kwargs).cache.flush()

    def clear_cache(self) -> None:  # pragma: no cover
        _clear_cv_cache(self.path)

    def read(self, idx: VolumetricIndex) -> npt.NDArray:
        # Data out: cxyz
        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
        data_raw = cvol[idx.to_slices()]

        result = np.transpose(data_raw, (3, 0, 1, 2))
        return np.array(result)

    def write(self, idx: VolumetricIndex, data: npt.NDArray):
        # Data in: cxyz
        # Write format: xyzc (b == 1)
        if self.enforce_chunk_aligned_writes:
            self.assert_idx_is_chunk_aligned(idx)

        if data.size == 1 and len(data.shape) == 1:
            data_final = data[0]
        elif len(data.shape) == 4:
            data_final = np.transpose(data, (1, 2, 3, 0))
        else:
            raise ValueError(
                "Data written to CloudVolume backend must be in `cxyz` dimension format, "
                f"but got a tensor with ndim == {data.ndim}"
            )

        cvol = _get_cv_cached(self.path, idx.resolution, **self.cv_kwargs)
        slices = idx.to_slices()
        # Enable autocrop for writes only
        cvol.autocrop = True

        if (cvol.dtype == "uint64") and (data_final.dtype == np.int64):
            if data_final.min() < np.int64(0):
                raise ValueError("Attempting to write negative values to a uint64 CloudVolume")
            data_final = data_final.astype(np.uint64)
        cvol[slices] = data_final
        cvol.autocrop = False

    def with_changes(self, **kwargs) -> CVBackend:
        """Currently untyped. Supports:
        "name" = value: str
        "use_compression" = value: str
        "enforce_chunk_aligned_writes" = value: bool
        "voxel_offset_res" = (voxel_offset, resolution): Tuple[Vec3D[int], Vec3D]
        "chunk_size_res" = (chunk_size, resolution): Tuple[Vec3D[int], Vec3D]
        "dataset_size_res" = (dataset_size, resolution): Tuple[Vec3D[int], Vec3D]
        "allow_cache" = value: Union[bool, str]
        """
        assert self.info_spec is not None

        info_spec = deepcopy(self.info_spec)
        cv_kwargs = deepcopy(self.cv_kwargs)

        implemented_keys = [
            "name",
            "allow_cache",
            "use_compression",
            "enforce_chunk_aligned_writes",
            "voxel_offset_res",
            "chunk_size_res",
            "dataset_size_res",
        ]
        keys_to_kwargs = {"name": "path"}
        keys_to_infospec_fn = {
            "voxel_offset_res": info_spec.set_voxel_offset,
            "chunk_size_res": info_spec.set_chunk_size,
            "dataset_size_res": info_spec.set_dataset_size,
        }
        keys_to_cv_kwargs = {
            "use_compression": "compress",
            "enforce_chunk_aligned_writes": "non_aligned_writes",
        }
        keys_to_reverse = ["enforce_chunk_aligned_writes"]
        evolve_kwargs = {}
        for k, v in kwargs.items():
            if k not in implemented_keys:
                raise KeyError(f"key `{k}` received, expected one of `{implemented_keys}`")
            if k in keys_to_cv_kwargs:
                if k in keys_to_reverse:
                    v = not v
                cv_kwargs[keys_to_cv_kwargs[k]] = v
            if k in keys_to_kwargs:
                evolve_kwargs[keys_to_kwargs[k]] = v
            if k in keys_to_infospec_fn:
                keys_to_infospec_fn[k](v)

        if "name" in kwargs:
            _clear_cv_cache(kwargs["name"])
        _clear_cv_cache(self.path)
        result = attrs.evolve(
            self,
            **evolve_kwargs,
            info_spec=info_spec,
            cv_kwargs=cv_kwargs,
            on_info_exists="overwrite",
        )
        return result

    def get_voxel_offset(self, resolution: Vec3D) -> Vec3D[int]:
        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
        return Vec3D[int](*cvol.voxel_offset)

    def get_chunk_size(self, resolution: Vec3D) -> Vec3D[int]:
        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
        return Vec3D[int](*cvol.chunk_size)

    def get_dataset_size(self, resolution: Vec3D) -> Vec3D[int]:
        cvol = _get_cv_cached(self.path, resolution=resolution, **self.cv_kwargs)
        return Vec3D[int](*cvol.volume_size)

    def get_bounds(self, resolution: Vec3D) -> VolumetricIndex:  # pragma: no cover
        offset = self.get_voxel_offset(resolution)
        size = self.get_dataset_size(resolution)
        return VolumetricIndex.from_coords(offset, offset + size, resolution)

    def pformat(self) -> str:  # pragma: no cover
        return self.name
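
A hedged usage sketch of the backend above, for orientation (the path, coordinates, and resolution are made up; only methods defined in this file are used):

    backend = CVBackend(path="gs://my-bucket/my-layer")  # hypothetical path; info assumed to exist
    idx = VolumetricIndex.from_coords((0, 0, 0), (64, 64, 8), Vec3D[float](4, 4, 40))
    data = backend.read(idx)   # CXYZ ndarray at the requested resolution
    backend.write(idx, data)   # round-trip write; chunk-aligned by default
    gzipped = backend.with_changes(use_compression="gzip")  # returns an evolved copy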
@@ -9,10 +9,10 @@
from zetta_utils import builder
from zetta_utils.tensor_ops import InterpolationMode

-from ... import DataProcessor, IndexProcessor, JointIndexDataProcessor
-from ...deprecated.precomputed import InfoExistsModes, PrecomputedInfoSpec
-from .. import VolumetricIndex, VolumetricLayer, build_volumetric_layer
-from . import CVBackend
+from .... import DataProcessor, IndexProcessor, JointIndexDataProcessor
+from ....deprecated.precomputed import InfoExistsModes, PrecomputedInfoSpec
+from ... import VolumetricIndex, VolumetricLayer, build_volumetric_layer
+from .backend import CVBackend

# from typeguard import typechecked

@@ -124,7 +124,7 @@ def build_cv_layer(  # pylint: disable=too-many-locals
        path=path,
        cv_kwargs=cv_kwargs,
        on_info_exists=on_info_exists,
-        info_spec=PrecomputedInfoSpec(  # type: ignore # deprecated and new are compatible
+        info_spec=PrecomputedInfoSpec(
            type=info_type,
            data_type=info_data_type,
            num_channels=info_num_channels,
@@ -0,0 +1,2 @@
from . import backend
from . import deprecated
