diff --git a/opencsp/common/lib/cv/CacheableImage.py b/opencsp/common/lib/cv/CacheableImage.py index d8a558b1a..3b27d8c26 100644 --- a/opencsp/common/lib/cv/CacheableImage.py +++ b/opencsp/common/lib/cv/CacheableImage.py @@ -1,10 +1,13 @@ +import os import sys -from typing import Optional, Union +from typing import Callable, Optional, Union +import weakref import numpy as np import numpy.typing as npt from PIL import Image +import opencsp.common.lib.tool.exception_tools as et import opencsp.common.lib.tool.file_tools as ft import opencsp.common.lib.tool.image_tools as it import opencsp.common.lib.tool.log_tools as lt @@ -16,45 +19,339 @@ class CacheableImage: data isn't in use, or for retrieval of an image from the cached file when the data is in use. - Only one of the inputs (image, cache_path, source_path) are required. - However, if the image doesn't exist as a cache file (.npy) but does - exists as an image file (.png), then both the cache_path and source_path - can be provided. In this case the image will be loaded from the - source_path and when cached will be saved to the cache_path. - The intended use for this class is to reduce memory usage by caching images to disk while not in use. Therefore, there is an inherent priority order for the data that is returned from various methods: (1) in-memory array, (2) numpy cache file, (3) image source file. + + Parameters + ---------- + Only one of the inputs (array, cache_path, source_path) are required in the + constructor. In fact, there is a method :py:meth:`from_single_source` that + tries to guess which input is being provided. The "array" parameter should + be the raw image data, the cache_path should be a string to a .npy file, and + the source_path should be a string to an image file. Note that the + cache_path file doesn't need to exist yet, but can instead be an indicator + for where to cache the image to. 
+ + The following table determines how images are retrieved based on the given + parameters ([X] = given and file exists, [.] = given and file doesn't + exist, * = any): + + +=======+=========+==========+========================================================================+ + | array | cache_p | source_p | retrieval method | + +=======+=========+==========+========================================================================+ + | X | | | array | + | | | | cache_path after cacheing (a temporary cache_path will be assigned) | + +-------+---------+----------+------------------------------------------------------------------------+ + | X | X | * | array | + | | | | cache_path after cacheing (array contents will then be ignored) | + +-------+---------+----------+------------------------------------------------------------------------+ + | X | [X] | | array | + | | | | cache_path after cacheing (array contents will be saved to cache_path) | + +-------+---------+----------+------------------------------------------------------------------------+ + | X | [X] | X | array | + | | | | source_path after cacheing (array contents will then be ignored) | + +-------+---------+----------+------------------------------------------------------------------------+ + | X | | X | array | + | | | | source_path after cacheing (array contents will then be ignored) | + +-------+---------+----------+------------------------------------------------------------------------+ + | X | | [X] | array | + | | | | cache_path after cacheing (array contents will be saved to cache_path) | + +-------+---------+----------+------------------------------------------------------------------------+ + | | X | * | cache_path | + +-------+---------+----------+------------------------------------------------------------------------+ + | | [X] | X | source_path | + +-------+---------+----------+------------------------------------------------------------------------+ + | | | X | source_path | + 
+-------+---------+----------+------------------------------------------------------------------------+ + + In addition, the following cases will raise a FileNotFoundError during the + __init__ method(): + + +=======+=========+==========+ + | array | cache_p | source_p | + +=======+=========+==========+ + | | [X] | | + +-------+---------+----------+ + | | | [X] | + +-------+---------+----------+ + + + sys.getsizeof + ------------- + This class overrides the default __sizeof__ dunder method, meaning that the + size returned by sys.getsizeof(cacheable_image) is not just the size of all + variables tracked by the instance. Rather, the size of the Numpy array and + Pillow image are returned. This metric better represents the + memory-conserving use case that is intended for this class. + + __sizeof__ returns close to 0 if the array and image attributes have been + set to None (aka the cache() method has been executed). Note that this does + not depend on the state of the garbage collector, which might not actually + free the memory for some time after it is no longer being tracked by this + class. An attempt at freeing the memory can be done immediately with + `gc.collect()` but the results of this are going to be implementation + specific. + + The size of the source path, image path, and all other attributes are not + included in the return value of __sizeof__. This decision was made for + simplicity. Also the additional memory has very little impact. For example a + 256 character path uses ~0.013% as much memory as a 1920x1080 monochrome + image. """ + _cacheable_images_registry: weakref.WeakKeyDictionary["CacheableImage", int] = {} + # Class variable that tracks the existence of each cacheable image and the + # order of accesses. + _inactive_registry: weakref.WeakKeyDictionary["CacheableImage", int] = {} + # Like _cacheable_images_registry, but for instances that have been deregistered and not reregistered. 
+ _cacheable_images_last_access_index: int = 0 + # The last value used in the _cacheable_images_registry for maintaining access order. + _expected_cached_size: int = (48 * 2) * 2 + # Upper bound on the anticipated return value from __sizeof__ after cache() + # has just been evaluated. Each python variable uses ~"48" bytes, there are + # "*2" variables included (array and image), and we don't care about + # specifics so add some buffer "*2". + def __init__(self, array: np.ndarray = None, cache_path: str = None, source_path: str = None): """ Parameters ---------- array: np.ndarray, optional - The image, as it exists in memory. + The image, as it exists in memory. Should be compatible with + image_tools.numpy_to_image(). cache_path: str, optional - The cached version of the image. Should be a numpy (.npy) file. + Where to find and/or save the cached version of the image. Should be + a numpy (.npy) file. source_path: str, optional - The source file for the image (an image file, for example jpg or png). + The source file for the image (an image file, for example jpg or + png). If provided, then this file will be used as the backing cache + instead of creating a new cache file. """ - if array is None and cache_path == None and source_path == None: + # check that all the necessary inputs have been provided + err_msg = "Error in CacheableImage.__init__(): must provide at least one of array, cache_path, or source_path!" + fnfe_msg = err_msg + " %s file %s does not exist!" 
+ if array is None: + if cache_path is None: + if source_path is None: + lt.error_and_raise(ValueError, err_msg) + elif not ft.file_exists(source_path): + lt.error_and_raise(FileNotFoundError, fnfe_msg % ("source", source_path)) + elif not ft.file_exists(cache_path): + if source_path is None: + lt.error_and_raise(FileNotFoundError, fnfe_msg % ("cache", source_path)) + elif not ft.file_exists(source_path): + lt.error_and_raise(FileNotFoundError, fnfe_msg % ("source/cache", source_path + "/" + cache_path)) + elif not isinstance(array, np.ndarray): lt.error_and_raise( - ValueError, "Error in CacheableImage.__init__(): must provide one of array, cache_path, or source_path!" + TypeError, "Error in CacheableImage.__init__(): " + "given array must be a numpy array, if given!" ) + + # verify that the paths are valid self.validate_cache_path(cache_path, "__init__") + self.validate_source_path(source_path, "__init__") + self._array = array + # The in-memory numpy array version of this image. None if not assigned or + # if cached. Should always be available whenever self._image is available. self._image = None + # The in-memory Pillow version of this image. None if not assigned, or if + # the data exists as a numpy array but not as an image, or if this + # instance is cached. self.cache_path = cache_path + """ The path/name.ext to the cached numpy array. """ self.source_path = source_path - self.cached = False + """ The path/name.ext to the source image file. """ + + self._register_access(self) + + def __del__(self): + if not hasattr(self, "cache_path"): + # this can happen when an error is raised during __init__() + pass + else: + if self._cache_path is not None: + with et.ignored(Exception): + ft.delete_file(self._cache_path) + self._cache_path = None + + @property + def cache_path(self) -> str | None: + """ + The path/name.ext to the cached numpy array, if set. Guaranteed to pass + validate_cache_path(). 
+ """ + return self._cache_path + + @cache_path.setter + def cache_path(self, new_val: str | None): + """ + Parameters + ---------- + new_val : str + The path/name.ext to the cached numpy array. This file doesn't need + to exist yet, but if it does exist, then its contents should match + self.nparray. Must pass validate_cache_path(). + """ + if new_val is not None: + # verify the ending to the cache path file + new_val = ft.norm_path(new_val) + self.validate_cache_path(new_val, "cache_path") + + # verify the file contents equal the array contents + try: + arrval = self.nparray + except Exception: + arrval = None + if arrval is not None: + if ft.file_exists(new_val): + cache_val = self._load_image(new_val) + if not np.equal(arrval, cache_val).all(): + lt.error_and_raise( + ValueError, + "Error in CacheableImage.source_path(): " + + f"the contents of self.nparray and {new_val} do not match!" + + f" ({self.cache_path=}, {self.source_path=})", + ) + + self._cache_path = new_val + + @property + def source_path(self) -> str | None: + """ + The path/name.ext to the source image file, if set. Guaranteed to pass + validate_source_path(). + """ + return self._source_path + + @source_path.setter + def source_path(self, new_val: str | None): + """ + Parameters + ---------- + new_val : str + The path/name.ext to the source image file. This file doesn't need + to exist yet, but if it does exist, then it must be readable by + Pillow and its contents should match self.nparray. Must pass + validate_source_path(). 
+ """ + if new_val is not None: + # verify we can read the file + new_val = ft.norm_path(new_val) + self.validate_source_path(new_val, "source_path") + + # verify the file contents equal the array contents + try: + arrval = self.nparray + except Exception: + arrval = None + if arrval is not None: + if ft.file_exists(new_val): + image = self._load_image(new_val) + if not np.equal(arrval, image).all(): + lt.error_and_raise( + ValueError, + "Error in CacheableImage.source_path(): " + + f"the contents of self.nparray and {new_val} do not match!" + + f" ({self.cache_path=}, {self.source_path=})", + ) + + self._source_path = new_val + + @staticmethod + def _register_access(instance: "CacheableImage"): + # Inserts this cacheable image as in index in the registry. This should be + # called every time the cacheable image is accessed for tracking the most + # recently used instances. This should be called at least during:: + # + # - creation + # - loading into memory + # - access via nparray() + # - access via to_image() + # + # Parameters + # ---------- + # instance : CacheableImage + # The instance to be registered. + images_registry = CacheableImage._cacheable_images_registry + inactive_registry = CacheableImage._inactive_registry + + if instance in images_registry: + with et.ignored(KeyError): + del images_registry[instance] + if instance in inactive_registry: + with et.ignored(KeyError): + del inactive_registry[instance] + images_registry[instance] = CacheableImage._cacheable_images_last_access_index + 1 + CacheableImage._cacheable_images_last_access_index += 1 + + @staticmethod + def _register_inactive(instance: "CacheableImage"): + # Removes the given instance from the active registry and inserts it into + # the inactive registry. The inactive registry is useful for when a + # cacheable image has been cached and likely won't be active again for a + # while. 
+ images_registry = CacheableImage._cacheable_images_registry + inactive_registry = CacheableImage._inactive_registry + + if instance in images_registry: + with et.ignored(KeyError): + del images_registry[instance] + if instance in inactive_registry: + with et.ignored(KeyError): + del inactive_registry[instance] + inactive_registry[instance] = 0 + + @staticmethod + def lru(deregister=True) -> Optional["CacheableImage"]: + """ + Returns the least recently used cacheable instance, where "use" is + counted every time the image is loaded from cache. + + If deregister is true, then the returned instance is moved from the + "active" list to the "inactive" list of cacheable images. This is useful + when we anticipate that the returned instance isn't going to be used for + a while, such as from the method cache_images_to_disk_as_necessary(). + + This does not load any cached data from disk. + """ + images_registry = CacheableImage._cacheable_images_registry + + for instance_ref in images_registry: + if instance_ref is not None: + if deregister: + CacheableImage._register_inactive(instance_ref) + return instance_ref + else: + # the CacheableImage has been garbage collected, remove its + # entry from the weak references dict + with et.ignored(KeyError): + del images_registry[instance_ref] def __sizeof__(self) -> int: - return sys.getsizeof(self._array) + sys.getsizeof(self._image) + # Returns the number of bytes in use by the in-memory numpy array and + # Pillow image for this instance. + # + # This does not load any cached data from disk. + return sys.getsizeof(self._array) + it.getsizeof_approx(self._image) @classmethod - def from_single_source(cls, array_or_path: Union[np.ndarray, str, 'CacheableImage']) -> 'CacheableImage': + def all_cacheable_images_size(cls): + """ + The number of bytes of system memory used by all cacheable images for + in-memory numpy arrays and in-memory Pillow images. 
+ """ + ret = 0 + for instance_ref in cls._cacheable_images_registry: + if instance_ref is not None: + ret += sys.getsizeof(instance_ref) + return ret + + @classmethod + def from_single_source( + cls, array_or_path: Union[np.ndarray, str, 'CacheableImage', Image.Image] + ) -> 'CacheableImage': """Generates a CacheableImage from the given numpy array, numpy '.npy' file, or image file.""" if isinstance(array_or_path, CacheableImage): return array_or_path @@ -66,23 +363,47 @@ def from_single_source(cls, array_or_path: Union[np.ndarray, str, 'CacheableImag elif isinstance(array_or_path, np.ndarray): array: np.ndarray = array_or_path return cls(array=array) + elif isinstance(array_or_path, Image.Image): + array: np.ndarray = np.array(array_or_path) + return cls(array=array) else: lt.error_and_raise( TypeError, f"Error in CacheableImage.from_single_source(): unexpected type {type(array_or_path)}" ) def validate_cache_path(self, cache_path: Optional[str], caller_name: str): + """Ensures that the given cache_path ends with ".npy", or is None.""" if cache_path == None: return + if not cache_path.lower().endswith(".npy"): _, _, ext = ft.path_components(cache_path) lt.error_and_raise( ValueError, - f"Error in CacheableImage.{caller_name}(): cache_path must end with '.npy' but instead the extension is {ext}", + f"Error in CacheableImage.{caller_name}(): " + + f"cache_path must end with '.npy' but instead the extension is {ext}", + ) + + def validate_source_path(self, source_path: Optional[str], caller_name: str): + """Ensures that the given source_path has one of the readable file extensions, or is None.""" + if source_path == None: + return + + _, _, ext = ft.path_components(source_path) + ext = ext[1:] # strip off the leading period "." 
+ allowed_exts = sorted(it.pil_image_formats_rw + it.pil_image_formats_readable) + if ext.lower() not in allowed_exts: + lt.error_and_raise( + ValueError, + f"Error in CacheableImage.{caller_name}(): " + + f"{source_path} must be readable by Pillow, but " + + f"the extension {ext} isn't in the known extensions {allowed_exts}!", ) @staticmethod def _load_image(im: str | np.ndarray) -> npt.NDArray[np.int_]: + # Loads the cached numpy data or image file. If the given "im" is a numpy + # array then it will be returned as is. if isinstance(im, np.ndarray): return im elif im.lower().endswith(".npy"): @@ -92,58 +413,239 @@ def _load_image(im: str | np.ndarray) -> npt.NDArray[np.int_]: return np.array(im) def __load_image(self) -> npt.NDArray[np.int_] | None: + # Loads the numpy array from the cache or image file, as necessary. + # self._register_access(self) # registered in self.nparray if self._array is not None: - return self._load_image(self._array) + return self._array elif self.cache_path is not None and ft.file_exists(self.cache_path): - self.cached = True return self._load_image(self.cache_path) - elif ft.file_exists(self.source_path): - return self._load_image(self.source_path) + elif self._source_path is not None and ft.file_exists(self._source_path): + return self._load_image(self._source_path) else: lt.error_and_raise( RuntimeError, - f"Error in CacheableImage.__load_image(): Can't load image! {self._array=}, {self.cache_path=}, {self.source_path=}", + f"Error in CacheableImage.__load_image(): Can't load image! 
{self._array=}, {self.cache_path=}, {self._source_path=}", ) @property def nparray(self) -> npt.NDArray[np.int_] | None: - self._image = None + """The image data for this image, as a numpy array.""" + self._register_access(self) if self._array is None: - if not self.cached: - self._array = self.__load_image() + self._array = self.__load_image() - return self.__load_image() + return self._array def to_image(self) -> Image.Image: + """The image data for this image, as a Pillow image.""" if self._image == None: + # self._register_access(self) # registered in self.nparray self._image = it.numpy_to_image(self.nparray) + else: + self._register_access(self) + return self._image - def cache(self, cache_path: str = None): - """Stores this instance to the cache and releases the handle to the in-memory image. - Note that the memory might not be abailable for garbage collection, if - there are other parts of the code that still have references to the - in-memory image or array.""" - # get the path to the numpy file - if cache_path == None: - if self.cache_path == None: + def _does_source_image_match(self, nparray: np.ndarray): + # Returns true if this image's source_path image file matches the data + # in the given numpy array. + if self._source_path is not None and ft.file_exists(self._source_path): + imarray = np.array(Image.open(self._source_path)) + try: + arrays_are_equal = np.equal(nparray, imarray).all() + except Exception: + return False + + # Check that the programmer didn't misuse CacheableImage by setting + # the source_path to a file that doesn't actually match the numpy + # array or cache path. 
+ if not arrays_are_equal: + try: + import os + import opencsp.common.lib.opencsp_path.opencsp_root_path as orp + + debug_dir = ft.norm_path(os.path.join(orp.opencsp_temporary_dir(), "debug")) + ft.create_directories_if_necessary(debug_dir) + np_path_name_ext = ft.norm_path(os.path.join(debug_dir, "CacheableImageNparray.png")) + im_path_name_ext = ft.norm_path(os.path.join(debug_dir, "CacheableImageImage.png")) + Image.fromarray(nparray).save(np_path_name_ext) + Image.fromarray(imarray).save(im_path_name_ext) + errtxt = f" These images have been saved to {debug_dir} for debugging." + except: + errtxt = "" + pass + lt.error_and_raise( ValueError, - "Error in CacheableImage.cache(): " - + "this instance does not have a pre-programmed cache_path and the provided cache_path is None. " - + "Caching requires at least one path to be non-None!", + "Error in CacheableImage(): " + + "the cacheable image array data and image file must be identical, but they are not! " + + f"{self.cache_path=}, {self._source_path=}" + + errtxt, ) - cache_path = self.cache_path - self.validate_cache_path(cache_path, "cache") - self.cache_path = cache_path - # check that this instance isn't already cached - if self._array is None and self._image == None: - return + return arrays_are_equal - # cache this instance - np.save(cache_path, self.nparray) + def cache(self, cache_path: str = None): + """ + Stores this instance to the cache and releases the handle to the in-memory image. + Note that the memory might not be available for garbage collection, if + there are other parts of the code that still have references to the + in-memory image or array. + + Parameters + ---------- + cache_path : str, optional + The path/name.ext to cache to, as necessary. Can be None if either + cache_path or source_path are already set. By default None. + """ + # use either self.cache_path or cache_path, depending on: + # 1. self.cache_path exists + # 2. cache_path exists + # 3. 
self.cache_path is set and cache_path does not exist + # 4. self.cache_path is not set cache_path is set + # 5. self.source_path exists + if self.cache_path is not None: + if ft.file_exists(self.cache_path): + # 1. do nothing + pass + elif cache_path is not None: + if ft.file_exists(cache_path): + # 2. update self.cache_path to the already existing file + self.cache_path = cache_path + else: + # 3. do nothing + pass + else: + # 3. do nothing + pass + else: + if cache_path is not None: + # 4. use the non-None value + self.cache_path = cache_path + else: + if self._source_path is not None and ft.file_exists(self._source_path): + # 5. don't need to create a numpy file if we can just read from the source file + pass + else: + # We don't have enough information about where to put the + # contents for this instance. + lt.error_and_raise( + ValueError, + "Error in CacheableImage.cache(): " + + "this instance was not created with a cache_path and the provided cache_path is None. " + + "Cacheing requires at least one path (cache_path or source_path) to be non-None!", + ) + + # Cache this instance. + if self._does_source_image_match(self.nparray): + # This instance was created from an image file, so we can simply + # depend on that image file instead of writing a new numpy file to + # disk. + pass + elif ft.file_exists(self.cache_path) and os.path.getsize(self.cache_path) > 10: + # This instance has already been cached to disk at least once. Don't + # need to cache it again. We check the size to make sure that the + # file was actually cached and doesn't just exist as a placeholder. + # I chose '> 10' instead of '> 0' because I'm paranoid that + # getsize() will return a small number of bytes on some systems. + pass + else: + # Write the numpy array to disk. + try: + np.save(self.cache_path, self.nparray) + except Exception: + lt.error( + "In CacheableImage.cache(): " + f"exception encountered while trying to save to file {cache_path}. 
" + ) + raise + + # Indicate that this instance is cached self._array = None self._image = None - self.cached = True + + def save_image(self, image_path_name_ext: str): + """ + Saves this image as an image file to the given file. This method is best + used when an image is intended to be kept after a computation, in which + case the newly saved image file can be the on-disk reference instead of + an on-disk cache file. + + Note: this replaces the internal reference to source_path, if any, with + the newly given path. It is therefore suggested to not use this method + unless you are using this class as part of an end-use application, in + order to avoid unintended side effects. + + Parameters + ---------- + image_path_name_ext : str + The file path/name.ext to save to. For example "image.png". + """ + self.to_image().save(image_path_name_ext) + self._source_path = image_path_name_ext + + @staticmethod + def cache_images_to_disk_as_necessary( + memory_limit_bytes: int, tmp_path_generator: Callable[[], str], log_level=lt.log.DEBUG + ): + """ + Check memory usage and convert images to files (aka file path strings) + as necessary in order to reduce memory usage. + + Note that due to the small amount of necessary memory used by each + CacheableImage instance, all instances can be cached and still be above + the `memory_limit_bytes` threshold. This can happen either when + memory_limit_bytes is sufficiently small, or the number of live + CacheableImages is sufficiently large. In these cases, this method may + not be able to lower the amount of memory in use. + + Parameters + ---------- + memory_limit_bytes : int + The total number of bytes of memory that all CacheableImages are + allowed to use for their in-memory arrays and images, in sum. Note + that each CachableImage instance will still use some small amount of + memory even after it has been cached. 
+ tmp_path_generator : Callable[[], str] + A function that returns a path/name.ext for a file that does not + exist yet. This file will be used to save the numpy array out to. + log_level : int, optional + The level to print out status messages to, including the amount of + memory in use before and after caching images. By default + lt.log.DEBUG. + """ + # By providing the memory_limit_bytes as a parameter, we're effectively + # enabling the user to choose a lower memory threshold than is the + # default. There's also the benefit of requiring the user to think about + # how much memory they want to use, which is going to be system and + # application specific. + total_mem_size = CacheableImage.all_cacheable_images_size() + if total_mem_size <= memory_limit_bytes: + return + + log_method = lt.get_log_method_for_level(log_level) + target_mem_size = total_mem_size / 2 + log_method(f"Hit total memory size of {int(total_mem_size / 1024 / 1024)}MB") + + while total_mem_size > target_mem_size: + # Get the least recently used cacheable image. + # By cacheing the LRU instance, we are most likely to maintain + # images in memory that are going to be used again in the near + # future. 
+ cacheable_image = CacheableImage.lru() + if cacheable_image is None: + break + + # free the LRU instance's memory by cacheing it to disk + cacheable_image_size = sys.getsizeof(cacheable_image) + if cacheable_image_size <= cacheable_image._expected_cached_size: + continue # already cached to disk, probably + if cacheable_image.cache_path is not None: + cacheable_image.cache(None) + else: # cache_path is None + cacheable_image.cache(tmp_path_generator()) + + bytes_cached_to_disk = cacheable_image_size - sys.getsizeof(cacheable_image) + total_mem_size -= bytes_cached_to_disk + + log_method(f"New total memory size after cacheing images: {int(total_mem_size / 1024 / 1024)}MB") diff --git a/opencsp/common/lib/cv/SpotAnalysis.py b/opencsp/common/lib/cv/SpotAnalysis.py index 10105c402..f643fe764 100644 --- a/opencsp/common/lib/cv/SpotAnalysis.py +++ b/opencsp/common/lib/cv/SpotAnalysis.py @@ -190,15 +190,6 @@ def set_image_processors(self, image_processors: list[asaip.AbstractSpotAnalysis self.visualization_coordinator.clear() self.visualization_coordinator.register_visualization_processors(image_processors) - # limit the amount of memory that image processors utilize - from opencsp.common.lib.cv.spot_analysis.image_processor.AbstractSpotAnalysisImageProcessorLeger import ( - image_processors_persistant_memory_total, - ) - - mem_per_image_processor = image_processors_persistant_memory_total / len(self.image_processors) - for image_processor in self.image_processors: - image_processor._allowed_memory_footprint = mem_per_image_processor - # assign the input stream to the first image processor if self.input_stream != None: self._assign_inputs(self.input_stream) @@ -263,7 +254,7 @@ def process_next(self): # Release memory from the previous result if self._prev_result is not None: - self.image_processors[-1].cache_image_to_disk_as_necessary(self._prev_result) + self.image_processors[-1].cache_images_to_disk_as_necessary() self._prev_result = None # Attempt to get the next 
image. Raises StopIteration if there are no diff --git a/opencsp/common/lib/cv/spot_analysis/SpotAnalysisOperable.py b/opencsp/common/lib/cv/spot_analysis/SpotAnalysisOperable.py index 14f677628..532b9c4f3 100644 --- a/opencsp/common/lib/cv/spot_analysis/SpotAnalysisOperable.py +++ b/opencsp/common/lib/cv/spot_analysis/SpotAnalysisOperable.py @@ -87,17 +87,13 @@ def __post_init__(self): requires_update = True # record the primary image source, if not available already - if primary_image_source_path == None: - if primary_image.source_path != None: + if primary_image_source_path is None: + if primary_image.source_path is not None: primary_image_source_path = primary_image.source_path else: primary_image_source_path = primary_image.cache_path - requires_update = True - - # set the source path on the cacheable instance of the primary image - if primary_image.source_path == None: - if primary_image_source_path != None: - primary_image.source_path = primary_image_source_path + if primary_image_source_path is not None: + requires_update = True if requires_update: # use __init__ to update frozen values @@ -114,8 +110,41 @@ def __post_init__(self): self.image_processor_notes, ) - def __sizeof__(self) -> int: - return sys.getsizeof(self.primary_image) + sum([sys.getsizeof(im) for im in self.supporting_images.values()]) + def get_all_images(self, primary=True, supporting=True, visualization=True, algorithm=True) -> list[CacheableImage]: + """ + Get a list of all images tracked by this operable including all primary + images, supporting images, visualization, and algorithm images. + + Parameters + ---------- + primary : bool, optional + True to include the primary image in the list of returned images. By + default True. + supporting : bool, optional + True to include the supporting images, if any, in the list of + returned images. By default True. + visualization : bool, optional + True to include the visualization images in the list of returned + images. By default True. 
+ algorithm : bool, optional + True to include the algorithm images, if any, in the list of + returned images. By default True. + + Returns + ------- + list[CacheableImage] + The images tracked by this operable. + """ + ret: list[CacheableImage] = [] + + if primary: + ret.append(self.primary_image) + + if supporting: + for image_type in self.supporting_images: + ret.append(self.supporting_images[image_type]) + + return ret def replace_use_default_values( self, supporting_images: dict[ImageType, CacheableImage] = None, data: 'SpotAnalysisOperable' = None @@ -125,13 +154,13 @@ def replace_use_default_values( values.""" ret = self - if supporting_images != None: + if supporting_images is not None: for image_type in supporting_images: - if (image_type in ret.supporting_images) and (ret.supporting_images[image_type] != None): + if (image_type in ret.supporting_images) and (ret.supporting_images[image_type] is not None): supporting_images[image_type] = ret.supporting_images[image_type] ret = replace(ret, supporting_images=supporting_images) - if data != None: + if data is not None: given_fiducials = data.given_fiducials if len(self.given_fiducials) == 0 else self.given_fiducials found_fiducials = data.found_fiducials if len(self.found_fiducials) == 0 else self.found_fiducials annotations = data.annotations if len(self.annotations) == 0 else self.annotations @@ -180,7 +209,7 @@ def get_primary_path_nameext(self) -> tuple[str, str]: if image_name is not None and image_name != "": break - if image_name == None or image_name == "": + if image_name is None or image_name == "": ret_path, ret_name_ext = "unknown_path", "unknown_image" else: ret_path, name, ext = ft.path_components(image_name) @@ -202,7 +231,7 @@ def best_primary_pathnameext(self) -> str: def max_popf(self) -> npt.NDArray[np.float_]: """Returns the maximum population float value, if it exists. 
Otherwise returns the maximum value for this instance's primary image.""" - if self.population_statistics != None: + if self.population_statistics is not None: return self.population_statistics.maxf else: return np.max(self.primary_image.nparray) @@ -211,7 +240,7 @@ def max_popf(self) -> npt.NDArray[np.float_]: def min_popf(self) -> npt.NDArray[np.float_]: """Returns the minimum population float value, if it exists. Otherwise returns the minimum value for this instance's primary image.""" - if self.population_statistics != None: + if self.population_statistics is not None: return self.population_statistics.minf else: return np.min(self.primary_image.nparray) @@ -233,3 +262,11 @@ def get_fiducials_by_type( + f"found 0 fiducials matching type {fiducial_type.__name__} for image {self.best_primary_pathnameext}" ) return ret + + def __sizeof__(self) -> int: + """ + Get the size of this operable in memory including all primary images, + supporting images, and visualization images. + """ + all_images_size = sum([sys.getsizeof(img) for img in self.get_all_images()]) + return all_images_size diff --git a/opencsp/common/lib/cv/spot_analysis/image_processor/AbstractSpotAnalysisImageProcessor.py b/opencsp/common/lib/cv/spot_analysis/image_processor/AbstractSpotAnalysisImageProcessor.py index 1252fa6b2..c54da7628 100644 --- a/opencsp/common/lib/cv/spot_analysis/image_processor/AbstractSpotAnalysisImageProcessor.py +++ b/opencsp/common/lib/cv/spot_analysis/image_processor/AbstractSpotAnalysisImageProcessor.py @@ -1,6 +1,12 @@ from abc import abstractmethod import copy -from typing import Callable, Iterator, Union +import dataclasses +import os +import sys +from typing import Callable, Iterable, Iterator, Union + +import numpy as np +from PIL import Image from opencsp.common.lib.cv.CacheableImage import CacheableImage from opencsp.common.lib.cv.spot_analysis.ImagesIterable import ImagesIterable @@ -8,39 +14,221 @@ from opencsp.common.lib.cv.spot_analysis.SpotAnalysisOperable 
import SpotAnalysisOperable from opencsp.common.lib.cv.spot_analysis.SpotAnalysisOperablesStream import SpotAnalysisOperablesStream from opencsp.common.lib.cv.spot_analysis.SpotAnalysisImagesStream import SpotAnalysisImagesStream -from opencsp.common.lib.cv.spot_analysis.image_processor.AbstractSpotAnalysisImageProcessorLeger import ( - AbstractSpotAnalysisImagesProcessorLeger, -) +import opencsp.common.lib.opencsp_path.opencsp_root_path as orp +import opencsp.common.lib.tool.file_tools as ft import opencsp.common.lib.tool.log_tools as lt import opencsp.common.lib.tool.typing_tools as tt +image_processors_persistant_memory_total: int = 1 * pow(2, 30) # default total of 1GiB +""" The amount of system memory that image processors are allowed to retain +as cache between calls to their 'run()' method. The most recently used results +are prioritized for remaining in memory. Default (1 GiB). """ + -class AbstractSpotAnalysisImagesProcessor(Iterator[SpotAnalysisOperable], AbstractSpotAnalysisImagesProcessorLeger): +class AbstractSpotAnalysisImagesProcessor(Iterator[SpotAnalysisOperable]): """Class to perform one step of image processing before spot analysis is performed. This is an abstract class. Implementations can be found in the same directory. To create a new implementation, inherit from one of the existing - implementations or this class. + implementations or this class. The most basic implementation need only + implement the _execute method:: + + def _execute(self, operable: SpotAnalysisOperable, is_last: bool) -> list[SpotAnalysisOperable]: + raise NotImplementedError() """ - def __init__(self, name: str): - AbstractSpotAnalysisImagesProcessorLeger.__init__(self, name) + def __init__(self, name: str = None): + """ + Parameters + ---------- + name : str, optional + The name to use for this image processor instance, or None to + default to the class name. By default None.
+ """ + + # set default parameter values + if name is None: + name = self.__class__.__name__ + + self._name = name + # Name of this instance, probably the class name. + self._num_images_processed: int = 0 + # The running total of the number of resulting images this instance + # produced, since the last time that inputs were assigned. + self.save_to_disk: int = False + """ True to save results to the hard drive instead of holding them in + memory. If False, then this is dynamically determined at runtime during + image processing based on image_processors_persistant_memory_total. """ + self.input_operables: Iterable[SpotAnalysisOperable] = None + """ The input iterable given to assign_inputs. """ + self.input_iter: Iterator[SpotAnalysisOperable] = None + """ Iterator over the input images. None until the iterator has been primed. """ + self.is_iterator_primed = False + """ True if __iter__ has been called since assign_inputs. False otherwise. """ + self._finished_processing = False + # True if we've finished iterating over all input images. This gets set + # when we get a StopIteration error from next(input_iter). + self.cached = False + """ True if we've ever cached the processed results of this processor to + disk since processing was started. """ + self._my_tmp_dir = None + # The directory where temporary images from this instance are saved to. + self._tmp_images_saved = 0 + # How many images have been saved by this instance since it was created. + self._clear_tmp_on_deconstruct = True + # If true, then delete all png images in _my_tmp_dir, and then also + # the directory if empty. self.next_item: SpotAnalysisOperable = None - """ The next fetched item from the input_iter, held in anticipation for the - following call to __next__(). None if the next item hasn't been fetched yet, - or we've reached the end of the input_iter. 
""" - self.inmem_inputs: list[SpotAnalysisOperable] = [] - """ Operables retrieved from from input_iter that have been _execute()'ed - on, but _execute() hasn't returned as many results as the number of times - that it's been called. In other words:: + """ + The next fetched item from the input_iter, held in anticipation for the + following call to __next__(). None if we haven't retrieved the next + value from the input_iter yet. + """ + self.operables_in_flight: list[SpotAnalysisOperable] = [] + """ + For most processors, _execute() will return one resultant operable for + each input operable. For these standard cases, this will contain one + value during the process_operable() method, and will be empty otherwise. - len(inmem_inputs) == num_executes - len(cummulative_processed_results) """ + Sometimes _execute() may return zero results. In this case, this value + will contain all the operables passed to _execute() since the last time + that _execute() returned at least one operable. These "in-flight" + operables are remembered so that history can be correctly assigned to + the resultant operables, once they become available. + """ self.results_on_deck: list[SpotAnalysisOperable] = [] - """ Sometimes _execute() may return multiple results. In this case, - we hold on to the processed operables and return only one of them per - iteration in __next__(). """ + """ + Sometimes _execute() may return multiple results. In this case, we hold + on to the processed operables and return only one of them per iteration + in __next__(). This gaurantees that each image processor in the chain + consumes and produces single images. + """ self._on_image_processed: list[Callable[[SpotAnalysisOperable]]] = [] - """ A list of callbacks to be evaluated when an image is finished processing. """ + # A list of callbacks to be evaluated when an image is finished processing. 
+ + # initialize some of the state + self.assign_inputs([]) + + @property + def name(self) -> str: + """Name of this processor""" + return self._name + + @property + def _finished_processing(self): + return self.__finished_processing + + @_finished_processing.setter + def _finished_processing(self, val): + self.__finished_processing = val + + @property + def finished(self): + """True if we've finished iterating over all input images. This gets set + when we get a StopIteration error from next(input_iter).""" + return self.__finished_processing + + def assign_inputs( + self, + operables: Union[ + 'AbstractSpotAnalysisImagesProcessor', list[SpotAnalysisOperable], Iterator[SpotAnalysisOperable] + ], + ): + """ + Register the input operables to be processed either with the run() + method, or as an iterator. + + Parameters + ---------- + operables : Union[ AbstractSpotAnalysisImagesProcessor, list[SpotAnalysisOperable], Iterator[SpotAnalysisOperable] ] + The operables to be processed. + """ + # initialize the state for a new set of inputs + self.input_operables = operables + self._num_images_processed = 0 + self._finished_processing = False + self.next_item = None + self.is_iterator_primed = False + + # check for an empty list [], indicating no inputs + if isinstance(operables, list): + if len(operables) == 0: + self.input_iter = None + self._finished_processing = True + self.operables_in_flight.clear() + self.results_on_deck.clear() + + def register_processed_result(self, is_last: bool): + self._num_images_processed += 1 + if is_last: + self._finished_processing = True + + def cache_images_to_disk_as_necessary(self): + """Check memory usage and convert images to files (aka file path + strings) as necessary in order to reduce memory usage.""" + allowed_memory_footprint = image_processors_persistant_memory_total + CacheableImage.cache_images_to_disk_as_necessary(allowed_memory_footprint, self._get_tmp_path) + + def _get_save_dir(self): + # Finds a temporary directory to 
save to for the processed output images from this instance. + if self._my_tmp_dir == None: + scratch_dir = os.path.join(orp.opencsp_scratch_dir(), "spot_analysis_image_processing") + i = 0 + while True: + dirname = self.name + str(i) + self._my_tmp_dir = os.path.join(scratch_dir, dirname) + if not ft.directory_exists(self._my_tmp_dir): + try: + os.makedirs(self._my_tmp_dir) + break # success! + except FileExistsError: + # probably just created this directory in another thread + pass + else: + i += 1 + return self._my_tmp_dir + + def _get_tmp_path(self) -> str: + # Get the path+name+ext to save a cacheable image to, in our temporary + # directory, in numpy format. + # + # Returns: + # path_name_ext: str + # Where to save the image. + # get the path + path_name_ext = os.path.join(self._get_save_dir(), f"{self._tmp_images_saved}.npy") + self._tmp_images_saved += 1 + return path_name_ext + + def _save_image(self, im: CacheableImage, idx_list: list[int], dir: str, name_prefix: str = None, ext="jpg") -> str: + # Saves the given image to the given path. + # + # Parameters + # ---------- + # im : CacheableImage + # The image to be saved. + # idx_list : list[int] + # Length-1 list where idx_list[0] is the count of images saved with + # this method. Used for naming the saved images. This value is updated + # as part of the execution of this method. + # dir : str + # The directory to save the image to. + # name_prefix : str, optional + # A prefix to prepend to the image name, by default empty string + # ext : str, optional + # The extension/type to save the image with, by default "jpg" + # + # Returns + # ------- + # str + # The path/name.ext of the newly saved image. + idx = idx_list[0] + image_name = ("" if name_prefix == None else f"{name_prefix}_") + f"SA_preprocess_{self.name}{idx}" + image_path_name_ext = os.path.join(dir, image_name + "." 
+ ext) + lt.debug("Saving SpotAnalysis processed image to " + image_path_name_ext) + im.to_image().save(image_path_name_ext) + idx_list[0] = idx + 1 + return image_path_name_ext def run( self, @@ -48,30 +236,46 @@ def run( ImagesIterable | ImagesStream | SpotAnalysisImagesStream - | Union['AbstractSpotAnalysisImagesProcessor', list[SpotAnalysisOperable], Iterator[SpotAnalysisOperable]] + | list[SpotAnalysisOperable] + | Iterator[SpotAnalysisOperable] + | Union['AbstractSpotAnalysisImagesProcessor'] ), - ) -> list[CacheableImage]: - """Performs image processing on the input images.""" + ) -> list[SpotAnalysisOperable]: + """ + Performs image processing on the input operables and returns the results. + + This is provided as a convenience method. The more typical way to use + this class is to create a SpotAnalysis instance, assign this image + processor to that instance, and then iterate over the results. + + See also: :py:meth:`process_images` as another convenience method. + + Parameters + ---------- + operables : ImagesIterable | ImagesStream | SpotAnalysisImagesStream | list[SpotAnalysisOperable] | Iterator[SpotAnalysisOperable] | Union[AbstractSpotAnalysisImagesProcessor] + The input operables to be processed. If these are images, then they + will be wrapped in a SpotAnalysisOperablesStream. + + Returns + ------- + list[SpotAnalysisOperable] + The resulting operables after processing. 
+ """ if isinstance(operables, (ImagesIterable, ImagesStream)): operables = SpotAnalysisImagesStream(operables) if isinstance(operables, SpotAnalysisImagesStream): operables = SpotAnalysisOperablesStream(operables) self.assign_inputs(operables) - for result in self: - pass - return copy.copy(self.all_processed_results) + ret = [result for result in self] + return ret - def process_image(self, input_operable: SpotAnalysisOperable, is_last: bool = False) -> list[SpotAnalysisOperable]: + def process_operable( + self, input_operable: SpotAnalysisOperable, is_last: bool = False + ) -> list[SpotAnalysisOperable]: """Should probably not be called by external classes. Evaluate this instance as an iterator instead. Executes this instance's image processing on a single given input - primary image, with the supporting other images. If enough images have - been processed as to exceed this instance's memory limitations, then all - processed primary images will be stored to disk instead of being kept in - memory. The resulting processed images or paths to said images will be - recorded in self.cummulative_processed_results and can be accessed by - self.all_results after all input images have been processed (aka when - is_last=True). + primary image, with the supporting other images. When processed with the run() method, this function will be called for all input images. @@ -89,42 +293,84 @@ def process_image(self, input_operable: SpotAnalysisOperable, is_last: bool = Fa results : list[SpotAnalysisOperable] Zero, one, or more than one results from running image processing. 
""" - if self.cummulative_processed_results == None: - self.initialize_cummulative_processed_results() - - self.inmem_inputs.append(input_operable) - ret: list[SpotAnalysisOperable] = self._execute(input_operable, is_last) + self.operables_in_flight.append(input_operable) + + try: + ret: list[SpotAnalysisOperable] = self._execute(input_operable, is_last) + except Exception as ex: + lt.error( + "Error in AbstractSpotAnalysisImageProcessor.process_operable(): " + + f"encountered {ex.__class__.__name__} exception while processing image {input_operable.primary_image_source_path}" + ) + raise if not isinstance(ret, list): lt.error_and_raise( TypeError, - f"Error in AbstractSpotAnalysisImageProcessor.process_image() ({self.name}): " + f"Error in AbstractSpotAnalysisImageProcessor.process_operable() ({self.name}): " + f"_execute() should return a list[SpotAnalysisOperable] but instead returned a {type(ret)}", ) for operable in ret: if not isinstance(operable, SpotAnalysisOperable): lt.error_and_raise( TypeError, - f"Error in AbstractSpotAnalysisImageProcessor.process_image() ({self.name}): " + f"Error in AbstractSpotAnalysisImageProcessor.process_operable() ({self.name}): " + f"expected return value from _execute() to be list[SpotAnalysisOperable] but is instead list[{type(operable)}]!", ) # record processed results - for result in ret: - self.register_processed_result(result, is_last) + if is_last: + self.register_processed_result(is_last) # execute any registered callbacks for operable in ret: for callback in self._on_image_processed: callback(operable) + # de-register any operable on which we're waiting for results + if len(ret) > 0 or is_last: + self.operables_in_flight.clear() + # release memory by cacheing images to disk - for operable in ret: - # release one per returned value - self.cache_image_to_disk_as_necessary(self.inmem_inputs.pop(0)) - if is_last: - # release all - while len(self.inmem_inputs) > 0: - 
self.cache_image_to_disk_as_necessary(self.inmem_inputs.pop(0)) + self.cache_images_to_disk_as_necessary() + + return ret + + def process_images(self, images: list[CacheableImage | np.ndarray | Image.Image]) -> list[CacheableImage]: + """ + Processes the given images with this processor and returns 0, 1, or more + than 1 resulting images. + + This method is provided for convenience, to allow for use of the spot + analysis image processors as if they were simple functions. The + following is an example of a more standard use of an image processor:: + + processors = [ + EchoImageProcessor(), + LogScaleImageProcessor(), + FalseColorImageProcessor() + ] + spot_analysis = SpotAnalysis("Log Scale Images", processors) + spot_analysis.set_primary_images(images) + results = [result for result in spot_analysis] + + Parameters + ---------- + images : list[CacheableImage | np.ndarray | Image.Image] + The images to be processed. + + Returns + ------- + list[CacheableImage] + The resulting images after processing. + """ + # import here to avoid cyclic imports + from opencsp.common.lib.cv.SpotAnalysis import SpotAnalysis + + spot_analysis = SpotAnalysis(self.name, [self]) + spot_analysis.set_primary_images(images) + ret: list[CacheableImage] = [] + for result in spot_analysis: + ret += result.get_all_images(supporting=False) return ret @@ -132,7 +378,7 @@ def process_image(self, input_operable: SpotAnalysisOperable, is_last: bool = Fa def _execute(self, operable: SpotAnalysisOperable, is_last: bool) -> list[SpotAnalysisOperable]: """Evaluate an input primary image (and other images/data), and generate the output processed image(s) and data. - The actual image processing method. Called from process_image(). + The actual image processing method. Called from process_operable(). 
Parameters ---------- @@ -163,48 +409,58 @@ def _execute(self, operable: SpotAnalysisOperable, is_last: bool) -> list[SpotAn """ pass + def __len__(self) -> int: + """ + Get the current number of processed output images from this instance. + + This number will potentially be increasing with every call to + process_operable() or _execute(). + + This will return 0 immediately after a call to assign_inputs(). + """ + return self._num_images_processed + def __iter__(self): - if self.all_processed_results != None: + if not self.is_iterator_primed: + # Operables have been assigned via assign_inputs(), but we haven't + # started iterating yet. We need to prime self.input_iter and + # self.next_item before returning. + try: + input_iter: Iterator[SpotAnalysisOperable] = iter(self.input_operables) + self.input_iter = input_iter + self.is_iterator_primed = True + return iter(self) + except StopIteration: + self.assign_inputs([]) + raise + elif self.finished: # We must have already finished processing all input images, either # through the run() method or by simply having iterated through them # all. - if not self.finished_processing: - lt.error_and_raise( - RuntimeError, - f"Programmer error in AbstractSpotAnalaysisImageProcess.__iter__: " - + f"self.all_processed_results != None but {self.finished_processing=}!", - ) - raise StopIteration + return self + elif self.input_iter != None: + # We must be iterating through the input images already. + return self else: - if self.input_iter != None: - # We must be iterating through the input images already. - return self - else: - # We must be iterating through the input images and haven't - # started processing them yet. - # Or, we're restarting iteration. 
- self.assign_inputs(self._original_operables) # initializes the leger - self.input_iter = iter(self._original_operables) - self.inmem_inputs = [] - try: - self.next_item = next(self.input_iter) - except StopIteration: - self.next_item = {} - self.all_processed_results = [] - self.finished_processing = True - return self + lt.error_and_raise( + RuntimeError, + "Error in AbstractSpotAnalysisImageProcessor.__iter__(): " + + "unexpected state encountered, " + + "expected is_iterator_primed to be False, finished to be True, or input_iter to be set, but " + + f"{self.is_iterator_primed=}, {self.finished=}, and {self.input_iter=}", + ) def __next__(self): """Get the next processed image and it's support images and data. Since this is only utilized when processing as an iterator instead of by using the run() method, then calling this method will cause one or more input image(s) to be fetched so that it can be executed upon.""" + if not self.has_next(): + raise StopIteration + # Check if we already have a result from a previous iteration staged and ready to be returned if len(self.results_on_deck) > 0: return self.results_on_deck.pop(0) - else: - if self.finished_processing: - raise StopIteration # Process the next image. 
# @@ -217,36 +473,36 @@ def __next__(self): is_last = False output_operables: list[SpotAnalysisOperable] = [] while not is_last: - # Get the the values for: - # - the current input operable - # - the input operable for the next cycle - # - and the value of is_last + # Get the operable to be processed + self.fetch_input_operable() input_operable = self.next_item + self.next_item = None if input_operable == None: lt.error_and_raise( RuntimeError, f"Programmer error in AbstractSpotAnalysisImageProcessor.__next__() ({self.name}): " + "input_operable should never be None but it is!", ) - try: - self.next_item = next(self.input_iter) - except StopIteration: - self.next_item = None - is_last = True - output_operables = self.process_image(input_operable, is_last) + # Determine if this is the last operable we're going to be receiving + is_last = not self.has_next() + + # Process the input operable and get the output results + output_operables = self.process_operable(input_operable, is_last) + + # Stop if we have results to return if len(output_operables) > 0: break if is_last: - self.finished_processing = True + self._finished_processing = True # Did execute return any results? if len(output_operables) == 0: if not is_last: lt.error_and_raise( RuntimeError, - f"Programmer error in SpotAnalysisAbstractImagesProcessor.__next__() ({self.name}): " + f"Programmer error in AbstractSpotAnalysisImagesProcessor.__next__() ({self.name}): " + "as long as there are input image available (aka is_last is False) we should keep executing until " + f"at least one result becomes available, but {is_last=} and {len(output_operables)=}", ) @@ -263,6 +519,64 @@ def __next__(self): return ret + def has_next(self) -> bool: + """ + Returns True if this image processor will return another result when + __next__() is called, or False if it won't. This might result in a call + to the prior image processor's __next__() method. 
+ """ + if self.finished: + return False + if len(self.results_on_deck) > 0: + return True + if (self.input_operables is None) or ( + isinstance(self.input_operables, list) and len(self.input_operables) == 0 + ): + return False + if self.next_item is not None: + return True + if hasattr(self.input_operables, "has_next"): + return self.input_operables.has_next() + + # We tried every other method of determining if there is a next value + # available from the input iterable. The only possible way left to + # determine if there is a next value available is to try and retrieve + # it. + if not self.is_iterator_primed: + iter(self) # primes the iterator + try: + self.fetch_input_operable() + except StopIteration: + pass + return self.next_item is not None + + def fetch_input_operable(self): + """ + Retrieves the operable to operate on. Populates self.next_item. + + The input operable might be the prefetched self.next_item or it might + need to be requested as the next result from the input_iter. + + Raises + ------ + StopIteration: + The input_iter doesn't have any more results available + """ + # check for invalid state + if self.finished: + lt.error_and_raise( + RuntimeError, + "Error in AbstractSpotAnalysisImageProcessor.get_input_operable() ({self.name}): " + + "Trying to retrieve an operable in an invalid state.", + ) + + # get the operable, as necessary + if self.next_item is None: + try: + self.next_item = next(self.input_iter) + except StopIteration: + raise + @tt.strict_types def get_processed_image_save_callback( self, dir: str, name_prefix: str = None, ext="jpg" @@ -285,6 +599,6 @@ def on_image_processed(self, callback: Callable[[SpotAnalysisOperable], None]): ---------- callback : Callable[[SpotAnalysisOperable], None] The function to be evaluated. Requires one input, which will be the - result from process_image(). + result from process_operable(). 
""" self._on_image_processed.append(callback) diff --git a/opencsp/common/lib/cv/spot_analysis/image_processor/AbstractSpotAnalysisImageProcessorLeger.py b/opencsp/common/lib/cv/spot_analysis/image_processor/AbstractSpotAnalysisImageProcessorLeger.py deleted file mode 100644 index bab8ad403..000000000 --- a/opencsp/common/lib/cv/spot_analysis/image_processor/AbstractSpotAnalysisImageProcessorLeger.py +++ /dev/null @@ -1,254 +0,0 @@ -from abc import ABC -from collections.abc import Sized -import copy -import os -from typing import Iterator, Union -import sys - -from opencsp.common.lib.cv.CacheableImage import CacheableImage -from opencsp.common.lib.cv.spot_analysis.SpotAnalysisOperable import SpotAnalysisOperable -import opencsp.common.lib.opencsp_path.opencsp_root_path as orp -import opencsp.common.lib.tool.log_tools as lt -import opencsp.common.lib.tool.file_tools as ft - -image_processors_persistant_memory_total: int = 4 * pow(2, 30) # default total of 4GB -""" The amount of static memory that image processors are allowed to retain -as cache between calls to their 'run()' method. They always keep the most recent -results in memory, up until the data has been used by the next image processor -in the pipeline. """ - - -class AbstractSpotAnalysisImagesProcessorLeger(ABC, Sized): - """Holds all the data flowing through an - AbstractSpotAnalysisImageProcessor, broken out so as to reduce that class's - complexity.""" - - def __init__(self, name: str): - self._name = name - """ Name of this instance, probably the class name. """ - self._original_operables: Iterator[SpotAnalysisOperable] = None - """ The input images to be processed, as given in the run() method. """ - self.all_processed_results: list[SpotAnalysisOperable] = None - """ The results from evaluating this instance. None if processing hasn't - finished yet. """ - self.cummulative_processed_results: list[SpotAnalysisOperable] = None - """ The results currently available from evaluating this instance. 
None if - processing hasn't started yet. """ - self._allowed_memory_footprint: int = None - """ How much memory this instance is allowed to consume while idle. This - affects whether primary and supporting images are held in memory as - numpy arrays, or if they are saved to disk and returned as file path - strings. """ - self.save_to_disk: int = False - """ True to save results to the hard drive instead of holding them in - memory. Dynamically determined at runtime during image processing based on - self._allowed_memory_footprint. """ - self.input_iter: Iterator[SpotAnalysisOperable] = None - """ Iterator over the input images. """ - self.finished_processing = False - """ True if we've finished iterating over all input images. This gets set - when we get a StopIteration error from next(input_iter). """ - self.cached = False - """ True if we've ever cached the processed results of this processor to - disk since processing was started. """ - self._my_tmp_dir = None - """ The directory where temporary images from this instance are saved to. """ - self._tmp_images_saved = 0 - """ How many images have been saved by this instance since it was created. """ - self._clear_tmp_on_deconstruct = True - """ If true, then delete all png images in _my_tmp_dir, and then also - the directory if empty. 
""" - - self.assign_inputs([]) - - def __sizeof__(self) -> int: - if self.cached: - return 0 - elif len(self.cummulative_processed_results) == 0: - return 0 - else: - return sys.getsizeof(self.cummulative_processed_results[0]) * len(self.cummulative_processed_results) - - def __del__(self): - # delete cached numpy files - if ft.directory_exists(self._get_tmp_path()): - ft.delete_files_in_directory(self._get_tmp_path(), "*.npy", error_on_dir_not_exists=False) - if ft.directory_is_empty(self._get_tmp_path()): - os.rmdir(self._get_tmp_path()) - - # delete output png files - if self._my_tmp_dir != None: - if self._clear_tmp_on_deconstruct: - ft.delete_files_in_directory(self._my_tmp_dir, "*.png", error_on_dir_not_exists=False) - if ft.directory_is_empty(self._my_tmp_dir): - os.rmdir(self._my_tmp_dir) - self._my_tmp_dir = None - - @property - def name(self) -> str: - """Name of this processor""" - return self._name - - @property - def input_operable(self) -> CacheableImage | None: - """The first of the input operables that was given to this instance - before it did any image processing.""" - originals = self.input_operables - if originals == None or len(originals) == 0: - return None - return originals[0] - - @property - def input_operables(self) -> list[SpotAnalysisOperable] | None: - """The input operables that were given to this instance before it did - any image processing. 
None if the input wasn't a list or image processor - type.""" - if isinstance(self._original_operables, AbstractSpotAnalysisImagesProcessorLeger): - predecessor: AbstractSpotAnalysisImagesProcessorLeger = self._original_operables - if predecessor.finished: - return predecessor.all_results - elif isinstance(self._original_operables, list): - return copy.copy(self._original_operables) - return None - - @property - def finished(self): - if not (self.finished_processing == (self.all_processed_results == None)): - lt.error_and_raise( - RuntimeError, - f"Programmer error in AbstractSpotAnalysisImageProcessor.finished: {self.finished_processing=} but {len(self.all_processed_results)=}", - ) - return self.finished_processing - - @property - def all_results(self): - """Returns the list of resultant output images from this instance's image processing. - - Raises - ------- - RuntimeError - If finished == False - """ - if not self.finished_processing: - lt.error_and_raise( - RuntimeError, - "Can't get the list of processed images from this instance until all input images have been processed.", - ) - return copy.copy(self.all_processed_results) - - def assign_inputs( - self, - operables: Union[ - 'AbstractSpotAnalysisImagesProcessorLeger', list[SpotAnalysisOperable], Iterator[SpotAnalysisOperable] - ], - ): - """Register the input operables to be processed either with the run() method, or as an iterator.""" - self._original_operables = operables - self.all_processed_results: list[SpotAnalysisOperable] = None - self.cummulative_processed_results: list[SpotAnalysisOperable] = None - self.input_iter: Iterator[SpotAnalysisOperable] = None - self.finished_processing = False - - def initialize_cummulative_processed_results(self): - if self.cummulative_processed_results != None and len(self.cummulative_processed_results) > 0: - lt.error_and_raise( - RuntimeError, - f"Programmer error: initialized cummulative_processed_results at incorrect time. 
There are current {len(self.cummulative_processed_results)} in-flight results when there should be 0.", - ) - self.cummulative_processed_results = [] - - def register_processed_result(self, operable: SpotAnalysisOperable, is_last: bool): - # remember this processed result so that we can reference it again during a later computation - self.cummulative_processed_results.append(operable) - if is_last: - self.all_processed_results = copy.copy(self.cummulative_processed_results) - - def cache_image_to_disk_as_necessary(self, operable: SpotAnalysisOperable): - """Check memory usage and convert images to files (aka file path - strings) as necessary in order to reduce memory usage.""" - # import here to avoid import loops, since AbstractSpotAnalysisImageProcessor inherits from this class - from opencsp.common.lib.cv.spot_analysis.image_processor.AbstractSpotAnalysisImageProcessor import ( - AbstractSpotAnalysisImagesProcessor, - ) - - total_mem_size = sys.getsizeof(operable) * 2 + sys.getsizeof(self) - allowed_memory_footprint = image_processors_persistant_memory_total - if self._allowed_memory_footprint != None: - allowed_memory_footprint = self._allowed_memory_footprint - - if (total_mem_size > allowed_memory_footprint) or (self.save_to_disk): - image_processor = self - if isinstance(self.input_iter, AbstractSpotAnalysisImagesProcessor): - image_processor = self.input_iter - - operable.primary_image.cache(image_processor._get_tmp_path()) - if not self.cached: - for result in self.cummulative_processed_results: - result.primary_image.cache(image_processor._get_tmp_path()) - self.cached = True - - def _get_save_dir(self): - """Finds a temporary directory to save to for the processed output images from this instance.""" - if self._my_tmp_dir == None: - scratch_dir = os.path.join(orp.opencsp_scratch_dir(), "spot_analysis_image_processing") - i = 0 - while True: - dirname = self.name + str(i) - self._my_tmp_dir = os.path.join(scratch_dir, dirname) - if not 
ft.directory_exists(self._my_tmp_dir): - try: - os.makedirs(self._my_tmp_dir) - break # success! - except FileExistsError: - # probably just created this directory in another thread - pass - else: - i += 1 - return self._my_tmp_dir - - def _get_tmp_path(self) -> str: - """Get the path+name+ext to save a cacheable image to, in our temporary - directory, in numpy format. - - Returns: - path_name_ext: str - Where to save the image. - """ - # get the path - path_name_ext = os.path.join(self._get_save_dir(), f"{self._tmp_images_saved}.npy") - self._tmp_images_saved += 1 - return path_name_ext - - def __len__(self) -> int: - """Get the number of processed output images from this instance. - - Raises - ------- - RuntimeError - If the input images haven't finished being processed yet. - """ - if self.all_processed_results != None: - return len(self.all_processed_results) - lt.error_and_raise( - RuntimeError, "Can't get the length of this instance until all input images have been processed." - ) - - def save_processed_images(self, dir: str, name_prefix: str = None, ext="jpg"): - """Saves the processed images to the given directory with the file name - "[name_prefix+'_']SA_preprocess_[self.name][index].[ext]". If this - instance is being used as an image stream, then use - on_image_processed(get_processed_image_save_callback()) instead. - - This method is designed to be used as a callback with self.on_image_processed(). - """ - for idx, operable in enumerate(self.all_processed_results): - self._save_image(operable.primary_image, [idx], dir, name_prefix, ext) - - def _save_image(self, im: CacheableImage, idx_list: list[int], dir: str, name_prefix: str = None, ext="jpg"): - idx = idx_list[0] - image_name = ("" if name_prefix == None else f"{name_prefix}_") + f"SA_preprocess_{self.name}{idx}" - image_path_name_ext = os.path.join(dir, image_name + "." 
+ ext) - lt.debug("Saving SpotAnalysis processed image to " + image_path_name_ext) - im.to_image().save(image_path_name_ext) - idx_list[0] = idx + 1 - return image_path_name_ext diff --git a/opencsp/common/lib/cv/spot_analysis/image_processor/test/data/input/AbstractSpotAnalysisImageProcessor/example_image.npy b/opencsp/common/lib/cv/spot_analysis/image_processor/test/data/input/AbstractSpotAnalysisImageProcessor/example_image.npy new file mode 100644 index 000000000..cd6de2a0a Binary files /dev/null and b/opencsp/common/lib/cv/spot_analysis/image_processor/test/data/input/AbstractSpotAnalysisImageProcessor/example_image.npy differ diff --git a/opencsp/common/lib/cv/spot_analysis/image_processor/test/test_AbstractSpotAnalysisImageProcessor.py b/opencsp/common/lib/cv/spot_analysis/image_processor/test/test_AbstractSpotAnalysisImageProcessor.py new file mode 100644 index 000000000..e7cc15ab6 --- /dev/null +++ b/opencsp/common/lib/cv/spot_analysis/image_processor/test/test_AbstractSpotAnalysisImageProcessor.py @@ -0,0 +1,224 @@ +import copy +import dataclasses +import random +import unittest + +import numpy as np + +import opencsp.common.lib.cv.CacheableImage as ci +import opencsp.common.lib.cv.spot_analysis.image_processor.AbstractSpotAnalysisImageProcessor as asaip +import opencsp.common.lib.cv.spot_analysis.SpotAnalysisOperable as sao +import opencsp.common.lib.tool.file_tools as ft +import opencsp.common.lib.tool.log_tools as lt + + +class DoNothingImageProcessor(asaip.AbstractSpotAnalysisImagesProcessor): + def _execute(self, operable: sao.SpotAnalysisOperable, is_last: bool) -> list[sao.SpotAnalysisOperable]: + return [operable] + + +class SetOnesImageProcessor(asaip.AbstractSpotAnalysisImagesProcessor): + def _execute(self, operable: sao.SpotAnalysisOperable, is_last: bool) -> list[sao.SpotAnalysisOperable]: + img = copy.copy(operable.primary_image.nparray) + img[:, :] = 1 + ret = dataclasses.replace(operable, primary_image=img) + return [ret] + + +class 
test_AbstractSpotAnalysisImageProcessor(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + path, name, _ = ft.path_components(__file__) + cls.in_dir = ft.join(path, 'data/input', name.split('test_')[-1]) + cls.out_dir = ft.join(path, 'data/output', name.split('test_')[-1]) + ft.create_directories_if_necessary(cls.out_dir) + ft.delete_files_in_directory(cls.out_dir, '*') + return super().setUpClass() + + def setUp(self) -> None: + self.test_name = self.id().split('.')[-1] + + self.example_cache_path = ft.join(self.in_dir, "example_image.npy") + self.cacheable_image = ci.CacheableImage(cache_path=self.example_cache_path) + self.example_operable = sao.SpotAnalysisOperable(self.cacheable_image) + + self.example_operables: list[sao.SpotAnalysisOperable] = [] + self.num_example_operables = random.randint(0, 10) + for i in range(self.num_example_operables): + ci_i = ci.CacheableImage(cache_path=self.example_cache_path) + sao_i = sao.SpotAnalysisOperable(ci_i) + self.example_operables.append(sao_i) + + self.example_operables_gte2: list[sao.SpotAnalysisOperable] = [] + self.num_example_operables_gte2 = self.num_example_operables + 2 + for i in range(self.num_example_operables_gte2): + ci_i = ci.CacheableImage(cache_path=self.example_cache_path) + sao_i = sao.SpotAnalysisOperable(ci_i) + self.example_operables_gte2.append(sao_i) + + def test_name(self): + """ + Verify that the auto-assigned name is the class name, and that it can be + overwritten with a specific name. 
+ """ + try: + instance = DoNothingImageProcessor() + self.assertEqual(instance.name, "DoNothingImageProcessor") + instance = DoNothingImageProcessor("Other Name") + self.assertEqual(instance.name, "Other Name") + + except: + lt.error( + "Error in test_AbstractSpotAnalysisImageProcessor.test_name(): " + + f"failed for operables {self.example_operables=}" + ) + raise + + def test_finished(self): + """ + Verify that finished is True only when no images have been assigned, or + when all images have been processed. + """ + try: + # 0 + instance = DoNothingImageProcessor() + self.assertTrue(instance.finished) + + # 1 + instance.assign_inputs([self.example_operable]) + self.assertFalse(instance.finished) + for result in instance: + pass + self.assertTrue(instance.finished) + + # > 1 + instance.assign_inputs(self.example_operables_gte2) + for result in instance: + pass + self.assertTrue(instance.finished) + + except: + lt.error( + "Error in test_AbstractSpotAnalysisImageProcessor.test_finished(): " + + f"failed for operables {self.example_operables_gte2=}" + ) + raise + + def test_0_operables(self): + """Verify that we see the expected behavior when attempting to run with 0 input.""" + instance = DoNothingImageProcessor() + nprocessed = 0 + + # test assignment + instance.assign_inputs([]) + + # test processing + for result in instance: + nprocessed += 1 + self.assertEqual(0, nprocessed) + + # test running + results = instance.run([]) + self.assertEqual(len(results), 0) + + def test_iterator_finishes_all(self): + try: + instance = DoNothingImageProcessor() + nprocessed = 0 + + # test with an assignment of a few operables + instance.assign_inputs(self.example_operables) + for result in instance: + nprocessed += 1 + self.assertEqual(nprocessed, self.num_example_operables) + + # test with an assignment of an additional "two" operables + instance.assign_inputs([self.example_operable, self.example_operable]) + for result in instance: + nprocessed += 1 + 
self.assertEqual(nprocessed, self.num_example_operables + 2) + + except: + lt.error( + "Error in test_AbstractSpotAnalysisImageProcessor.test_iterator_finishes_all(): " + + f"failed for operables {self.example_operables=}" + ) + raise + + def test_run(self): + """Verify that run() touches all the operables""" + try: + # sanity check: no pixels are equal to 1 + for operable in self.example_operables: + self.assertTrue(np.all(operable.primary_image.nparray != 1)) + + # process all images + instance = SetOnesImageProcessor() + results = instance.run(self.example_operables) + + # verify the input operables haven't been touched + for operable in self.example_operables: + self.assertTrue(np.all(operable.primary_image.nparray != 1)) + + # verify all pixels in the new operables are equal to 1 + for operable in results: + self.assertTrue(np.all(operable.primary_image.nparray == 1)) + + except: + lt.error( + "Error in test_AbstractSpotAnalysisImageProcessor.test_run(): " + + f"failed for operables {self.example_operables=}" + ) + raise + + def test_process_operable(self): + """Verify that process_operable() updates the pixels""" + try: + for operable in self.example_operables: + # sanity check: no pixels are equal to 1 + self.assertTrue(np.all(operable.primary_image.nparray != 1)) + + # process the operable + instance = SetOnesImageProcessor() + result = instance.process_operable(operable, is_last=True) + + # verify the input operable hasn't been touched + self.assertTrue(np.all(operable.primary_image.nparray != 1)) + + # verify all pixels in the new operable are equal to 1 + self.assertTrue(np.all(result[0].primary_image.nparray == 1)) + + except: + lt.error( + "Error in test_AbstractSpotAnalysisImageProcessor.test_process_operable(): " + + f"failed for operables {self.example_operables=}" + ) + raise + + def test_process_images(self): + """Verify that process_images() updates the pixels""" + try: + for operable in self.example_operables: + # sanity check: no pixels are 
equal to 1 + self.assertTrue(np.all(operable.primary_image.nparray != 1)) + + # process the image + instance = SetOnesImageProcessor() + result = instance.process_images([operable.primary_image]) + + # verify the input image hasn't been touched + self.assertTrue(np.all(operable.primary_image.nparray != 1)) + + # verify all pixels in the new image are equal to 1 + self.assertTrue(np.all(result[0].nparray == 1)) + + except: + lt.error( + "Error in test_AbstractSpotAnalysisImageProcessor.test_process_images(): " + + f"failed for operables {self.example_operables=}" + ) + raise + + +if __name__ == '__main__': + unittest.main() diff --git a/opencsp/common/lib/cv/test/data/input/CacheableImage/example_image.npy b/opencsp/common/lib/cv/test/data/input/CacheableImage/example_image.npy new file mode 100644 index 000000000..cd6de2a0a Binary files /dev/null and b/opencsp/common/lib/cv/test/data/input/CacheableImage/example_image.npy differ diff --git a/opencsp/common/lib/cv/test/data/input/CacheableImage/example_image.png b/opencsp/common/lib/cv/test/data/input/CacheableImage/example_image.png new file mode 100644 index 000000000..097cc85ca Binary files /dev/null and b/opencsp/common/lib/cv/test/data/input/CacheableImage/example_image.png differ diff --git a/opencsp/common/lib/cv/test/test_CacheableImage.py b/opencsp/common/lib/cv/test/test_CacheableImage.py new file mode 100644 index 000000000..26955c040 --- /dev/null +++ b/opencsp/common/lib/cv/test/test_CacheableImage.py @@ -0,0 +1,357 @@ +import sys +import unittest + +import numpy as np + +from opencsp.common.lib.cv.CacheableImage import CacheableImage +import opencsp.common.lib.tool.file_tools as ft +import opencsp.common.lib.tool.image_tools as it +import opencsp.common.lib.tool.log_tools as lt + + +class test_CacheableImage(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + path, name, _ = ft.path_components(__file__) + cls.in_dir = ft.join(path, 'data/input', name.split('test_')[-1]) + cls.out_dir = 
ft.join(path, 'data/output', name.split('test_')[-1]) + ft.create_directories_if_necessary(cls.in_dir) + ft.create_directories_if_necessary(cls.out_dir) + ft.delete_files_in_directory(cls.out_dir, '*') + return super().setUpClass() + + def setUp(self) -> None: + self.test_name = self.id().split('.')[-1] + + # example image with color quadrants r, g, b, y + self.example_array = np.zeros((40, 40, 3), dtype=np.uint8) + self.example_array[:20, :20, 0] = 255 + self.example_array[20:, :20, 1] = 255 + self.example_array[:20, 20:, 2] = 255 + self.example_array[20:, 20:, :2] = 255 + + # same as the example image, but as a numpy file + self.example_cache_path = ft.join(self.in_dir, "example_image.npy") + + # same as the example image, but as an image file + self.example_source_path = ft.join(self.in_dir, "example_image.png") + + # non-existant values + self.noexist_cache_path = ft.join(self.out_dir, "noexist.npy") + self.noexist_source_path = ft.join(self.out_dir, "noexist.png") + + # de-register all cacheable images + while CacheableImage.lru() is not None: + pass + + def test_init_valid(self): + """Test all valid combinations of CacheableImage constructor parameters.""" + # fmt: off + valid_combinations = [ + [ self.example_array, None, None ], + [ self.example_array, self.example_cache_path, None ], + [ self.example_array, self.example_cache_path, self.example_source_path ], + [ self.example_array, self.example_cache_path, self.noexist_source_path ], + [ self.example_array, self.noexist_cache_path, None ], + [ self.example_array, self.noexist_cache_path, self.example_source_path ], + [ self.example_array, self.noexist_cache_path, self.noexist_source_path ], + [ self.example_array, None, self.example_source_path ], + [ self.example_array, None, self.noexist_source_path ], + [ None, self.example_cache_path, None ], + [ None, self.example_cache_path, self.example_source_path ], + [ None, self.example_cache_path, self.noexist_source_path ], + [ None, self.noexist_cache_path, 
self.example_source_path ], + [ None, None, self.example_source_path ], + ] + # fmt: on + + for valid_combination in valid_combinations: + try: + CacheableImage(*valid_combination) + except Exception: + lt.error( + f"Encountered exception in {self.test_name} with the following valid combination of constructor parameters:\n" + + f"\tarray = {type(valid_combination[0])}\n" + + f"\tcache_path = {valid_combination[1]}\n" + + f"\tsource_path = {valid_combination[2]}\n" + ) + raise + + def test_init_invalid(self): + """Test all invalid combinations of CacheableImage constructor parameters.""" + with self.assertRaises(ValueError): + CacheableImage(None, None, None) + + # fmt: off + invalid_combinations = [ + [ None, None, self.noexist_source_path ], + [ None, self.noexist_cache_path, None ], + [ None, self.noexist_cache_path, self.noexist_source_path ], + ] + # fmt: on + + for invalid_combination in invalid_combinations: + with self.assertRaises(FileNotFoundError): + CacheableImage(*invalid_combination) + lt.error( + f"Expected exception in {self.test_name} for the following invalid combination of constructor parameters:\n" + + f"\tarray = {type(invalid_combination[0])}\n" + + f"\tcache_path = {invalid_combination[1]}\n" + + f"\tsource_path = {invalid_combination[2]}\n" + ) + + def test_size(self): + """ + Verifies that the size() built-in returns the correct value, and that + the sum of all CacheableImages returns the correct value. + """ + # cacheable images exist from other tests, include their sizes as well + existing_sizes = CacheableImage.all_cacheable_images_size() + + implementation_overhead = 48 + # Note that this number is stemming from the CPython implementation of + # objects and the memory those objects require for their book-keeping. + # Therefore this number could be different on every system and could + # also change with python versions. 
+ + delta = 2 * implementation_overhead + # Also, we don't actually care that much what the + # implementation-specific number is, so let's just make the buffer a + # little bigger. + + # one cacheable image + ci1 = CacheableImage(self.example_array, None, self.example_source_path) + # sys.getsizeof(ci1): 4976 + example_image = None + self.assertAlmostEqual(sys.getsizeof(ci1), sys.getsizeof(self.example_array), delta=delta) + self.assertAlmostEqual( + sys.getsizeof(ci1), + 4800, + delta=1000, + msg="Sanity check that the memory usage is roughly proportional to the size of the image failed.", + ) + self.assertAlmostEqual( + sys.getsizeof(ci1), CacheableImage.all_cacheable_images_size() - existing_sizes, delta=delta + ) + example_image = ci1.to_image() + # sys.getsizeof(ci1): 9808 + self.assertAlmostEqual( + sys.getsizeof(ci1), sys.getsizeof(self.example_array) + it.getsizeof_approx(example_image), delta=delta + ) + self.assertAlmostEqual( + sys.getsizeof(ci1), + 9600, + delta=2000, + msg="Sanity check that the memory usage is roughly proportional to the size of the image failed.", + ) + self.assertAlmostEqual( + sys.getsizeof(ci1), CacheableImage.all_cacheable_images_size() - existing_sizes, delta=delta + ) + + # multiple cacheable images + ci2 = CacheableImage(self.example_array, None, self.example_source_path) + ci3 = CacheableImage(self.example_array, None, self.example_source_path) + ci2.to_image() + ci3.to_image() + # CacheableImage.all_cacheable_images_size(): 29424 + self.assertAlmostEqual( + sys.getsizeof(ci1) * 3, CacheableImage.all_cacheable_images_size() - existing_sizes, delta=delta + ) + + def test_cache(self): + """Test all valid combinations of CacheableImage constructor parameters.""" + default_cache_file = ft.join(self.out_dir, f"{self.test_name}.npy") + noexist_cache_file = ft.join(self.out_dir, f"{self.test_name}_no_exist.npy") + + # fmt: off + valid_combinations = [ + # np array, .np cache file, .png source file, expected .np cache file + [ 
self.example_array, None, None, default_cache_file ], + [ self.example_array, self.example_cache_path, None, self.example_cache_path ], + [ self.example_array, None, self.noexist_source_path, default_cache_file ], + [ self.example_array, self.example_cache_path, self.example_source_path, None ], + [ self.example_array, noexist_cache_file, None, noexist_cache_file ], + [ self.example_array, self.example_cache_path, self.noexist_source_path, None ], + [ self.example_array, noexist_cache_file, self.example_source_path, None ], + [ self.example_array, noexist_cache_file, self.noexist_source_path, noexist_cache_file ], + [ self.example_array, None, self.example_source_path, None ], + [ None, self.example_cache_path, None, None ], + [ None, self.example_cache_path, self.example_source_path, None ], + [ None, self.example_cache_path, self.noexist_source_path, None ], + [ None, noexist_cache_file, self.example_source_path, None ], + [ None, None, self.example_source_path, None ], + ] + # fmt: on + + for valid_combination in valid_combinations: + # setup + err_msg = ( + f"Error encountered in {self.test_name} with the following valid combination of constructor parameters:\n" + + f"\tarray = {type(valid_combination[0])}\n" + + f"\tcache_path = {valid_combination[1]}\n" + + f"\tsource_path = {valid_combination[2]}\n" + ) + + try: + # setup + should_create_cache_file = valid_combination[3] + valid_combination = valid_combination[:3] + ft.delete_file(default_cache_file, error_on_not_exists=False) + ft.delete_file(noexist_cache_file, error_on_not_exists=False) + + # create the cacheable image + cacheable = CacheableImage(*valid_combination) + + # check memory usage + cacheable.nparray + cacheable.to_image() + self.assertGreaterEqual(sys.getsizeof(cacheable), sys.getsizeof(self.example_array), msg=err_msg) + + # verify that cacheing works + self.assertFalse(ft.file_exists(default_cache_file), msg=err_msg) + cacheable.cache(default_cache_file) + if should_create_cache_file is not 
None: + self.assertTrue(ft.file_exists(should_create_cache_file), msg=err_msg) + else: + self.assertFalse(ft.file_exists(default_cache_file), msg=err_msg) + self.assertFalse(ft.file_exists(noexist_cache_file), msg=err_msg) + + # check memory usage + self.assertAlmostEqual(0, sys.getsizeof(cacheable), delta=cacheable._expected_cached_size, msg=err_msg) + + # verify that loading from the cache works + uncached_array = cacheable.nparray + self.assertGreaterEqual(sys.getsizeof(cacheable), sys.getsizeof(self.example_array), msg=err_msg) + np.testing.assert_array_equal(self.example_array, uncached_array) + + # cache and delete the cache file + # loading from the cache should fail + cacheable.cache(default_cache_file) + if should_create_cache_file is not None: + self.assertTrue(ft.file_exists(should_create_cache_file), msg=err_msg) + ft.delete_file(should_create_cache_file) + with self.assertRaises(Exception, msg=err_msg): + cacheable.nparray + else: + self.assertFalse(ft.file_exists(default_cache_file), msg=err_msg) + self.assertFalse(ft.file_exists(noexist_cache_file), msg=err_msg) + + except Exception: + lt.error(err_msg) + raise + + def test_lru(self): + """Verifies that the Least Recently Used functionality works as expected""" + + # create three cacheable images + # new LRU: 1, 2, 3 + c1 = CacheableImage(self.example_array) + self.assertEqual(c1, CacheableImage.lru(False)) + c2 = CacheableImage(self.example_array) + self.assertEqual(c1, CacheableImage.lru(False)) + c3 = CacheableImage(self.example_array) + self.assertEqual(c1, CacheableImage.lru(False)) + + # get the value from the 1st image, then the 2nd image + # LRU: 2, 3, 1 + c1.nparray + self.assertEqual(c2, CacheableImage.lru(False)) + # LRU: 3, 1, 2 + c2.nparray + self.assertEqual(c3, CacheableImage.lru(False)) + + # cache the 1st image, check that this doesn't change the lru + # LRU: 3, 2, 1 + c1.cache(ft.join(self.out_dir, f"{self.test_name}_c1.npy")) + self.assertEqual(c3, CacheableImage.lru(False)) + + # 
get the value of the 3rd image, check that the 2nd is now the LRU + # LRU: 2, 1, 3 + c3.nparray + self.assertEqual(c2, CacheableImage.lru(False)) + + # deregister the 2nd, then 1st, then 3rd + self.assertEqual(c2, CacheableImage.lru(True)) + # LRU: 1, 3 + self.assertEqual(c1, CacheableImage.lru(True)) + # LRU: 3 + self.assertEqual(c3, CacheableImage.lru(True)) + # LRU: + self.assertEqual(None, CacheableImage.lru(True)) + + # get the value for and deregister the 1st, then 2nd, then 3rd + # LRU: 1 + c1.nparray + self.assertEqual(c1, CacheableImage.lru(True)) + # LRU: 2 + c2.nparray + self.assertEqual(c2, CacheableImage.lru(True)) + # LRU: 3 + c3.nparray + self.assertEqual(c3, CacheableImage.lru(True)) + + # check that there are no more registered cacheable images + self.assertEqual(None, CacheableImage.lru(False)) + + def test_save_image(self): + """Verify that after saving the image, cacheing no longer creates a cache file""" + cache_file = ft.join(self.out_dir, f"{self.test_name}.npy") + image_file = ft.join(self.out_dir, f"{self.test_name}.png") + ci = CacheableImage(self.example_array, source_path=image_file) + in_memory_size = sys.getsizeof(ci) + + # Sanity test: cacheing without saving the image creates a cache file. + # This is in preparation for the "finale". 
+ self.assertFalse(ft.file_exists(cache_file)) + ci.cache(cache_file) + self.assertTrue(ft.file_exists(cache_file)) + cached_size = sys.getsizeof(ci) + self.assertLess(cached_size, in_memory_size) + + # re-load the data + ci.nparray + in_memory_size = sys.getsizeof(ci) + self.assertGreater(in_memory_size, cached_size) + + # delete the cache file + ft.delete_file(cache_file) + self.assertFalse(ft.file_exists(cache_file)) + + # save to the image file + self.assertFalse(ft.file_exists(image_file)) + ci.save_image(image_file) + self.assertTrue(ft.file_exists(image_file)) + self.assertEqual(image_file, ci.source_path) + + # Finale: cacheing should not re-create the cache file because now the + # image file exists + self.assertFalse(ft.file_exists(cache_file)) + ci.cache(cache_file) + self.assertFalse(ft.file_exists(cache_file)) + cached_size = sys.getsizeof(ci) + self.assertLess(cached_size, in_memory_size) + + def test_cache_memlimit0(self): + """Check that cacheing doesn't halt forever when the memory limit is 0.""" + default_cache_file = ft.join(self.out_dir, f"{self.test_name}.npy") + cache_path_gen = lambda: default_cache_file + + # create the cacheable image + ci1 = CacheableImage(cache_path=self.example_cache_path) + self.assertAlmostEqual(0, sys.getsizeof(ci1), delta=ci1._expected_cached_size) + + # verify we're not cached yet + ci1.nparray + self.assertAlmostEqual(40 * 40 * 3, sys.getsizeof(ci1), delta=ci1._expected_cached_size) + + # verify the memory limit is working + ci1.cache_images_to_disk_as_necessary(1e10, cache_path_gen) + self.assertAlmostEqual(40 * 40 * 3, sys.getsizeof(ci1), delta=ci1._expected_cached_size) + + # check that a memory limit of 0 is accepted + ci1.cache_images_to_disk_as_necessary(0, cache_path_gen) + self.assertAlmostEqual(0, sys.getsizeof(ci1), delta=ci1._expected_cached_size) + + +if __name__ == '__main__': + unittest.main() diff --git a/opencsp/common/lib/tool/image_tools.py b/opencsp/common/lib/tool/image_tools.py index 
8b21d25e0..ce5c67978 100644 --- a/opencsp/common/lib/tool/image_tools.py +++ b/opencsp/common/lib/tool/image_tools.py @@ -5,6 +5,7 @@ """ +import sys import numpy as np from PIL import Image @@ -256,3 +257,35 @@ def image_files_in_directory(dir: str, allowable_extensions: list[str] = None) - files = sorted(files) return files + + +def getsizeof_approx(img: Image) -> int: + """ + Get the number of bytes of memory used by the given image. + + Note that this value is approximate. It should be close based on the basic + assumptions of uncompressed data in memory, and one byte per pixel per color + channel. + + Parameters + ---------- + img : Image + The image to get the size of. + + Returns + ------- + int + Number of bytes of memory that the image object + image data together occupy. + """ + # Get the size of the image object + object_size = sys.getsizeof(img) + + # Calculate the approximate size of the image data + if img is None: + image_data_size = 0 + else: + width, height = img.size + mode_size = len(img.getbands()) # Number of bytes per pixel + image_data_size = width * height * mode_size + + return object_size + image_data_size diff --git a/opencsp/common/lib/tool/test/test_image_tools.py b/opencsp/common/lib/tool/test/test_image_tools.py index 50a2bbfb3..c43432cf0 100644 --- a/opencsp/common/lib/tool/test/test_image_tools.py +++ b/opencsp/common/lib/tool/test/test_image_tools.py @@ -1,10 +1,12 @@ +import unittest + import numpy as np import numpy.testing as nptest -import os -import unittest +import PIL.Image as Image import opencsp.common.lib.tool.file_tools as ft import opencsp.common.lib.tool.image_tools as it +import opencsp.common.lib.tool.log_tools as lt class TestImageTools(unittest.TestCase): @@ -88,6 +90,30 @@ def test_image_files_in_directory(self): self.assertIn("c.jpg", png_jpg_image_files) self.assertNotIn("d.txt", png_jpg_image_files) + def test_getsizeof_approx(self): + """ + Verifies that the returned size of the image in memory is somewhere in 
+ the ballpark for how much memory we're expecting. + """ + implementation_overhead = 48 + # The delta accounts for the extra bytes of memory used for the python + # object overhead. This number is going to be system and implementation + # specific, and so might change with different python versions. + + delta = 2 * implementation_overhead + # Also, we don't actually care that much what the + # implementation-specific number is, so let's just make the buffer a + # little bigger. + + img = Image.new('RGB', (40, 40)) + self.assertAlmostEqual(it.getsizeof_approx(img), 40 * 40 * 3, delta=delta) + if 40 * 40 * 3 + implementation_overhead != it.getsizeof_approx(img): + lt.warn( + "Warning in test_image_tools.test_getsizeof_approx(): " + + "40*40*3 + implementation_overhead != it.getsizeof_approx(img) " + + f"({40*40*3} + {implementation_overhead} != {it.getsizeof_approx(img)})" + ) + if __name__ == '__main__': unittest.main()