Merge "Add a LazyExecutorTTLCache to the OpenStack driver"
Zuul authored and openstack-gerrit committed Mar 27, 2023
2 parents fbe18dc + eda7b0f commit 908040b
Showing 3 changed files with 131 additions and 6 deletions.
20 changes: 15 additions & 5 deletions nodepool/driver/openstack/adapter.py
@@ -23,11 +23,10 @@
 import time
 import operator
 
-import cachetools.func
 import openstack
 from keystoneauth1.exceptions.catalog import EndpointNotFound
 
-from nodepool.driver.utils import QuotaInformation
+from nodepool.driver.utils import QuotaInformation, LazyExecutorTTLCache
 from nodepool.driver import statemachine
 from nodepool import exceptions
 from nodepool import stats
@@ -409,6 +408,20 @@ def __init__(self, provider_config):
             thread_name_prefix=f'openstack-api-{provider_config.name}',
             max_workers=workers)
 
+        # Use a lazy TTL cache for these. This uses the TPE to
+        # asynchronously update the cached values, meanwhile returning
+        # the previous cached data if available. This means every
+        # call after the first one is instantaneous.
+        self._listServers = LazyExecutorTTLCache(
+            CACHE_TTL, self.api_executor)(
+                self._listServers)
+        self._listVolumes = LazyExecutorTTLCache(
+            CACHE_TTL, self.api_executor)(
+                self._listVolumes)
+        self._listFloatingIps = LazyExecutorTTLCache(
+            CACHE_TTL, self.api_executor)(
+                self._listFloatingIps)
+
         self._last_image_check_failure = time.time()
         self._last_port_cleanup = None
         self._statsd = stats.get_client()
@@ -688,20 +701,17 @@ def _findNetwork(self, name):
                     name, self.provider.name))
         return network
 
-    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
     def _listServers(self):
         with Timer(self.log, 'API call list_servers'):
             return self._client.list_servers(bare=True)
 
-    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
     def _listVolumes(self):
         try:
             with Timer(self.log, 'API call list_volumes'):
                 return self._client.list_volumes()
         except EndpointNotFound:
             return []
 
-    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
     def _listFloatingIps(self):
         with Timer(self.log, 'API call list_floating_ips'):
             return self._client.list_floating_ips()
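The change above replaces the blocking cachetools.func.ttl_cache decorators with per-instance wrapping at construction time. The practical difference shows up when the TTL expires: the old decorator made the next caller wait for the API call to complete, while the new wrapper hands back the stale data immediately and refreshes in the background. A minimal sketch of that behavior, assuming nodepool is importable (slow_list and the timings here are hypothetical, not part of the commit):

import time
from concurrent.futures import ThreadPoolExecutor

from nodepool.driver.utils import LazyExecutorTTLCache


def slow_list():
    # Stands in for a slow cloud API call such as list_servers.
    time.sleep(2)
    return ['server-1']


executor = ThreadPoolExecutor(max_workers=2)
lazy_list = LazyExecutorTTLCache(5, executor)(slow_list)

lazy_list()               # First call: no cached data, blocks ~2s.
time.sleep(6)             # Let the 5s TTL expire.
start = time.monotonic()
stale = lazy_list()       # Stale value returned immediately; a
                          # refresh is submitted to the executor.
assert stale == ['server-1']
assert time.monotonic() - start < 0.5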
74 changes: 74 additions & 0 deletions nodepool/driver/utils.py
@@ -457,3 +457,77 @@ def __exit__(self, etype, value, tb):
 
     def _exit(self, etype, value, tb):
         pass
+
+
+class LazyExecutorTTLCache:
+    """This is a lazy executor TTL cache.
+
+    It's lazy because if it has cached data, it will always return it
+    instantly.
+
+    It's executor based, which means that if a cache miss occurs, it
+    will submit a task to an executor to fetch new data.
+
+    Finally, it's a TTL cache, which means it automatically expires data.
+
+    Since it is only expected to be used when caching provider
+    resource listing methods, it assumes there will only be one entry
+    and ignores arguments -- it will return the same cached data no
+    matter what arguments are supplied; but it will pass on those
+    arguments to the underlying method in a cache miss.
+
+    :param numeric ttl: The cache timeout in seconds.
+    :param concurrent.futures.Executor executor: An executor to use to
+        update data asynchronously in case of a cache miss.
+    """
+
+    def __init__(self, ttl, executor):
+        self.ttl = ttl
+        self.executor = executor
+        # If we have an outstanding update being run by the executor,
+        # this is the future.
+        self.future = None
+        # The last time the underlying method completed.
+        self.last_time = None
+        # The last value from the underlying method.
+        self.last_value = None
+        # A lock to make all of this thread safe (especially to ensure
+        # we don't fire off multiple updates).
+        self.lock = threading.Lock()
+
+    def __call__(self, func):
+        def decorator(*args, **kw):
+            with self.lock:
+                now = time.monotonic()
+                if self.future and self.future.done():
+                    # If a previous call spawned an update, resolve
+                    # that now so we can use the data.
+                    try:
+                        self.last_time, self.last_value = self.future.result()
+                    finally:
+                        # Clear the future regardless so we don't loop.
+                        self.future = None
+                if (self.last_time is not None and
+                        now - self.last_time < self.ttl):
+                    # A cache hit.
+                    return self.last_value
+                # The rest of the method is a cache miss.
+                if self.last_time is not None:
+                    if not self.future:
+                        # Fire off an asynchronous update request.
+                        # This second wrapper ensures that we record
+                        # the time that the update is complete along
+                        # with the value.
+                        def func_with_time():
+                            ret = func(*args, **kw)
+                            now = time.monotonic()
+                            return (now, ret)
+                        self.future = self.executor.submit(func_with_time)
+                else:
+                    # This is the first time this method has been
+                    # called; since we don't have any cached data, we
+                    # will synchronously update the data.
+                    self.last_value = func(*args, **kw)
+                    self.last_time = time.monotonic()
+                return self.last_value
+        return decorator
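As the docstring above notes, the cache keeps a single entry and ignores call arguments for lookups, only forwarding them to the wrapped function when a refresh actually runs. A small illustrative sketch (not from the commit; list_items and its marker parameter are made up):

from concurrent.futures import ThreadPoolExecutor

from nodepool.driver.utils import LazyExecutorTTLCache

executor = ThreadPoolExecutor(max_workers=1)


@LazyExecutorTTLCache(60, executor)
def list_items(marker=None):
    print(f'fetching with marker={marker}')
    return ['item-a', 'item-b']


list_items(marker='x')  # Cold miss: runs synchronously, prints marker=x.
list_items(marker='y')  # Cache hit: same data back; marker=y is ignored
                        # until the entry expires.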
43 changes: 42 additions & 1 deletion nodepool/tests/unit/test_utils.py
@@ -12,11 +12,14 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from concurrent.futures import ThreadPoolExecutor
 import copy
 import math
+import time
 
 from nodepool import tests
-from nodepool.driver.utils import QuotaInformation
+from nodepool.driver.utils import QuotaInformation, LazyExecutorTTLCache
+from nodepool.nodeutils import iterate_timeout
 
 
 class TestQutoInformation(tests.BaseTestCase):
@@ -66,3 +69,41 @@ def test_extra(self):
         remain.subtract(needed)
 
         self.assertEqual(expected.quota, remain.quota)
+
+
+class FakeAdapter:
+    CACHE_TTL = 0.5
+
+    def __init__(self):
+        self.api_executor = ThreadPoolExecutor(max_workers=4)
+        self.get_time = LazyExecutorTTLCache(
+            self.CACHE_TTL, self.api_executor)(
+                self.get_time)
+
+    def get_time(self):
+        return time.monotonic()
+
+
+class TestLazyExecutorTTLCache(tests.BaseTestCase):
+    def test_lazy_cache(self):
+        adapter = FakeAdapter()
+        t0 = time.monotonic()
+        ret1 = adapter.get_time()
+        t1 = time.monotonic()
+        self.assertTrue(t0 < ret1 < t1)
+        # Assuming the computer isn't completely overloaded, this
+        # should happen instantly and be a cache hit.
+        ret2 = adapter.get_time()
+        self.assertEqual(ret1, ret2)
+        # Sleep longer than the ttl
+        time.sleep(adapter.CACHE_TTL + 0.1)
+        # This should be a cache miss that triggers an update and
+        # returns the old value.
+        ret3 = adapter.get_time()
+        self.assertEqual(ret1, ret3)
+        # Eventually the async update should return and we should get
+        # a newer value.
+        for _ in iterate_timeout(30, Exception, 'cache update'):
+            ret4 = adapter.get_time()
+            if ret4 > ret3:
+                break
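The sequence this test drives can also be reproduced standalone, outside the test framework; a rough sketch, with timings that assume an unloaded machine (not part of the commit):

import time
from concurrent.futures import ThreadPoolExecutor

from nodepool.driver.utils import LazyExecutorTTLCache

executor = ThreadPoolExecutor(max_workers=4)
get_time = LazyExecutorTTLCache(0.5, executor)(time.monotonic)

first = get_time()           # Cold miss: synchronous update.
assert get_time() == first   # Within the TTL: cache hit.
time.sleep(0.6)              # Let the TTL expire.
assert get_time() == first   # Stale hit: old value, refresh queued.
time.sleep(0.5)              # Give the refresh time to complete.
assert get_time() > first    # The refreshed timestamp is now visible.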
