AWS: improve service quota handling

The AWS API call to get a service quota has its own rate limit that
is separate from EC2's.  It is not documented, but the defaults
appear to be very small; experimentally, it appears to be something
like a bucket size of 30 tokens and a refill rate somewhere between
3 and 10 tokens per minute.
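
As a rough illustration, the assumed behavior is a token bucket along
the lines of the sketch below; the bucket size and refill rate are
the experimental estimates above, not documented AWS values:

import time

class TokenBucket:
    # Hypothetical model of the quota API's rate limit; size and
    # refill values are experimental estimates, not AWS-documented.
    def __init__(self, size=30, refill_per_minute=3):
        self.size = size
        self.tokens = float(size)
        self.refill_per_second = refill_per_minute / 60.0
        self.last = time.monotonic()

    def try_acquire(self):
        # Refill based on elapsed time, capped at the bucket size.
        now = time.monotonic()
        self.tokens = min(
            self.size,
            self.tokens + (now - self.last) * self.refill_per_second)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# A burst of up to 30 calls is admitted immediately; after that only
# about 3-10 calls per minute get through.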

This change moves the quota lookup calls to their own rate limiter
so they are accounted for separately from other calls.

Ideally we would configure that rate limiter with these new, very low
values; however, that would significantly slow startup, since we need
to issue several calls at once when we start (after that, we are not
sensitive to delay).  The API can handle a burst at startup (with a
bucket size of 30), but our rate limiter doesn't have a burst option.
So instead of configuring it with the true limit, we configure it
with the rate limit we use for normal operations (so that we at least
have some delay) and otherwise rely on caching to ensure that we
won't actually exceed the rate limit.

This change therefore also adds a lazy executor TTL cache
(LazyExecutorTTLCache) to these operations with a timeout of 5
minutes.  This means that we will issue bursts of requests every 5
minutes, and as long as the number of requests per burst is less than
the number of tokens replenished in that interval, we'll stay under
the limit.
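
As a rough, hypothetical sketch of the idea (this is not nodepool's
actual LazyExecutorTTLCache implementation): cached results are
returned immediately, and a background refresh is submitted to an
executor once the TTL expires, so callers never block on the API:

import time
from concurrent.futures import ThreadPoolExecutor

def lazy_ttl_cache(ttl, executor):
    # Illustrative decorator: serve cached data, refresh lazily in
    # the background once the TTL expires (not thread-safe; sketch).
    def decorator(func):
        state = {'value': None, 'expires': 0.0, 'future': None}

        def wrapper(*args, **kw):
            now = time.monotonic()
            if state['value'] is None:
                # First call: fetch synchronously so we have data.
                state['value'] = func(*args, **kw)
                state['expires'] = now + ttl
            elif now >= state['expires'] and state['future'] is None:
                # Stale: start a background refresh, keep serving the
                # old value in the meantime.
                state['future'] = executor.submit(func, *args, **kw)
            if state['future'] is not None and state['future'].done():
                state['value'] = state['future'].result()
                state['expires'] = time.monotonic() + ttl
                state['future'] = None
            return state['value']
        return wrapper
    return decorator

# Usage sketch: wrap a quota listing call with a 5-minute TTL.
executor = ThreadPoolExecutor(max_workers=1)

@lazy_ttl_cache(300, executor)
def list_ec2_quotas():
    ...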

Because this cache is on the adapter, multiple pool workers will
share it.  This reduces API calls, since currently there is only
pool-worker-level caching of nodepool quota information objects.
When the 5-minute cache on the nodepool quota info object expires, we
will now hit the adapter cache (with its own 5-minute timeout) rather
than go directly to the API repeatedly for each pool worker.  This
does mean that quota changes may take between 5 and 10 minutes to
appear in nodepool.
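
A quick sketch of where the 5-to-10-minute window comes from, using
the TTL values from this change (the variable names are illustrative):

POOL_WORKER_TTL = 300  # existing per-pool-worker quota info cache (s)
ADAPTER_TTL = 300      # new adapter cache (SERVICE_QUOTA_CACHE_TTL)

# After a quota change, a pool worker may first serve its own cached
# value (up to POOL_WORKER_TTL old) and then refresh from an adapter
# entry that was populated just before the change (up to ADAPTER_TTL
# old), so in the worst case:
max_staleness = POOL_WORKER_TTL + ADAPTER_TTL  # 600 s = 10 minutes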

The current code only looks up quota information for the instance and
volume types actually used.  If that number is low, all is well, but
if it is high, we could approach or exceed the token replacement
rate.  To make this more predictable, we switch the API call to list
all quotas instead of fetching only the ones we need.  Due to
pagination, this results in a total of 8 API calls as of this
writing: 5 for EC2 quotas and 3 for EBS.  These numbers are likely to
grow over time, but very slowly.
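
For reference, the shape of the two approaches with the boto3 Service
Quotas client looks roughly like this (a sketch; the quota code shown
is one of the EC2 codes already used by the driver):

import boto3

client = boto3.client('service-quotas')

# Before: one get_service_quota call per quota code actually in use.
resp = client.get_service_quota(ServiceCode='ec2',
                                QuotaCode='L-1216C47A')
value = resp['Quota']['Value']

# After: list every quota for the service in a few paginated calls
# and build a code -> value map once; lookups then hit the dict.
quotas = {}
paginator = client.get_paginator('list_service_quotas')
for page in paginator.paginate(ServiceCode='ec2'):
    for quota in page['Quotas']:
        quotas[quota['QuotaCode']] = quota['Value']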

Taken together, these changes mean that a single launcher should
issue at most 8 quota service API requests every 5 minutes, which is
below the lowest observed token replacement rate.
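
A back-of-the-envelope check of that claim, using the numbers above:

requests_per_cycle = 5 + 3     # paginated EC2 + EBS list calls
cache_ttl_minutes = 5          # SERVICE_QUOTA_CACHE_TTL
lowest_refill_per_minute = 3   # lowest observed refill rate

tokens_per_cycle = lowest_refill_per_minute * cache_ttl_minutes  # 15
assert requests_per_cycle <= tokens_per_cycle  # 8 <= 15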

Change-Id: Idb3fb114f5b8cda8a7b6d5edc9c011cb7261be9f
Author: James E. Blair  2023-10-17 11:00:02 -07:00
parent c973be0a1b
commit 7d7d81cd46
2 changed files with 120 additions and 42 deletions


@ -1,5 +1,5 @@
# Copyright 2018 Red Hat
# Copyright 2022 Acme Gating, LLC
# Copyright 2022-2023 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -98,6 +98,7 @@ VOLUME_QUOTA_CODES = {
}
CACHE_TTL = 10
SERVICE_QUOTA_CACHE_TTL = 300
ON_DEMAND = 0
SPOT = 1
@ -281,6 +282,15 @@ class AwsAdapter(statemachine.Adapter):
# of mutating requests by default.
self.non_mutating_rate_limiter = RateLimiter(self.provider.name,
self.provider.rate * 10.0)
# Experimentally, this rate limit refreshes tokens at
# something like 0.16/second, so if we operated at the rate
# limit, it would take us almost a minute to determine the
# quota. Instead, we're going to just use the normal provider
# rate and rely on caching to avoid going over the limit. At
# the time of writing, we'll issue bursts of 5 requests every 5
# minutes.
self.quota_service_rate_limiter = RateLimiter(self.provider.name,
self.provider.rate)
self.image_id_by_filter_cache = cachetools.TTLCache(
maxsize=8192, ttl=(5 * 60))
self.aws = boto3.Session(
@ -316,6 +326,12 @@ class AwsAdapter(statemachine.Adapter):
self._listObjects = LazyExecutorTTLCache(
CACHE_TTL, self.api_executor)(
self._listObjects)
self._listEC2Quotas = LazyExecutorTTLCache(
SERVICE_QUOTA_CACHE_TTL, self.api_executor)(
self._listEC2Quotas)
self._listEBSQuotas = LazyExecutorTTLCache(
SERVICE_QUOTA_CACHE_TTL, self.api_executor)(
self._listEBSQuotas)
# In listResources, we reconcile AMIs which appear to be
# imports but have no nodepool tags, however it's possible
@ -425,13 +441,16 @@ class AwsAdapter(statemachine.Adapter):
# Get the instance and volume types that this provider handles
instance_types = {}
volume_types = set()
ec2_quotas = self._listEC2Quotas()
ebs_quotas = self._listEBSQuotas()
for pool in self.provider.pools.values():
for label in pool.labels.values():
if label.instance_type not in instance_types:
instance_types[label.instance_type] = set()
instance_types[label.instance_type].add(
SPOT if label.use_spot else ON_DEMAND)
volume_types.add(label.volume_type)
if label.volume_type:
volume_types.add(label.volume_type)
args = dict(default=math.inf)
for instance_type in instance_types:
for market_type_option in instance_types[instance_type]:
@ -444,13 +463,12 @@ class AwsAdapter(statemachine.Adapter):
"Unknown quota code for instance type: %s",
instance_type)
continue
with self.non_mutating_rate_limiter:
self.log.debug("Getting EC2 quota limits for %s", code)
response = self.aws_quotas.get_service_quota(
ServiceCode='ec2',
QuotaCode=code,
)
args[code] = response['Quota']['Value']
if code not in ec2_quotas:
self.log.warning(
"AWS quota code %s for instance type: %s not known",
code, instance_type)
continue
args[code] = ec2_quotas[code]
for volume_type in volume_types:
vquota_codes = VOLUME_QUOTA_CODES.get(volume_type)
if not vquota_codes:
@ -461,18 +479,17 @@ class AwsAdapter(statemachine.Adapter):
for resource, code in vquota_codes.items():
if code in args:
continue
with self.non_mutating_rate_limiter:
self.log.debug("Getting EBS quota limits for %s", code)
response = self.aws_quotas.get_service_quota(
ServiceCode='ebs',
QuotaCode=code,
)
value = response['Quota']['Value']
# Unit mismatch: storage limit is in TB, but usage
# is in GB. Translate the limit to GB.
if resource == 'storage':
value *= 1000
args[code] = value
if code not in ebs_quotas:
self.log.warning(
"AWS quota code %s for volume type: %s not known",
code, volume_type)
continue
value = ebs_quotas[code]
# Unit mismatch: storage limit is in TB, but usage
# is in GB. Translate the limit to GB.
if resource == 'storage':
value *= 1000
args[code] = value
return QuotaInformation(**args)
def getQuotaForLabel(self, label):
@ -977,6 +994,23 @@ class AwsAdapter(statemachine.Adapter):
return instance
return None
def _listServiceQuotas(self, service_code):
with self.quota_service_rate_limiter(
self.log.debug, f"Listed {service_code} quotas"):
paginator = self.aws_quotas.get_paginator(
'list_service_quotas')
quotas = {}
for page in paginator.paginate(ServiceCode=service_code):
for quota in page['Quotas']:
quotas[quota['QuotaCode']] = quota['Value']
return quotas
def _listEC2Quotas(self):
return self._listServiceQuotas('ec2')
def _listEBSQuotas(self):
return self._listServiceQuotas('ebs')
def _listInstances(self):
with self.non_mutating_rate_limiter(
self.log.debug, "Listed instances"):


@ -1,5 +1,5 @@
# Copyright (C) 2018 Red Hat
# Copyright 2022 Acme Gating, LLC
# Copyright 2022-2023 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -73,23 +73,53 @@ class FakeAwsAdapter(AwsAdapter):
# moto does not mock service-quotas, so we do it ourselves:
def _fake_get_service_quota(ServiceCode, QuotaCode, *args, **kwargs):
# This is a simple fake that only returns the number
# of cores.
if self.__quotas is None:
return {'Quota': {'Value': 100}}
if ServiceCode == 'ec2':
qdict = self.__ec2_quotas
elif ServiceCode == 'ebs':
qdict = self.__ebs_quotas
else:
return {'Quota': {'Value': self.__quotas.get(QuotaCode)}}
raise NotImplementedError(
f"Quota code {ServiceCode} not implemented")
return {'Quota': {'Value': qdict.get(QuotaCode)}}
self.aws_quotas.get_service_quota = _fake_get_service_quota
def _fake_list_service_quotas(ServiceCode, *args, **kwargs):
if ServiceCode == 'ec2':
qdict = self.__ec2_quotas
elif ServiceCode == 'ebs':
qdict = self.__ebs_quotas
else:
raise NotImplementedError(
f"Quota code {ServiceCode} not implemented")
quotas = []
for code, value in qdict.items():
quotas.append(
{'Value': value, 'QuotaCode': code}
)
return {'Quotas': quotas}
self.aws_quotas.list_service_quotas = _fake_list_service_quotas
def aws_quotas(quotas):
"""Specify a set of AWS quota values for use by a test method.
def ec2_quotas(quotas):
"""Specify a set of AWS EC2 quota values for use by a test method.
:arg dict quotas: The quota dictionary.
"""
def decorator(test):
test.__aws_quotas__ = quotas
test.__aws_ec2_quotas__ = quotas
return test
return decorator
def ebs_quotas(quotas):
"""Specify a set of AWS EBS quota values for use by a test method.
:arg dict quotas: The quota dictionary.
"""
def decorator(test):
test.__aws_ebs_quotas__ = quotas
return test
return decorator
@ -165,11 +195,8 @@ class TestDriverAws(tests.DBTestCase):
self.patch(nodepool.driver.statemachine, 'nodescan', fake_nodescan)
test_name = self.id().split('.')[-1]
test = getattr(self, test_name)
if hasattr(test, '__aws_quotas__'):
quotas = getattr(test, '__aws_quotas__')
else:
quotas = None
self.patchAdapter(quotas=quotas)
self.patchAdapter(ec2_quotas=getattr(test, '__aws_ec2_quotas__', None),
ebs_quotas=getattr(test, '__aws_ebs_quotas__', None))
def tearDown(self):
self.mock_ec2.stop()
@ -183,12 +210,27 @@ class TestDriverAws(tests.DBTestCase):
kw['instance_profile_arn'] = self.instance_profile_arn
return super().setup_config(*args, **kw)
def patchAdapter(self, quotas=None):
def patchAdapter(self, ec2_quotas=None, ebs_quotas=None):
default_ec2_quotas = {
'L-1216C47A': 100,
'L-43DA4232': 100,
'L-34B43A08': 100,
}
default_ebs_quotas = {
'L-D18FCD1D': 100.0,
'L-7A658B76': 100.0,
}
if ec2_quotas is None:
ec2_quotas = default_ec2_quotas
if ebs_quotas is None:
ebs_quotas = default_ebs_quotas
self.patch(nodepool.driver.aws.adapter, 'AwsAdapter', FakeAwsAdapter)
self.patch(nodepool.driver.aws.adapter.AwsAdapter,
'_FakeAwsAdapter__testcase', self)
self.patch(nodepool.driver.aws.adapter.AwsAdapter,
'_FakeAwsAdapter__quotas', quotas)
'_FakeAwsAdapter__ec2_quotas', ec2_quotas)
self.patch(nodepool.driver.aws.adapter.AwsAdapter,
'_FakeAwsAdapter__ebs_quotas', ebs_quotas)
def requestNode(self, config_path, label):
# A helper method to perform a single node request
@ -247,7 +289,7 @@ class TestDriverAws(tests.DBTestCase):
for node in nodes:
self.waitForNodeDeletion(node)
@aws_quotas({
@ec2_quotas({
'L-1216C47A': 2,
'L-43DA4232': 448,
'L-34B43A08': 2
@ -302,7 +344,7 @@ class TestDriverAws(tests.DBTestCase):
req3 = self.waitForNodeRequest(req3)
self.assertSuccess(req3)
@aws_quotas({
@ec2_quotas({
'L-43DA4232': 448,
'L-1216C47A': 200,
'L-34B43A08': 200
@ -355,7 +397,7 @@ class TestDriverAws(tests.DBTestCase):
req3 = self.waitForNodeRequest(req3)
self.assertSuccess(req3)
@aws_quotas({
@ec2_quotas({
'L-1216C47A': 1000,
'L-43DA4232': 1000,
})
@ -400,7 +442,7 @@ class TestDriverAws(tests.DBTestCase):
req2 = self.waitForNodeRequest(req2)
self.assertSuccess(req2)
@aws_quotas({
@ec2_quotas({
'L-1216C47A': 1000,
'L-43DA4232': 1000,
})
@ -444,8 +486,10 @@ class TestDriverAws(tests.DBTestCase):
# Assert that the second request is still being deferred
req2 = self.waitForNodeRequest(req2, (zk.REQUESTED,))
@aws_quotas({
@ec2_quotas({
'L-1216C47A': 200, # instance
})
@ebs_quotas({
'L-D18FCD1D': 1.0, # gp2 storage (TB)
'L-7A658B76': 1.0, # gp3 storage (TB)
})