diff --git a/nodepool/driver/openstack/adapter.py b/nodepool/driver/openstack/adapter.py
index cef96428..b9860dd9 100644
--- a/nodepool/driver/openstack/adapter.py
+++ b/nodepool/driver/openstack/adapter.py
@@ -23,11 +23,10 @@
 import time
 import operator
 
-import cachetools.func
 import openstack
 from keystoneauth1.exceptions.catalog import EndpointNotFound
 
-from nodepool.driver.utils import QuotaInformation
+from nodepool.driver.utils import QuotaInformation, LazyExecutorTTLCache
 from nodepool.driver import statemachine
 from nodepool import exceptions
 from nodepool import stats
@@ -409,6 +408,20 @@ def __init__(self, provider_config):
             thread_name_prefix=f'openstack-api-{provider_config.name}',
             max_workers=workers)
 
+        # Use a lazy TTL cache for these. This uses the TPE to
+        # asynchronously update the cached values, meanwhile returning
+        # the previous cached data if available. This means every
+        # call after the first one is instantaneous.
+        self._listServers = LazyExecutorTTLCache(
+            CACHE_TTL, self.api_executor)(
+                self._listServers)
+        self._listVolumes = LazyExecutorTTLCache(
+            CACHE_TTL, self.api_executor)(
+                self._listVolumes)
+        self._listFloatingIps = LazyExecutorTTLCache(
+            CACHE_TTL, self.api_executor)(
+                self._listFloatingIps)
+
         self._last_image_check_failure = time.time()
         self._last_port_cleanup = None
         self._statsd = stats.get_client()
@@ -688,12 +701,10 @@ def _findNetwork(self, name):
                 name, self.provider.name))
         return network
 
-    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
     def _listServers(self):
         with Timer(self.log, 'API call list_servers'):
             return self._client.list_servers(bare=True)
 
-    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
     def _listVolumes(self):
         try:
             with Timer(self.log, 'API call list_volumes'):
@@ -701,7 +712,6 @@ def _listVolumes(self):
                 return self._client.list_volumes()
         except EndpointNotFound:
             return []
 
-    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
     def _listFloatingIps(self):
         with Timer(self.log, 'API call list_floating_ips'):
             return self._client.list_floating_ips()
diff --git a/nodepool/driver/utils.py b/nodepool/driver/utils.py
index 2efa4526..b7825a12 100644
--- a/nodepool/driver/utils.py
+++ b/nodepool/driver/utils.py
@@ -457,3 +457,77 @@ def __exit__(self, etype, value, tb):
 
     def _exit(self, etype, value, tb):
         pass
+
+
+class LazyExecutorTTLCache:
+    """This is a lazy executor TTL cache.
+
+    It's lazy because if it has cached data, it will always return it
+    instantly.
+
+    It's executor based, which means that if a cache miss occurs, it
+    will submit a task to an executor to fetch new data.
+
+    Finally, it's a TTL cache, which means it automatically expires data.
+
+    Since it is only expected to be used when caching provider
+    resource listing methods, it assumes there will only be one entry
+    and ignores arguments -- it will return the same cached data no
+    matter what arguments are supplied; but it will pass on those
+    arguments to the underlying method in a cache miss.
+
+    :param numeric ttl: The cache timeout in seconds.
+    :param concurrent.futures.Executor executor: An executor to use to
+        update data asynchronously in case of a cache miss.
+    """
+
+    def __init__(self, ttl, executor):
+        self.ttl = ttl
+        self.executor = executor
+        # If we have an outstanding update being run by the executor,
+        # this is the future.
+        self.future = None
+        # The last time the underlying method completed.
+        self.last_time = None
+        # The last value from the underlying method.
+        self.last_value = None
+        # A lock to make all of this thread safe (especially to ensure
+        # we don't fire off multiple updates).
+        self.lock = threading.Lock()
+
+    def __call__(self, func):
+        def decorator(*args, **kw):
+            with self.lock:
+                now = time.monotonic()
+                if self.future and self.future.done():
+                    # If a previous call spawned an update, resolve
+                    # that now so we can use the data.
+                    try:
+                        self.last_time, self.last_value = self.future.result()
+                    finally:
+                        # Clear the future regardless so we don't loop.
+                        self.future = None
+                if (self.last_time is not None and
+                        now - self.last_time < self.ttl):
+                    # A cache hit.
+                    return self.last_value
+                # The rest of the method is a cache miss.
+                if self.last_time is not None:
+                    if not self.future:
+                        # Fire off an asynchronous update request.
+                        # This second wrapper ensures that we record
+                        # the time that the update is complete along
+                        # with the value.
+                        def func_with_time():
+                            ret = func(*args, **kw)
+                            now = time.monotonic()
+                            return (now, ret)
+                        self.future = self.executor.submit(func_with_time)
+                else:
+                    # This is the first time this method has been
+                    # called; since we don't have any cached data, we
+                    # will synchronously update the data.
+                    self.last_value = func(*args, **kw)
+                    self.last_time = time.monotonic()
+                return self.last_value
+        return decorator
diff --git a/nodepool/tests/unit/test_utils.py b/nodepool/tests/unit/test_utils.py
index 8d1f4ce6..d9b21cc6 100644
--- a/nodepool/tests/unit/test_utils.py
+++ b/nodepool/tests/unit/test_utils.py
@@ -12,11 +12,14 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from concurrent.futures import ThreadPoolExecutor
 import copy
 import math
+import time
 
 from nodepool import tests
-from nodepool.driver.utils import QuotaInformation
+from nodepool.driver.utils import QuotaInformation, LazyExecutorTTLCache
+from nodepool.nodeutils import iterate_timeout
 
 
 class TestQutoInformation(tests.BaseTestCase):
@@ -66,3 +69,41 @@ def test_extra(self):
         remain.subtract(needed)
 
         self.assertEqual(expected.quota, remain.quota)
+
+
+class FakeAdapter:
+    CACHE_TTL = 0.5
+
+    def __init__(self):
+        self.api_executor = ThreadPoolExecutor(max_workers=4)
+        self.get_time = LazyExecutorTTLCache(
+            self.CACHE_TTL, self.api_executor)(
+                self.get_time)
+
+    def get_time(self):
+        return time.monotonic()
+
+
+class TestLazyExecutorTTLCache(tests.BaseTestCase):
+    def test_lazy_cache(self):
+        adapter = FakeAdapter()
+        t0 = time.monotonic()
+        ret1 = adapter.get_time()
+        t1 = time.monotonic()
+        self.assertTrue(t0 < ret1 < t1)
+        # Assuming the computer isn't completely overloaded, this
+        # should happen instantly and be a cache hit.
+        ret2 = adapter.get_time()
+        self.assertEqual(ret1, ret2)
+        # Sleep longer than the ttl
+        time.sleep(adapter.CACHE_TTL + 0.1)
+        # This should be a cache miss that triggers an update and
+        # returns the old value.
+        ret3 = adapter.get_time()
+        self.assertEqual(ret1, ret3)
+        # Eventually the async update should return and we should get
+        # a newer value.
+        for _ in iterate_timeout(30, Exception, 'cache update'):
+            ret4 = adapter.get_time()
+            if ret4 > ret3:
+                break
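
The patch above adds LazyExecutorTTLCache, wires it around the OpenStack adapter's list methods, and exercises it in test_utils.py. As a minimal standalone sketch of the same decorator pattern: the slow_list function, the 0.5 second TTL, and the two-worker executor below are hypothetical stand-ins for an adapter's list_* method, CACHE_TTL, and api_executor; only LazyExecutorTTLCache itself comes from this change, and the timing assertions assume the machine isn't heavily loaded.

    import time
    from concurrent.futures import ThreadPoolExecutor

    from nodepool.driver.utils import LazyExecutorTTLCache


    def slow_list():
        # Hypothetical stand-in for an expensive provider API call.
        time.sleep(0.2)
        return time.monotonic()


    executor = ThreadPoolExecutor(max_workers=2)
    # Wrap the function the same way the adapter wraps _listServers, etc.
    slow_list = LazyExecutorTTLCache(0.5, executor)(slow_list)

    first = slow_list()            # Empty cache: blocks ~0.2s for real data.
    assert slow_list() == first    # Within the TTL: instant cache hit.

    time.sleep(0.6)                # Let the TTL expire.
    stale = slow_list()            # Miss: returns the old value immediately
    assert stale == first          # while the executor refreshes it.

    time.sleep(0.5)                # Give the background update time to finish.
    assert slow_list() > first     # A later call sees the refreshed value.

    executor.shutdown()

The design choice this illustrates is the one stated in the adapter comment: after the first synchronous call, a caller never blocks on the provider API again; on expiry it gets the last known data back instantly while the executor refreshes the cache in the background.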