Merge "Add a LazyExecutorTTLCache to the OpenStack driver"
This commit is contained in:
commit
908040bd67
@ -23,11 +23,10 @@ import math
|
||||
import time
|
||||
import operator
|
||||
|
||||
import cachetools.func
|
||||
import openstack
|
||||
from keystoneauth1.exceptions.catalog import EndpointNotFound
|
||||
|
||||
from nodepool.driver.utils import QuotaInformation
|
||||
from nodepool.driver.utils import QuotaInformation, LazyExecutorTTLCache
|
||||
from nodepool.driver import statemachine
|
||||
from nodepool import exceptions
|
||||
from nodepool import stats
|
||||
@ -409,6 +408,20 @@ class OpenStackAdapter(statemachine.Adapter):
|
||||
thread_name_prefix=f'openstack-api-{provider_config.name}',
|
||||
max_workers=workers)
|
||||
|
||||
# Use a lazy TTL cache for these. This uses the TPE to
|
||||
# asynchronously update the cached values, meanwhile returning
|
||||
# the previous cached data if available. This means every
|
||||
# call after the first one is instantaneous.
|
||||
self._listServers = LazyExecutorTTLCache(
|
||||
CACHE_TTL, self.api_executor)(
|
||||
self._listServers)
|
||||
self._listVolumes = LazyExecutorTTLCache(
|
||||
CACHE_TTL, self.api_executor)(
|
||||
self._listVolumes)
|
||||
self._listFloatingIps = LazyExecutorTTLCache(
|
||||
CACHE_TTL, self.api_executor)(
|
||||
self._listFloatingIps)
|
||||
|
||||
self._last_image_check_failure = time.time()
|
||||
self._last_port_cleanup = None
|
||||
self._statsd = stats.get_client()
|
||||
@ -688,12 +701,10 @@ class OpenStackAdapter(statemachine.Adapter):
|
||||
name, self.provider.name))
|
||||
return network
|
||||
|
||||
@cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
|
||||
def _listServers(self):
|
||||
with Timer(self.log, 'API call list_servers'):
|
||||
return self._client.list_servers(bare=True)
|
||||
|
||||
@cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
|
||||
def _listVolumes(self):
|
||||
try:
|
||||
with Timer(self.log, 'API call list_volumes'):
|
||||
@ -701,7 +712,6 @@ class OpenStackAdapter(statemachine.Adapter):
|
||||
except EndpointNotFound:
|
||||
return []
|
||||
|
||||
@cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
|
||||
def _listFloatingIps(self):
|
||||
with Timer(self.log, 'API call list_floating_ips'):
|
||||
return self._client.list_floating_ips()
|
||||
|
@ -457,3 +457,77 @@ class RateLimiter:
|
||||
|
||||
def _exit(self, etype, value, tb):
|
||||
pass
|
||||
|
||||
|
||||
class LazyExecutorTTLCache:
|
||||
"""This is a lazy executor TTL cache.
|
||||
|
||||
It's lazy because if it has cached data, it will always return it
|
||||
instantly.
|
||||
|
||||
It's executor based, which means that if a cache miss occurs, it
|
||||
will submit a task to an executor to fetch new data.
|
||||
|
||||
Finally, it's a TTL cache, which means it automatically expires data.
|
||||
|
||||
Since it is only expected to be used when caching provider
|
||||
resource listing methods, it assumes there will only be one entry
|
||||
and ignores arguments -- it will return the same cached data no
|
||||
matter what arguments are supplied; but it will pass on those
|
||||
arguments to the underlying method in a cache miss.
|
||||
|
||||
:param numeric ttl: The cache timeout in seconds.
|
||||
:param concurrent.futures.Executor executor: An executor to use to
|
||||
update data asynchronously in case of a cache miss.
|
||||
"""
|
||||
|
||||
def __init__(self, ttl, executor):
|
||||
self.ttl = ttl
|
||||
self.executor = executor
|
||||
# If we have an outstanding update being run by the executor,
|
||||
# this is the future.
|
||||
self.future = None
|
||||
# The last time the underlying method completed.
|
||||
self.last_time = None
|
||||
# The last value from the underlying method.
|
||||
self.last_value = None
|
||||
# A lock to make all of this thread safe (especially to ensure
|
||||
# we don't fire off multiple updates).
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def __call__(self, func):
|
||||
def decorator(*args, **kw):
|
||||
with self.lock:
|
||||
now = time.monotonic()
|
||||
if self.future and self.future.done():
|
||||
# If a previous call spawned an update, resolve
|
||||
# that now so we can use the data.
|
||||
try:
|
||||
self.last_time, self.last_value = self.future.result()
|
||||
finally:
|
||||
# Clear the future regardless so we don't loop.
|
||||
self.future = None
|
||||
if (self.last_time is not None and
|
||||
now - self.last_time < self.ttl):
|
||||
# A cache hit.
|
||||
return self.last_value
|
||||
# The rest of the method is a cache miss.
|
||||
if self.last_time is not None:
|
||||
if not self.future:
|
||||
# Fire off an asynchronous update request.
|
||||
# This second wrapper ensures that we record
|
||||
# the time that the update is complete along
|
||||
# with the value.
|
||||
def func_with_time():
|
||||
ret = func(*args, **kw)
|
||||
now = time.monotonic()
|
||||
return (now, ret)
|
||||
self.future = self.executor.submit(func_with_time)
|
||||
else:
|
||||
# This is the first time this method has been
|
||||
# called; since we don't have any cached data, we
|
||||
# will synchronously update the data.
|
||||
self.last_value = func(*args, **kw)
|
||||
self.last_time = time.monotonic()
|
||||
return self.last_value
|
||||
return decorator
|
||||
|
@ -12,11 +12,14 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import copy
|
||||
import math
|
||||
import time
|
||||
|
||||
from nodepool import tests
|
||||
from nodepool.driver.utils import QuotaInformation
|
||||
from nodepool.driver.utils import QuotaInformation, LazyExecutorTTLCache
|
||||
from nodepool.nodeutils import iterate_timeout
|
||||
|
||||
|
||||
class TestQutoInformation(tests.BaseTestCase):
|
||||
@ -66,3 +69,41 @@ class TestQutoInformation(tests.BaseTestCase):
|
||||
remain.subtract(needed)
|
||||
|
||||
self.assertEqual(expected.quota, remain.quota)
|
||||
|
||||
|
||||
class FakeAdapter:
|
||||
CACHE_TTL = 0.5
|
||||
|
||||
def __init__(self):
|
||||
self.api_executor = ThreadPoolExecutor(max_workers=4)
|
||||
self.get_time = LazyExecutorTTLCache(
|
||||
self.CACHE_TTL, self.api_executor)(
|
||||
self.get_time)
|
||||
|
||||
def get_time(self):
|
||||
return time.monotonic()
|
||||
|
||||
|
||||
class TestLazyExecutorTTLCache(tests.BaseTestCase):
|
||||
def test_lazy_cache(self):
|
||||
adapter = FakeAdapter()
|
||||
t0 = time.monotonic()
|
||||
ret1 = adapter.get_time()
|
||||
t1 = time.monotonic()
|
||||
self.assertTrue(t0 < ret1 < t1)
|
||||
# Assuming the computer isn't completely overloaded, this
|
||||
# should happen instantly and be a cache hit.
|
||||
ret2 = adapter.get_time()
|
||||
self.assertEqual(ret1, ret2)
|
||||
# Sleep longer than the ttl
|
||||
time.sleep(adapter.CACHE_TTL + 0.1)
|
||||
# This should be a cache miss that triggers an update and
|
||||
# returns the old value.
|
||||
ret3 = adapter.get_time()
|
||||
self.assertEqual(ret1, ret3)
|
||||
# Eventually the async update should return and we should get
|
||||
# a newer value.
|
||||
for _ in iterate_timeout(30, Exception, 'cache update'):
|
||||
ret4 = adapter.get_time()
|
||||
if ret4 > ret3:
|
||||
break
|
||||
|
Loading…
x
Reference in New Issue
Block a user