Add AWS volume quota support
Like the OpenStack driver, this automatically applies volume quota limits if specified in the label configuration. Change-Id: I71c1b95de08dc72cc777099952892de659d45d41
This commit is contained in:
parent
3fa6821437
commit
acb6772c3a
@ -83,6 +83,16 @@ QUOTA_CODES = {
|
|||||||
'hpc': ['L-F7808C92', '']
|
'hpc': ['L-F7808C92', '']
|
||||||
}
|
}
|
||||||
|
|
||||||
|
VOLUME_QUOTA_CODES = {
|
||||||
|
'io1': dict(iops='L-B3A130E6', storage='L-FD252861'),
|
||||||
|
'io2': dict(iops='L-8D977E7E', storage='L-09BD8365'),
|
||||||
|
'sc1': dict(storage='L-17AF77E8'),
|
||||||
|
'gp2': dict(storage='L-D18FCD1D'),
|
||||||
|
'gp3': dict(storage='L-7A658B76'),
|
||||||
|
'standard': dict(storage='L-9CF3C2EB'),
|
||||||
|
'st1': dict(storage='L-82ACEF56'),
|
||||||
|
}
|
||||||
|
|
||||||
CACHE_TTL = 10
|
CACHE_TTL = 10
|
||||||
ON_DEMAND = 0
|
ON_DEMAND = 0
|
||||||
SPOT = 1
|
SPOT = 1
|
||||||
@ -194,9 +204,7 @@ class AwsCreateStateMachine(statemachine.StateMachine):
|
|||||||
if instance is None:
|
if instance is None:
|
||||||
return
|
return
|
||||||
self.instance = instance
|
self.instance = instance
|
||||||
self.quota = self.adapter._getQuotaForInstanceType(
|
self.quota = self.adapter.getQuotaForLabel(self.label)
|
||||||
self.instance.instance_type,
|
|
||||||
SPOT if self.label.use_spot else ON_DEMAND)
|
|
||||||
self.state = self.INSTANCE_CREATING
|
self.state = self.INSTANCE_CREATING
|
||||||
|
|
||||||
if self.state == self.INSTANCE_CREATING:
|
if self.state == self.INSTANCE_CREATING:
|
||||||
@ -360,23 +368,31 @@ class AwsAdapter(statemachine.Adapter):
|
|||||||
self._deleteObject(resource.id)
|
self._deleteObject(resource.id)
|
||||||
|
|
||||||
def listInstances(self):
|
def listInstances(self):
|
||||||
|
volumes = {}
|
||||||
|
for volume in self._listVolumes():
|
||||||
|
volumes[volume.volume_id] = volume
|
||||||
for instance in self._listInstances():
|
for instance in self._listInstances():
|
||||||
if instance.state["Name"].lower() == "terminated":
|
if instance.state["Name"].lower() == "terminated":
|
||||||
continue
|
continue
|
||||||
quota = self._getQuotaForInstanceType(
|
quota = self._getQuotaForInstanceType(
|
||||||
instance.instance_type,
|
instance.instance_type,
|
||||||
SPOT if instance.instance_lifecycle == 'spot' else ON_DEMAND)
|
SPOT if instance.instance_lifecycle == 'spot' else ON_DEMAND)
|
||||||
|
for attachment in instance.block_device_mappings:
|
||||||
|
volume = volumes.get(attachment['Ebs']['VolumeId'])
|
||||||
|
quota.add(self._getQuotaForVolume(volume))
|
||||||
yield AwsInstance(self.provider, instance, quota)
|
yield AwsInstance(self.provider, instance, quota)
|
||||||
|
|
||||||
def getQuotaLimits(self):
|
def getQuotaLimits(self):
|
||||||
# Get the instance types that this provider handles
|
# Get the instance and volume types that this provider handles
|
||||||
instance_types = {}
|
instance_types = {}
|
||||||
|
volume_types = set()
|
||||||
for pool in self.provider.pools.values():
|
for pool in self.provider.pools.values():
|
||||||
for label in pool.labels.values():
|
for label in pool.labels.values():
|
||||||
if label.instance_type not in instance_types:
|
if label.instance_type not in instance_types:
|
||||||
instance_types[label.instance_type] = set()
|
instance_types[label.instance_type] = set()
|
||||||
instance_types[label.instance_type].add(
|
instance_types[label.instance_type].add(
|
||||||
SPOT if label.use_spot else ON_DEMAND)
|
SPOT if label.use_spot else ON_DEMAND)
|
||||||
|
volume_types.add(label.volume_type)
|
||||||
args = dict(default=math.inf)
|
args = dict(default=math.inf)
|
||||||
for instance_type in instance_types:
|
for instance_type in instance_types:
|
||||||
for market_type_option in instance_types[instance_type]:
|
for market_type_option in instance_types[instance_type]:
|
||||||
@ -390,18 +406,46 @@ class AwsAdapter(statemachine.Adapter):
|
|||||||
instance_type)
|
instance_type)
|
||||||
continue
|
continue
|
||||||
with self.non_mutating_rate_limiter:
|
with self.non_mutating_rate_limiter:
|
||||||
self.log.debug("Getting quota limits for %s", code)
|
self.log.debug("Getting EC2 quota limits for %s", code)
|
||||||
response = self.aws_quotas.get_service_quota(
|
response = self.aws_quotas.get_service_quota(
|
||||||
ServiceCode='ec2',
|
ServiceCode='ec2',
|
||||||
QuotaCode=code,
|
QuotaCode=code,
|
||||||
)
|
)
|
||||||
args[code] = response['Quota']['Value']
|
args[code] = response['Quota']['Value']
|
||||||
|
for volume_type in volume_types:
|
||||||
|
vquota_codes = VOLUME_QUOTA_CODES.get(volume_type)
|
||||||
|
if not vquota_codes:
|
||||||
|
self.log.warning(
|
||||||
|
"Unknown quota code for volume type: %s",
|
||||||
|
volume_type)
|
||||||
|
continue
|
||||||
|
for resource, code in vquota_codes.items():
|
||||||
|
if code in args:
|
||||||
|
continue
|
||||||
|
with self.non_mutating_rate_limiter:
|
||||||
|
self.log.debug("Getting EBS quota limits for %s", code)
|
||||||
|
response = self.aws_quotas.get_service_quota(
|
||||||
|
ServiceCode='ebs',
|
||||||
|
QuotaCode=code,
|
||||||
|
)
|
||||||
|
value = response['Quota']['Value']
|
||||||
|
# Unit mismatch: storage limit is in TB, but usage
|
||||||
|
# is in GB. Translate the limit to GB.
|
||||||
|
if resource == 'storage':
|
||||||
|
value *= 1000
|
||||||
|
args[code] = value
|
||||||
return QuotaInformation(**args)
|
return QuotaInformation(**args)
|
||||||
|
|
||||||
def getQuotaForLabel(self, label):
|
def getQuotaForLabel(self, label):
|
||||||
return self._getQuotaForInstanceType(
|
quota = self._getQuotaForInstanceType(
|
||||||
label.instance_type,
|
label.instance_type,
|
||||||
SPOT if label.use_spot else ON_DEMAND)
|
SPOT if label.use_spot else ON_DEMAND)
|
||||||
|
if label.volume_type:
|
||||||
|
quota.add(self._getQuotaForVolumeType(
|
||||||
|
label.volume_type,
|
||||||
|
storage=label.volume_size,
|
||||||
|
iops=label.iops))
|
||||||
|
return quota
|
||||||
|
|
||||||
def uploadImage(self, provider_image, image_name, filename,
|
def uploadImage(self, provider_image, image_name, filename,
|
||||||
image_format, metadata, md5, sha256):
|
image_format, metadata, md5, sha256):
|
||||||
@ -788,6 +832,26 @@ class AwsAdapter(statemachine.Adapter):
|
|||||||
args = dict(cores=cores, ram=ram, instances=1)
|
args = dict(cores=cores, ram=ram, instances=1)
|
||||||
if code:
|
if code:
|
||||||
args[code] = vcpus
|
args[code] = vcpus
|
||||||
|
|
||||||
|
return QuotaInformation(**args)
|
||||||
|
|
||||||
|
def _getQuotaForVolume(self, volume):
|
||||||
|
volume_type = volume.volume_type
|
||||||
|
vquota_codes = VOLUME_QUOTA_CODES.get(volume_type, {})
|
||||||
|
args = {}
|
||||||
|
if 'iops' in vquota_codes and getattr(volume, 'iops', None):
|
||||||
|
args[vquota_codes['iops']] = volume.iops
|
||||||
|
if 'storage' in vquota_codes and getattr(volume, 'size', None):
|
||||||
|
args[vquota_codes['storage']] = volume.size
|
||||||
|
return QuotaInformation(**args)
|
||||||
|
|
||||||
|
def _getQuotaForVolumeType(self, volume_type, storage=None, iops=None):
|
||||||
|
vquota_codes = VOLUME_QUOTA_CODES.get(volume_type, {})
|
||||||
|
args = {}
|
||||||
|
if 'iops' in vquota_codes and iops is not None:
|
||||||
|
args[vquota_codes['iops']] = iops
|
||||||
|
if 'storage' in vquota_codes and storage is not None:
|
||||||
|
args[vquota_codes['storage']] = storage
|
||||||
return QuotaInformation(**args)
|
return QuotaInformation(**args)
|
||||||
|
|
||||||
# This method is wrapped with an LRU cache in the constructor.
|
# This method is wrapped with an LRU cache in the constructor.
|
||||||
@ -900,7 +964,14 @@ class AwsAdapter(statemachine.Adapter):
|
|||||||
def _completeCreateInstance(self, future):
|
def _completeCreateInstance(self, future):
|
||||||
if not future.done():
|
if not future.done():
|
||||||
return None
|
return None
|
||||||
|
try:
|
||||||
return future.result()
|
return future.result()
|
||||||
|
except botocore.exceptions.ClientError as error:
|
||||||
|
if error.response['Error']['Code'] == 'VolumeLimitExceeded':
|
||||||
|
# Re-raise as a quota exception so that the
|
||||||
|
# statemachine driver resets quota.
|
||||||
|
raise exceptions.QuotaException(str(error))
|
||||||
|
raise
|
||||||
|
|
||||||
def _createInstance(self, label, image_external_id,
|
def _createInstance(self, label, image_external_id,
|
||||||
tags, hostname, log):
|
tags, hostname, log):
|
||||||
|
42
nodepool/tests/fixtures/aws/aws-volume-quota.yaml
vendored
Normal file
42
nodepool/tests/fixtures/aws/aws-volume-quota.yaml
vendored
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
zookeeper-servers:
|
||||||
|
- host: {zookeeper_host}
|
||||||
|
port: {zookeeper_port}
|
||||||
|
chroot: {zookeeper_chroot}
|
||||||
|
|
||||||
|
zookeeper-tls:
|
||||||
|
ca: {zookeeper_ca}
|
||||||
|
cert: {zookeeper_cert}
|
||||||
|
key: {zookeeper_key}
|
||||||
|
|
||||||
|
labels:
|
||||||
|
- name: volume-gp2
|
||||||
|
- name: volume-gp3
|
||||||
|
|
||||||
|
providers:
|
||||||
|
- name: ec2-us-west-2
|
||||||
|
driver: aws
|
||||||
|
region-name: us-west-2
|
||||||
|
cloud-images:
|
||||||
|
- name: ubuntu1404
|
||||||
|
image-id: ami-1e749f67
|
||||||
|
username: ubuntu
|
||||||
|
pools:
|
||||||
|
- name: main
|
||||||
|
subnet-id: {subnet_id}
|
||||||
|
security-group-id: {security_group_id}
|
||||||
|
node-attributes:
|
||||||
|
key1: value1
|
||||||
|
key2: value2
|
||||||
|
labels:
|
||||||
|
- name: volume-gp2
|
||||||
|
cloud-image: ubuntu1404
|
||||||
|
instance-type: t3.medium
|
||||||
|
key-name: zuul
|
||||||
|
volume-type: gp2
|
||||||
|
volume-size: 1000
|
||||||
|
- name: volume-gp3
|
||||||
|
cloud-image: ubuntu1404
|
||||||
|
instance-type: t3.medium
|
||||||
|
key-name: zuul
|
||||||
|
volume-type: gp3
|
||||||
|
volume-size: 1000
|
@ -438,6 +438,65 @@ class TestDriverAws(tests.DBTestCase):
|
|||||||
# Assert that the second request is still being deferred
|
# Assert that the second request is still being deferred
|
||||||
req2 = self.waitForNodeRequest(req2, (zk.REQUESTED,))
|
req2 = self.waitForNodeRequest(req2, (zk.REQUESTED,))
|
||||||
|
|
||||||
|
@aws_quotas({
|
||||||
|
'L-1216C47A': 200, # instance
|
||||||
|
'L-D18FCD1D': 1.0, # gp2 storage (TB)
|
||||||
|
'L-7A658B76': 1.0, # gp3 storage (TB)
|
||||||
|
})
|
||||||
|
def test_aws_volume_quota(self):
|
||||||
|
# Test volume quotas
|
||||||
|
|
||||||
|
# Moto doesn't correctly pass through iops when creating
|
||||||
|
# instances, so we can't test volume types that require iops.
|
||||||
|
# Therefore in this test we only cover storage quotas.
|
||||||
|
configfile = self.setup_config('aws/aws-volume-quota.yaml')
|
||||||
|
pool = self.useNodepool(configfile, watermark_sleep=1)
|
||||||
|
self.startPool(pool)
|
||||||
|
|
||||||
|
# Create an gp2 request
|
||||||
|
req1 = zk.NodeRequest()
|
||||||
|
req1.state = zk.REQUESTED
|
||||||
|
req1.node_types.append('volume-gp2')
|
||||||
|
self.zk.storeNodeRequest(req1)
|
||||||
|
self.log.debug("Waiting for request %s", req1.id)
|
||||||
|
req1 = self.waitForNodeRequest(req1)
|
||||||
|
node1 = self.assertSuccess(req1)
|
||||||
|
|
||||||
|
# Create a second gp2 node request; this should be
|
||||||
|
# over quota so it won't be fulfilled.
|
||||||
|
req2 = zk.NodeRequest()
|
||||||
|
req2.state = zk.REQUESTED
|
||||||
|
req2.node_types.append('volume-gp2')
|
||||||
|
self.zk.storeNodeRequest(req2)
|
||||||
|
self.log.debug("Waiting for request %s", req2.id)
|
||||||
|
req2 = self.waitForNodeRequest(req2, (zk.PENDING,))
|
||||||
|
|
||||||
|
# Make sure we're paused while we attempt to fulfill the
|
||||||
|
# second request.
|
||||||
|
pool_worker = pool.getPoolWorkers('ec2-us-west-2')
|
||||||
|
for _ in iterate_timeout(30, Exception, 'paused handler'):
|
||||||
|
if pool_worker[0].paused_handlers:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Release the first node so that the second can be fulfilled.
|
||||||
|
node1.state = zk.USED
|
||||||
|
self.zk.storeNode(node1)
|
||||||
|
self.waitForNodeDeletion(node1)
|
||||||
|
|
||||||
|
# Make sure the second high node exists now.
|
||||||
|
req2 = self.waitForNodeRequest(req2)
|
||||||
|
self.assertSuccess(req2)
|
||||||
|
|
||||||
|
# Create a gp3 node request which should succeed even
|
||||||
|
# though we're at quota for gp2 (but not gp3).
|
||||||
|
req3 = zk.NodeRequest()
|
||||||
|
req3.state = zk.REQUESTED
|
||||||
|
req3.node_types.append('volume-gp3')
|
||||||
|
self.zk.storeNodeRequest(req3)
|
||||||
|
self.log.debug("Waiting for request %s", req3.id)
|
||||||
|
req3 = self.waitForNodeRequest(req3)
|
||||||
|
self.assertSuccess(req3)
|
||||||
|
|
||||||
def test_aws_node(self):
|
def test_aws_node(self):
|
||||||
req = self.requestNode('aws/aws.yaml', 'ubuntu1404')
|
req = self.requestNode('aws/aws.yaml', 'ubuntu1404')
|
||||||
node = self.assertSuccess(req)
|
node = self.assertSuccess(req)
|
||||||
|
@ -0,0 +1,6 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
The AWS driver now supports volume quota. It will automatically
|
||||||
|
register the limits from the cloud and ensure that labels that
|
||||||
|
specify EBS volume attributes stay under the limit.
|
Loading…
x
Reference in New Issue
Block a user