Merge "AWS: improve service quota handling"
This commit is contained in:
commit
63542ece0d
@ -1,5 +1,5 @@
|
||||
# Copyright 2018 Red Hat
|
||||
# Copyright 2022 Acme Gating, LLC
|
||||
# Copyright 2022-2023 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -98,6 +98,7 @@ VOLUME_QUOTA_CODES = {
|
||||
}
|
||||
|
||||
CACHE_TTL = 10
|
||||
SERVICE_QUOTA_CACHE_TTL = 300
|
||||
ON_DEMAND = 0
|
||||
SPOT = 1
|
||||
|
||||
@ -281,6 +282,15 @@ class AwsAdapter(statemachine.Adapter):
|
||||
# of mutating requests by default.
|
||||
self.non_mutating_rate_limiter = RateLimiter(self.provider.name,
|
||||
self.provider.rate * 10.0)
|
||||
# Experimentally, this rate limit refreshes tokens at
|
||||
# something like 0.16/second, so if we operated at the rate
|
||||
# limit, it would take us almost a minute to determine the
|
||||
# quota. Instead, we're going to just use the normal provider
|
||||
# rate and rely on caching to avoid going over the limit. At
|
||||
# the of writing, we'll issue bursts of 5 requests every 5
|
||||
# minutes.
|
||||
self.quota_service_rate_limiter = RateLimiter(self.provider.name,
|
||||
self.provider.rate)
|
||||
self.image_id_by_filter_cache = cachetools.TTLCache(
|
||||
maxsize=8192, ttl=(5 * 60))
|
||||
self.aws = boto3.Session(
|
||||
@ -316,6 +326,12 @@ class AwsAdapter(statemachine.Adapter):
|
||||
self._listObjects = LazyExecutorTTLCache(
|
||||
CACHE_TTL, self.api_executor)(
|
||||
self._listObjects)
|
||||
self._listEC2Quotas = LazyExecutorTTLCache(
|
||||
SERVICE_QUOTA_CACHE_TTL, self.api_executor)(
|
||||
self._listEC2Quotas)
|
||||
self._listEBSQuotas = LazyExecutorTTLCache(
|
||||
SERVICE_QUOTA_CACHE_TTL, self.api_executor)(
|
||||
self._listEBSQuotas)
|
||||
|
||||
# In listResources, we reconcile AMIs which appear to be
|
||||
# imports but have no nodepool tags, however it's possible
|
||||
@ -425,13 +441,16 @@ class AwsAdapter(statemachine.Adapter):
|
||||
# Get the instance and volume types that this provider handles
|
||||
instance_types = {}
|
||||
volume_types = set()
|
||||
ec2_quotas = self._listEC2Quotas()
|
||||
ebs_quotas = self._listEBSQuotas()
|
||||
for pool in self.provider.pools.values():
|
||||
for label in pool.labels.values():
|
||||
if label.instance_type not in instance_types:
|
||||
instance_types[label.instance_type] = set()
|
||||
instance_types[label.instance_type].add(
|
||||
SPOT if label.use_spot else ON_DEMAND)
|
||||
volume_types.add(label.volume_type)
|
||||
if label.volume_type:
|
||||
volume_types.add(label.volume_type)
|
||||
args = dict(default=math.inf)
|
||||
for instance_type in instance_types:
|
||||
for market_type_option in instance_types[instance_type]:
|
||||
@ -444,13 +463,12 @@ class AwsAdapter(statemachine.Adapter):
|
||||
"Unknown quota code for instance type: %s",
|
||||
instance_type)
|
||||
continue
|
||||
with self.non_mutating_rate_limiter:
|
||||
self.log.debug("Getting EC2 quota limits for %s", code)
|
||||
response = self.aws_quotas.get_service_quota(
|
||||
ServiceCode='ec2',
|
||||
QuotaCode=code,
|
||||
)
|
||||
args[code] = response['Quota']['Value']
|
||||
if code not in ec2_quotas:
|
||||
self.log.warning(
|
||||
"AWS quota code %s for instance type: %s not known",
|
||||
code, instance_type)
|
||||
continue
|
||||
args[code] = ec2_quotas[code]
|
||||
for volume_type in volume_types:
|
||||
vquota_codes = VOLUME_QUOTA_CODES.get(volume_type)
|
||||
if not vquota_codes:
|
||||
@ -461,18 +479,17 @@ class AwsAdapter(statemachine.Adapter):
|
||||
for resource, code in vquota_codes.items():
|
||||
if code in args:
|
||||
continue
|
||||
with self.non_mutating_rate_limiter:
|
||||
self.log.debug("Getting EBS quota limits for %s", code)
|
||||
response = self.aws_quotas.get_service_quota(
|
||||
ServiceCode='ebs',
|
||||
QuotaCode=code,
|
||||
)
|
||||
value = response['Quota']['Value']
|
||||
# Unit mismatch: storage limit is in TB, but usage
|
||||
# is in GB. Translate the limit to GB.
|
||||
if resource == 'storage':
|
||||
value *= 1000
|
||||
args[code] = value
|
||||
if code not in ebs_quotas:
|
||||
self.log.warning(
|
||||
"AWS quota code %s for volume type: %s not known",
|
||||
code, volume_type)
|
||||
continue
|
||||
value = ebs_quotas[code]
|
||||
# Unit mismatch: storage limit is in TB, but usage
|
||||
# is in GB. Translate the limit to GB.
|
||||
if resource == 'storage':
|
||||
value *= 1000
|
||||
args[code] = value
|
||||
return QuotaInformation(**args)
|
||||
|
||||
def getQuotaForLabel(self, label):
|
||||
@ -977,6 +994,23 @@ class AwsAdapter(statemachine.Adapter):
|
||||
return instance
|
||||
return None
|
||||
|
||||
def _listServiceQuotas(self, service_code):
|
||||
with self.quota_service_rate_limiter(
|
||||
self.log.debug, f"Listed {service_code} quotas"):
|
||||
paginator = self.aws_quotas.get_paginator(
|
||||
'list_service_quotas')
|
||||
quotas = {}
|
||||
for page in paginator.paginate(ServiceCode=service_code):
|
||||
for quota in page['Quotas']:
|
||||
quotas[quota['QuotaCode']] = quota['Value']
|
||||
return quotas
|
||||
|
||||
def _listEC2Quotas(self):
|
||||
return self._listServiceQuotas('ec2')
|
||||
|
||||
def _listEBSQuotas(self):
|
||||
return self._listServiceQuotas('ebs')
|
||||
|
||||
def _listInstances(self):
|
||||
with self.non_mutating_rate_limiter(
|
||||
self.log.debug, "Listed instances"):
|
||||
|
@ -1,5 +1,5 @@
|
||||
# Copyright (C) 2018 Red Hat
|
||||
# Copyright 2022 Acme Gating, LLC
|
||||
# Copyright 2022-2023 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -73,23 +73,53 @@ class FakeAwsAdapter(AwsAdapter):
|
||||
|
||||
# moto does not mock service-quotas, so we do it ourselves:
|
||||
def _fake_get_service_quota(ServiceCode, QuotaCode, *args, **kwargs):
|
||||
# This is a simple fake that only returns the number
|
||||
# of cores.
|
||||
if self.__quotas is None:
|
||||
return {'Quota': {'Value': 100}}
|
||||
if ServiceCode == 'ec2':
|
||||
qdict = self.__ec2_quotas
|
||||
elif ServiceCode == 'ebs':
|
||||
qdict = self.__ebs_quotas
|
||||
else:
|
||||
return {'Quota': {'Value': self.__quotas.get(QuotaCode)}}
|
||||
raise NotImplementedError(
|
||||
f"Quota code {ServiceCode} not implemented")
|
||||
return {'Quota': {'Value': qdict.get(QuotaCode)}}
|
||||
self.aws_quotas.get_service_quota = _fake_get_service_quota
|
||||
|
||||
def _fake_list_service_quotas(ServiceCode, *args, **kwargs):
|
||||
if ServiceCode == 'ec2':
|
||||
qdict = self.__ec2_quotas
|
||||
elif ServiceCode == 'ebs':
|
||||
qdict = self.__ebs_quotas
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
f"Quota code {ServiceCode} not implemented")
|
||||
quotas = []
|
||||
for code, value in qdict.items():
|
||||
quotas.append(
|
||||
{'Value': value, 'QuotaCode': code}
|
||||
)
|
||||
return {'Quotas': quotas}
|
||||
self.aws_quotas.list_service_quotas = _fake_list_service_quotas
|
||||
|
||||
def aws_quotas(quotas):
|
||||
"""Specify a set of AWS quota values for use by a test method.
|
||||
|
||||
def ec2_quotas(quotas):
|
||||
"""Specify a set of AWS EC2 quota values for use by a test method.
|
||||
|
||||
:arg dict quotas: The quota dictionary.
|
||||
"""
|
||||
|
||||
def decorator(test):
|
||||
test.__aws_quotas__ = quotas
|
||||
test.__aws_ec2_quotas__ = quotas
|
||||
return test
|
||||
return decorator
|
||||
|
||||
|
||||
def ebs_quotas(quotas):
|
||||
"""Specify a set of AWS EBS quota values for use by a test method.
|
||||
|
||||
:arg dict quotas: The quota dictionary.
|
||||
"""
|
||||
|
||||
def decorator(test):
|
||||
test.__aws_ebs_quotas__ = quotas
|
||||
return test
|
||||
return decorator
|
||||
|
||||
@ -165,11 +195,8 @@ class TestDriverAws(tests.DBTestCase):
|
||||
self.patch(nodepool.driver.statemachine, 'nodescan', fake_nodescan)
|
||||
test_name = self.id().split('.')[-1]
|
||||
test = getattr(self, test_name)
|
||||
if hasattr(test, '__aws_quotas__'):
|
||||
quotas = getattr(test, '__aws_quotas__')
|
||||
else:
|
||||
quotas = None
|
||||
self.patchAdapter(quotas=quotas)
|
||||
self.patchAdapter(ec2_quotas=getattr(test, '__aws_ec2_quotas__', None),
|
||||
ebs_quotas=getattr(test, '__aws_ebs_quotas__', None))
|
||||
|
||||
def tearDown(self):
|
||||
self.mock_ec2.stop()
|
||||
@ -183,12 +210,27 @@ class TestDriverAws(tests.DBTestCase):
|
||||
kw['instance_profile_arn'] = self.instance_profile_arn
|
||||
return super().setup_config(*args, **kw)
|
||||
|
||||
def patchAdapter(self, quotas=None):
|
||||
def patchAdapter(self, ec2_quotas=None, ebs_quotas=None):
|
||||
default_ec2_quotas = {
|
||||
'L-1216C47A': 100,
|
||||
'L-43DA4232': 100,
|
||||
'L-34B43A08': 100,
|
||||
}
|
||||
default_ebs_quotas = {
|
||||
'L-D18FCD1D': 100.0,
|
||||
'L-7A658B76': 100.0,
|
||||
}
|
||||
if ec2_quotas is None:
|
||||
ec2_quotas = default_ec2_quotas
|
||||
if ebs_quotas is None:
|
||||
ebs_quotas = default_ebs_quotas
|
||||
self.patch(nodepool.driver.aws.adapter, 'AwsAdapter', FakeAwsAdapter)
|
||||
self.patch(nodepool.driver.aws.adapter.AwsAdapter,
|
||||
'_FakeAwsAdapter__testcase', self)
|
||||
self.patch(nodepool.driver.aws.adapter.AwsAdapter,
|
||||
'_FakeAwsAdapter__quotas', quotas)
|
||||
'_FakeAwsAdapter__ec2_quotas', ec2_quotas)
|
||||
self.patch(nodepool.driver.aws.adapter.AwsAdapter,
|
||||
'_FakeAwsAdapter__ebs_quotas', ebs_quotas)
|
||||
|
||||
def requestNode(self, config_path, label):
|
||||
# A helper method to perform a single node request
|
||||
@ -247,7 +289,7 @@ class TestDriverAws(tests.DBTestCase):
|
||||
for node in nodes:
|
||||
self.waitForNodeDeletion(node)
|
||||
|
||||
@aws_quotas({
|
||||
@ec2_quotas({
|
||||
'L-1216C47A': 2,
|
||||
'L-43DA4232': 448,
|
||||
'L-34B43A08': 2
|
||||
@ -302,7 +344,7 @@ class TestDriverAws(tests.DBTestCase):
|
||||
req3 = self.waitForNodeRequest(req3)
|
||||
self.assertSuccess(req3)
|
||||
|
||||
@aws_quotas({
|
||||
@ec2_quotas({
|
||||
'L-43DA4232': 448,
|
||||
'L-1216C47A': 200,
|
||||
'L-34B43A08': 200
|
||||
@ -355,7 +397,7 @@ class TestDriverAws(tests.DBTestCase):
|
||||
req3 = self.waitForNodeRequest(req3)
|
||||
self.assertSuccess(req3)
|
||||
|
||||
@aws_quotas({
|
||||
@ec2_quotas({
|
||||
'L-1216C47A': 1000,
|
||||
'L-43DA4232': 1000,
|
||||
})
|
||||
@ -400,7 +442,7 @@ class TestDriverAws(tests.DBTestCase):
|
||||
req2 = self.waitForNodeRequest(req2)
|
||||
self.assertSuccess(req2)
|
||||
|
||||
@aws_quotas({
|
||||
@ec2_quotas({
|
||||
'L-1216C47A': 1000,
|
||||
'L-43DA4232': 1000,
|
||||
})
|
||||
@ -444,8 +486,10 @@ class TestDriverAws(tests.DBTestCase):
|
||||
# Assert that the second request is still being deferred
|
||||
req2 = self.waitForNodeRequest(req2, (zk.REQUESTED,))
|
||||
|
||||
@aws_quotas({
|
||||
@ec2_quotas({
|
||||
'L-1216C47A': 200, # instance
|
||||
})
|
||||
@ebs_quotas({
|
||||
'L-D18FCD1D': 1.0, # gp2 storage (TB)
|
||||
'L-7A658B76': 1.0, # gp3 storage (TB)
|
||||
})
|
||||
|
Loading…
x
Reference in New Issue
Block a user