diff --git a/caveclient/endpoints.py b/caveclient/endpoints.py index f2a950dd..711806d9 100644 --- a/caveclient/endpoints.py +++ b/caveclient/endpoints.py @@ -299,10 +299,11 @@ skeleton_v1 = "{skeleton_server_address}/skeletoncache/api/v1" skeletonservice_endpoints_v1 = { "get_version": skeleton_common + "/version", + "get_versions": skeleton_common + "/versions", "skeleton_info": skeleton_v1 + "/{datastack_name}/precomputed/skeleton/info", "bulk_skeleton_info": skeleton_v1 + "/{datastack_name}/bulk/skeleton/info", "skeleton_info_versioned": skeleton_v1 - + "/{datastack_name}/precomputed/skeleton/info/{skvn}", + + "/{datastack_name}/precomputed/skeleton/{skvn}/info", "get_cache_contents_via_ridprefixes": skeleton_v1 + "/{datastack_name}/precomputed/skeleton/query_cache/{root_id_prefixes}/{limit}", "get_cache_contents_via_skvn_ridprefixes": skeleton_v1 diff --git a/caveclient/skeletonservice.py b/caveclient/skeletonservice.py index 031c23dc..8d52b6d5 100644 --- a/caveclient/skeletonservice.py +++ b/caveclient/skeletonservice.py @@ -12,23 +12,12 @@ from cachetools import TTLCache, cached from packaging.version import Version +from .auth import AuthClient from .base import ( + ClientBase, + _api_endpoints, _check_version_compatibility, ) - -try: - import cloudvolume - - CLOUDVOLUME_AVAILABLE = True -except ImportError: - logging.warning( - "cloudvolume not available. 'precomputed' output format will not work." 
- ) - - CLOUDVOLUME_AVAILABLE = False - -from .auth import AuthClient -from .base import ClientBase, _api_endpoints from .endpoints import skeletonservice_api_versions, skeletonservice_common SERVER_KEY = "skeleton_server_address" @@ -88,28 +77,6 @@ def __init__( self._datastack_name = datastack_name - def _test_get_version(self) -> Optional[Version]: - logging.info("_test_get_version()") - endpoint_mapping = self.default_url_mapping - endpoint = self._endpoints.get("get_version_test", None) - logging.info(f"endpoint: {endpoint}") - if endpoint is None: - return None - - url = endpoint.format_map(endpoint_mapping) - logging.info(f"url: {url}") - response = self.session.get(url) - logging.info(f"response: {response}") - if response.status_code == 404: # server doesn't have this endpoint yet - logging.info("404") - return None - else: - version_str = response.json() - logging.info(f"version_str: {type(version_str)} {version_str}") - version = Version(version_str) - logging.info(f"version: {version}") - return version - def _test_l2cache_exception(self): raise NoL2CacheException( "This is a test of SkeletonClient's behavior when no L2Cache is found." 
@@ -151,6 +118,44 @@ def parse(url): url = parse(self._build_endpoint(rid, ds, 1, "json")) assert url == f"{ds}{innards}1/{rid}/json" + def get_version(self): + logging.info("get_version()") + endpoint_mapping = self.default_url_mapping + endpoint = self._endpoints.get("get_version", None) + logging.info(f"endpoint: {endpoint}") + + url = endpoint.format_map(endpoint_mapping) + logging.info(f"url: {url}") + response = self.session.get(url) + logging.info(f"response: {response}") + if response.status_code == 404: # server doesn't have this endpoint yet + logging.info("404") + return None + else: + version_str = response.json() + logging.info(f"version_str: {type(version_str)} {version_str}") + version = Version(version_str) + logging.info(f"version: {version}") + return version + + def get_versions(self): + logging.info("get_versions()") + endpoint_mapping = self.default_url_mapping + endpoint = self._endpoints.get("get_versions", None) + logging.info(f"endpoint: {endpoint}") + + url = endpoint.format_map(endpoint_mapping) + logging.info(f"url: {url}") + response = self.session.get(url) + logging.info(f"response: {response}") + if response.status_code == 404: # server doesn't have this endpoint yet + logging.info("404") + return None + else: + versions = response.json() + logging.info(f"versions: {type(versions)} {versions}") + return versions + @staticmethod def compressStringToBytes(inputString): """ @@ -395,7 +400,7 @@ def skeletons_exist( @cached(TTLCache(maxsize=32, ttl=3600)) def get_precomputed_skeleton_info( self, - skvn: int = 0, + skvn, datastack_name: Optional[str] = None, ): """get's the precomputed skeleton information @@ -422,18 +427,11 @@ def get_skeleton( self, root_id: int, datastack_name: Optional[str] = None, - skeleton_version: Optional[int] = 0, + skeleton_version: Optional[int] = None, output_format: Literal[ - "none", - "h5", + "dict", "swc", - "swccompressed", - "json", - "jsoncompressed", - "arrays", - "arrayscompressed", - "precomputed", - 
] = "none", + ] = "dict", log_warning: bool = True, verbose_level: Optional[int] = 0, ): @@ -446,18 +444,12 @@ def get_skeleton( datastack_name : str The name of the datastack to check skeleton_version : int - The skeleton version to generate and retrieve. Options are documented in SkeletonService. Use 0 for latest. + The skeleton version to generate and retrieve. Options are documented in SkeletonService. Use 0 for Neuroglancer-compatibility. Use -1 for latest. output_format : string The format to retrieve. Options are: - - 'none': No return value (this can be used to generate a skeleton without retrieving it) - - 'precomputed': A cloudvolume.Skeleton object - - 'json': A dictionary - - 'jsoncompressed': A dictionary using compression for transmission (generally faster than 'json') - - 'arrays': A dictionary (literally a subset of the json response) - - 'arrayscompressed': A dictionary using compression for transmission (generally faster than 'arrays') + - 'dict': A dictionary - 'swc': A pandas DataFrame - - 'h5': An BytesIO object containing bytes for an h5 file Returns ------- @@ -468,8 +460,39 @@ def get_skeleton( if not self.fc.l2cache.has_cache(): raise NoL2CacheException("SkeletonClient requires an L2Cache.") + if output_format not in ["dict", "swc"]: + raise ValueError(f"Unknown output format: {output_format}") + + if verbose_level >= 1: + logging.info(f"SkeletonService version: {self._server_version}") + + if self._server_version < Version("0.6.0"): + logging.warning( + "SkeletonService version is less than 0.6.0. Please upgrade to the latest version." 
+ ) + + # The output formats were changed in server v0.6.0 and must be handled differently by the client + if output_format == "dict": + if self._server_version < Version("0.6.0"): + endpoint_format = "jsoncompressed" + else: + endpoint_format = "flatdict" + elif output_format == "swc": + endpoint_format = "swccompressed" + + if skeleton_version is None: + logging.warning( + "The optional nature of the 'skeleton_version' parameter will be deprecated in the future. Please specify a skeleton version." + ) + skeleton_version = -1 + + # -1, to specify the latest version, was only added in server v0.6.1 + if self._server_version < Version("0.6.1") and skeleton_version == -1: + skeleton_versions = self.get_versions() + skeleton_version = sorted(skeleton_versions)[-1] + url = self._build_endpoint( - root_id, datastack_name, skeleton_version, output_format + root_id, datastack_name, skeleton_version, endpoint_format ) response = self.session.get(url) @@ -480,34 +503,22 @@ def get_skeleton( f"get_skeleton() response contains content of size {len(response.content)} bytes" ) - if output_format == "none": - return - if output_format == "precomputed": - if not CLOUDVOLUME_AVAILABLE: - raise ImportError( - "'precomputed' output format requires cloudvolume, which is not available." - ) - metadata = self.get_precomputed_skeleton_info( - skeleton_version, datastack_name - ) - vertex_attributes = metadata["vertex_attributes"] - return cloudvolume.Skeleton.from_precomputed( - response.content, vertex_attributes=vertex_attributes - ) - if output_format == "json": - return response.json() - if output_format == "jsoncompressed": + if endpoint_format == "jsoncompressed": + assert self._server_version < Version("0.6.0") + sk_json = SkeletonClient.decompressBytesToDict(response.content) + if "vertex_properties" in sk_json.keys(): + for key in sk_json["vertex_properties"].keys(): + # Radius was redundantly stored both as a top-level parameter and in vertex_properties. 
+ # We could either check for it (or any such redundancy key) and skip over it, or we could overwrite it. + # Since they were created as duplicates anyway, it doesn't matter which approach is used. + sk_json[key] = sk_json["vertex_properties"][key] + del sk_json["vertex_properties"] + return sk_json + if endpoint_format == "flatdict": + assert self._server_version >= Version("0.6.0") return SkeletonClient.decompressBytesToDict(response.content) - if output_format == "arrays": - return response.json() - if output_format == "arrayscompressed": - return SkeletonClient.decompressBytesToDict(response.content) - if output_format == "swc" or output_format == "swccompressed": - file_content = ( - response.content.decode() - if output_format == "swc" - else SkeletonClient.decompressBytesToString(response.content) - ) + if endpoint_format == "swccompressed": + file_content = SkeletonClient.decompressBytesToString(response.content) # I got the SWC column header from skeleton_plot.skel_io.py df = pd.read_csv( @@ -526,9 +537,6 @@ def get_skeleton( # df = df.apply(pd.to_numeric, downcast='float') return df - if output_format == "h5": - skeleton_bytesio = BytesIO(response.content) - return skeleton_bytesio raise ValueError(f"Unknown output format: {output_format}") @@ -537,11 +545,11 @@ def get_bulk_skeletons( self, root_ids: List, datastack_name: Optional[str] = None, - skeleton_version: Optional[int] = 0, + skeleton_version: Optional[int] = None, output_format: Literal[ - "json", + "dict", "swc", - ] = "json", + ] = "dict", generate_missing_skeletons: bool = False, log_warning: bool = True, verbose_level: Optional[int] = 0, @@ -555,16 +563,27 @@ def get_bulk_skeletons( datastack_name : str The name of the datastack to check skeleton_version : int - The skeleton version to generate. Use 0 for latest. + The skeleton version to generate. Use 0 for Neuroglancer-compatibility. Use -1 for latest. 
""" if not self.fc.l2cache.has_cache(): raise NoL2CacheException("SkeletonClient requires an L2Cache.") + if output_format == "dict": + endpoint_format = "flatdict" + elif output_format == "swc": + endpoint_format = "swc" + + if skeleton_version is None: + logging.warning( + "The optional nature of the 'skeleton_version' parameter will be deprecated in the future. Please specify a skeleton version." + ) + skeleton_version = -1 + url = self._build_bulk_endpoint( root_ids, datastack_name, skeleton_version, - output_format, + endpoint_format, generate_missing_skeletons, ) response = self.session.get(url) @@ -575,7 +594,7 @@ def get_bulk_skeletons( f"Generated skeletons for root_ids {root_ids} (with generate_missing_skeletons={generate_missing_skeletons})" ) - if output_format == "json": + if endpoint_format == "flatdict": sk_jsons = {} for rid, swc_bytes in response.json().items(): try: @@ -588,7 +607,7 @@ def get_bulk_skeletons( f"Error decompressing skeleton for root_id {rid}: {e}" ) return sk_jsons - elif output_format == "swc": + elif endpoint_format == "swc": sk_dfs = {} for rid, swc_bytes in response.json().items(): try: @@ -613,7 +632,7 @@ def generate_bulk_skeletons_async( self, root_ids: List, datastack_name: Optional[str] = None, - skeleton_version: Optional[int] = 0, + skeleton_version: Optional[int] = None, log_warning: bool = True, verbose_level: Optional[int] = 0, ): @@ -626,18 +645,31 @@ def generate_bulk_skeletons_async( datastack_name : str The name of the datastack to check skeleton_version : int - The skeleton version to generate. Use 0 for latest. + The skeleton version to generate. Use 0 for Neuroglancer-compatibility. Use -1 for latest. """ if not self.fc.l2cache.has_cache(): raise NoL2CacheException("SkeletonClient requires an L2Cache.") + if skeleton_version is None: + logging.warning( + "The optional nature of the 'skeleton_version' parameter will be deprecated in the future. Please specify a skeleton version." 
+ ) + skeleton_version = -1 + url = self._build_bulk_async_endpoint( root_ids, datastack_name, skeleton_version ) response = self.session.get(url) self.raise_for_status(response, log_warning=log_warning) + estimated_async_time_secs_upper_bound = float(response.text) + if verbose_level >= 1: logging.info( f"Queued asynchronous skeleton generation for root_ids: {root_ids}" ) + logging.info( + f"Upper estimate to generate {len(root_ids)} skeletons: {estimated_async_time_secs_upper_bound} seconds" + ) + + return f"Upper estimate to generate {len(root_ids)} skeletons: {estimated_async_time_secs_upper_bound} seconds" diff --git a/docs/tutorials/skeletonization.md b/docs/tutorials/skeletonization.md index ee13dae2..bca22137 100644 --- a/docs/tutorials/skeletonization.md +++ b/docs/tutorials/skeletonization.md @@ -9,6 +9,8 @@ The skeleton service enables you to generate and retrieve skeletons from the ser The simplest way to initialize the CAVEclient is by merely providing the datastack of interest: ```python +import caveclient as cc + client = cc.CAVEclient(<datastack_name>) ```