From eda7b0f6099d93a8ddffceed55abd4bb6d07d680 Mon Sep 17 00:00:00 2001
From: "James E. Blair"
Date: Wed, 15 Mar 2023 15:07:17 -0700
Subject: [PATCH] Add a LazyExecutorTTLCache to the OpenStack driver

See the docstring for an explanation of what a Lazy Executor TTL
Cache is.  By switching the caching of the server list method (and
also volumes and floating IPs) to the lazy cache, we will make all
of the methods called by the state machines asynchronous.  This
means that both the create and delete state machine threads should
be able to spin through all of their state machines as quickly as
Python and ZooKeeper overhead will allow.

Change-Id: Ibce6b4d82929e6a764fdbc025990f7e01060b509
---
 nodepool/driver/openstack/adapter.py | 20 ++++--
 nodepool/driver/utils.py             | 74 ++++++++++++++++++++++++++++
 nodepool/tests/unit/test_utils.py    | 43 +++++++++++++++-
 3 files changed, 131 insertions(+), 6 deletions(-)

diff --git a/nodepool/driver/openstack/adapter.py b/nodepool/driver/openstack/adapter.py
index cef964280..b9860dd9e 100644
--- a/nodepool/driver/openstack/adapter.py
+++ b/nodepool/driver/openstack/adapter.py
@@ -23,11 +23,10 @@ import math
 import time
 import operator
 
-import cachetools.func
 import openstack
 from keystoneauth1.exceptions.catalog import EndpointNotFound
 
-from nodepool.driver.utils import QuotaInformation
+from nodepool.driver.utils import QuotaInformation, LazyExecutorTTLCache
 from nodepool.driver import statemachine
 from nodepool import exceptions
 from nodepool import stats
@@ -409,6 +408,20 @@ class OpenStackAdapter(statemachine.Adapter):
             thread_name_prefix=f'openstack-api-{provider_config.name}',
             max_workers=workers)
 
+        # Use a lazy TTL cache for these.  This uses the thread pool
+        # executor to asynchronously update the cached values,
+        # meanwhile returning the previous cached data if available.
+        # This means every call after the first one is instantaneous.
+        self._listServers = LazyExecutorTTLCache(
+            CACHE_TTL, self.api_executor)(
+                self._listServers)
+        self._listVolumes = LazyExecutorTTLCache(
+            CACHE_TTL, self.api_executor)(
+                self._listVolumes)
+        self._listFloatingIps = LazyExecutorTTLCache(
+            CACHE_TTL, self.api_executor)(
+                self._listFloatingIps)
+
         self._last_image_check_failure = time.time()
         self._last_port_cleanup = None
         self._statsd = stats.get_client()
@@ -688,12 +701,10 @@ class OpenStackAdapter(statemachine.Adapter):
                     name, self.provider.name))
         return network
 
-    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
     def _listServers(self):
         with Timer(self.log, 'API call list_servers'):
             return self._client.list_servers(bare=True)
 
-    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
     def _listVolumes(self):
         try:
             with Timer(self.log, 'API call list_volumes'):
@@ -701,7 +712,6 @@ class OpenStackAdapter(statemachine.Adapter):
         except EndpointNotFound:
             return []
 
-    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
     def _listFloatingIps(self):
         with Timer(self.log, 'API call list_floating_ips'):
             return self._client.list_floating_ips()
diff --git a/nodepool/driver/utils.py b/nodepool/driver/utils.py
index 2efa45260..b7825a12a 100644
--- a/nodepool/driver/utils.py
+++ b/nodepool/driver/utils.py
@@ -457,3 +457,77 @@ class RateLimiter:
 
     def _exit(self, etype, value, tb):
         pass
+
+
+class LazyExecutorTTLCache:
+    """This is a lazy executor TTL cache.
+
+    It's lazy because if it has cached data, it will always return it
+    instantly.
+
+    It's executor based, which means that if a cache miss occurs, it
+    will submit a task to an executor to fetch new data.
+
+    Finally, it's a TTL cache, which means it automatically expires data.
+
+    Since it is only expected to be used when caching provider
+    resource listing methods, it assumes there will only be one entry
+    and ignores arguments -- it will return the same cached data no
+    matter what arguments are supplied, but it will pass those
+    arguments on to the underlying method on a cache miss.
+
+    :param numeric ttl: The cache timeout in seconds.
+    :param concurrent.futures.Executor executor: An executor to use to
+        update data asynchronously in case of a cache miss.
+    """
+
+    def __init__(self, ttl, executor):
+        self.ttl = ttl
+        self.executor = executor
+        # If we have an outstanding update being run by the executor,
+        # this is the future.
+        self.future = None
+        # The last time the underlying method completed.
+        self.last_time = None
+        # The last value from the underlying method.
+        self.last_value = None
+        # A lock to make all of this thread safe (especially to ensure
+        # we don't fire off multiple updates).
+        self.lock = threading.Lock()
+
+    def __call__(self, func):
+        def decorator(*args, **kw):
+            with self.lock:
+                now = time.monotonic()
+                if self.future and self.future.done():
+                    # If a previous call spawned an update, resolve
+                    # that now so we can use the data.
+                    try:
+                        self.last_time, self.last_value = self.future.result()
+                    finally:
+                        # Clear the future regardless so we don't loop.
+                        self.future = None
+                if (self.last_time is not None and
+                        now - self.last_time < self.ttl):
+                    # A cache hit.
+                    return self.last_value
+                # The rest of the method is a cache miss.
+                if self.last_time is not None:
+                    if not self.future:
+                        # Fire off an asynchronous update request.
+                        # This second wrapper ensures that we record
+                        # the time that the update is complete along
+                        # with the value.
+                        def func_with_time():
+                            ret = func(*args, **kw)
+                            now = time.monotonic()
+                            return (now, ret)
+                        self.future = self.executor.submit(func_with_time)
+                else:
+                    # This is the first time this method has been
+                    # called; since we don't have any cached data, we
+                    # will synchronously update the data.
+                    self.last_value = func(*args, **kw)
+                    self.last_time = time.monotonic()
+                return self.last_value
+        return decorator
diff --git a/nodepool/tests/unit/test_utils.py b/nodepool/tests/unit/test_utils.py
index 8d1f4ce63..d9b21cc6f 100644
--- a/nodepool/tests/unit/test_utils.py
+++ b/nodepool/tests/unit/test_utils.py
@@ -12,11 +12,14 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from concurrent.futures import ThreadPoolExecutor
 import copy
 import math
+import time
 
 from nodepool import tests
-from nodepool.driver.utils import QuotaInformation
+from nodepool.driver.utils import QuotaInformation, LazyExecutorTTLCache
+from nodepool.nodeutils import iterate_timeout
 
 
 class TestQutoInformation(tests.BaseTestCase):
@@ -66,3 +69,41 @@ class TestQutoInformation(tests.BaseTestCase):
         remain.subtract(needed)
 
         self.assertEqual(expected.quota, remain.quota)
+
+
+class FakeAdapter:
+    CACHE_TTL = 0.5
+
+    def __init__(self):
+        self.api_executor = ThreadPoolExecutor(max_workers=4)
+        self.get_time = LazyExecutorTTLCache(
+            self.CACHE_TTL, self.api_executor)(
+                self.get_time)
+
+    def get_time(self):
+        return time.monotonic()
+
+
+class TestLazyExecutorTTLCache(tests.BaseTestCase):
+    def test_lazy_cache(self):
+        adapter = FakeAdapter()
+        t0 = time.monotonic()
+        ret1 = adapter.get_time()
+        t1 = time.monotonic()
+        self.assertTrue(t0 < ret1 < t1)
+        # Assuming the computer isn't completely overloaded, this
+        # should happen instantly and be a cache hit.
+        ret2 = adapter.get_time()
+        self.assertEqual(ret1, ret2)
+        # Sleep longer than the ttl
+        time.sleep(adapter.CACHE_TTL + 0.1)
+        # This should be a cache miss that triggers an update and
+        # returns the old value.
+        ret3 = adapter.get_time()
+        self.assertEqual(ret1, ret3)
+        # Eventually the async update should return and we should get
+        # a newer value.
+        for _ in iterate_timeout(30, Exception, 'cache update'):
+            ret4 = adapter.get_time()
+            if ret4 > ret3:
+                break
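
Reviewer note, not part of the patch: as a sanity check on how the new
cache is meant to be used outside the test framework, here is a minimal
standalone sketch.  Only LazyExecutorTTLCache and its import path come
from this change; fetch_inventory, its one second delay, the ten second
TTL, and the executor sizing are hypothetical stand-ins invented for
illustration.

    # Hypothetical demo; assumes nodepool is installed so that the
    # class added above is importable.
    import time
    from concurrent.futures import ThreadPoolExecutor

    from nodepool.driver.utils import LazyExecutorTTLCache

    def fetch_inventory():
        # Stand-in for a slow provider API call such as _listServers.
        time.sleep(1)
        return ['server-%d' % int(time.monotonic())]

    executor = ThreadPoolExecutor(max_workers=2)
    fetch_inventory = LazyExecutorTTLCache(10, executor)(fetch_inventory)

    # First call: no cached data yet, so the underlying function runs
    # synchronously and this blocks for about a second.
    print(fetch_inventory())

    # Any call within the TTL is a cache hit and returns instantly.
    print(fetch_inventory())

    # After the TTL expires, a call is a cache miss: it still returns
    # the stale value immediately, but fires off a background refresh.
    time.sleep(10.5)
    print(fetch_inventory())

    # Once the background refresh completes, calls return the new value.
    time.sleep(2)
    print(fetch_inventory())

    executor.shutdown()

This is the trade-off the commit message relies on: after the first
call, the state machine threads never block on the provider API, at
the cost of seeing data up to roughly one TTL plus one API round trip
old.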
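
A second hypothetical sketch, of the single-entry caveat in the
docstring: arguments are forwarded to the underlying method on a cache
miss, but they do not key the cache, so callers passing different
arguments share one cached value.  The lookup function and its region
arguments are invented for illustration.

    from concurrent.futures import ThreadPoolExecutor

    from nodepool.driver.utils import LazyExecutorTTLCache

    def lookup(region):
        # Pretend this is a per-region provider API call.
        return 'data for %s' % region

    executor = ThreadPoolExecutor(max_workers=1)
    lookup = LazyExecutorTTLCache(10, executor)(lookup)

    print(lookup('zone-a'))  # Miss: runs lookup('zone-a') synchronously.
    print(lookup('zone-b'))  # Hit: returns the 'zone-a' result unchanged.

    executor.shutdown()

This is safe for the provider listing methods above because they take
no arguments, which is why the docstring scopes the cache to provider
resource listing methods.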