Add support for AWS dedicated hosts

This adds limited support for dedicated hosts in AWS.  This is
primarily to enable users to launch certain instance types (such
as macOS) which can only be launched on dedicated hosts.

While AWS allows multiple instances to run on a dedicated host,
this change to nodepool does not implement that.  Instead, it
launches only a single instance on a dedicated host.  The lifecycles
of the two are bound, so that when the nodepool node is deleted,
both are removed from AWS.  Supporting multiple instances on
a dedicated host would be significantly more complicated.
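
As an illustration, the bound lifecycle works roughly like the
following sketch (not the driver's actual state-machine code; the
helper names here are hypothetical):

    def create_node(adapter, label):
        # Allocate the dedicated host first, then launch the single
        # instance onto it (hypothetical helpers).
        host_id = adapter.allocate_host(label)
        instance_id = adapter.launch_instance(label, host_id)
        return {'host': host_id, 'instance': instance_id}

    def delete_node(adapter, external_id):
        # Deletion runs in the reverse order: terminate the instance
        # first (if one was created), then release the host.
        if 'instance' in external_id:
            adapter.terminate_instance(external_id['instance'])
        if 'host' in external_id:
            adapter.release_host(external_id['host'])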

This change introduces a new way of tracking multiple resources for
a single node.  Since we need to create two resources (the host and
the instance), we need to handle the case where the host allocation
fails before the instance is created.  Nodepool relies on the
"external id" to handle cleanup in case of failure, but there are
two external ids here.  We could store the id of the host first, then
switch to the id of the instance later (AWS makes this easy by
prefixing its ids by type, e.g. "h-..." and "i-...").  However, this
change implements a more generalized solution:

This change updates the external_id for AWS nodes from a string to
a dictionary.  The dict holds up to two keys: 'host' and 'instance'.
The ZK code path handles this transparently.  The status commands
coerce the value to a string before printing it (and a test is
added for this).  Zuul will need an update to stringify the value
on the Nodes page.
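
That handling might look roughly like the following sketch
(illustrative helper names, not the exact nodepool code):

    def to_external_id_dict(external_id):
        # Backwards compatibility: old nodes stored a plain string,
        # which always referred to the instance.
        if isinstance(external_id, str):
            return {'instance': external_id}
        return external_id

    def external_id_str(external_id):
        # Status commands coerce the dict to a string before
        # printing it.
        if isinstance(external_id, dict):
            return ','.join(f'{k}={v}'
                            for k, v in sorted(external_id.items()))
        return external_id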

Here are some alternatives:
* Store the host value first, then the instance later
  (This only works for this specific case since we can disambiguate
  them; see the sketch after this list)
* Serialize the value to JSON inside the adapter
  (This is unnecessary work, but simple)
* Rely on the leaked resource cleanup
  (This will clean up resources more slowly)
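
For reference, the first alternative would rely on AWS id prefixes,
roughly like this (a hypothetical sketch, not code from this change):

    def classify_external_id(external_id):
        # AWS prefixes resource ids by type ("h-..." is a host,
        # "i-..." an instance), so a single string could be
        # disambiguated.  This only works because exactly two
        # resource types are involved.
        if external_id.startswith('h-'):
            return 'host'
        return 'instance'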

How quota applies is not entirely clear from the documentation
at the moment.  We know that there are quotas for specific dedicated
host types.  There are also vCPU and other quotas.  What is not
clear is whether vCPU quotas apply to dedicated hosts, and whether
instance quotas apply to instances launched on dedicated hosts.

For the moment, we assume that only dedicated host quotas apply
(since that is the least likely to cause us to under-provision).  So
we count a node request against the dedicated host quota if it
involves a dedicated host.

If we later find out that we should also count the instance that we
launch on the dedicated host against the account's instance quota,
it should be a simple matter to merge these quota calculations rather
than use one or the other as we do now.  The points where this should
happen are noted in code comments.
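
A rough sketch of the current behavior and the potential merge,
based on the getQuotaForLabel change below (illustrative, not the
verbatim code):

    def get_quota_for_label(adapter, label):
        if label.dedicated_host:
            quota = adapter._getQuotaForHostType(label.instance_type)
            # If instance quota turns out to apply as well, merge the
            # two calculations instead of switching between them:
            # quota.add(adapter._getQuotaForInstanceType(
            #     label.instance_type, ON_DEMAND))
        else:
            quota = adapter._getQuotaForInstanceType(
                label.instance_type,
                SPOT if label.use_spot else ON_DEMAND)
        return quota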

Dedicated hosts require an availability-zone setting, so this is now
added to the pool configuration in general (it can be used for on-demand
instances as well).

References to "run_instance" are updated to "run_instances" in the aws
driver tests for consistency and clarity.

The statemachine driver now logs the error from the cloud provider when
it encounters a quota-related error.  In the case of AWS, this provides
useful information about which quota was hit.

Change-Id: I651327a8ace3b8921588a5ec9490f02fb7e685f7
Depends-On: https://review.opendev.org/c/zuul/zuul/+/921346
James E. Blair 2024-06-03 15:21:56 -07:00
parent 69cbf03c1a
commit 6713bba9b7
7 changed files with 664 additions and 79 deletions

View File

@ -501,6 +501,13 @@ Selecting the ``aws`` driver adds the following options to the
A unique name within the provider for this pool of resources.
.. attr:: availability-zone
:type: str
If provided, instances launched from this pool will be
assigned to the specified availability zone. If omitted, AWS
will select from the available zones.
.. attr:: priority
:type: int
:default: 100
@ -648,6 +655,23 @@ Selecting the ``aws`` driver adds the following options to the
:attr:`providers.[aws].diskimages`. Mutually exclusive
with :attr:`providers.[aws].pools.labels.cloud-image`
.. attr:: dedicated-host
:type: bool
If set to ``true``, an AWS dedicated host will be
allocated for the instance. Nodepool only supports
running a single instance on dedicated hosts, so it will
treat the host and the instance launched on it as a
matched pair. The host will not be used for any other
instances, and will be released when the associated
Nodepool node is deleted.
If this option is set, the
:attr:`providers.[aws].pools.labels.use-spot` option is
not available, and the
:attr:`providers.[aws].pools.availability-zone`
option is required.
.. attr:: ebs-optimized
:type: bool
:default: False

View File

@ -64,7 +64,7 @@ def tag_list_to_dict(taglist):
# "Running On-Demand High Memory instances" was determined from
# https://aws.amazon.com/ec2/instance-types/high-memory/
QUOTA_CODES = {
INSTANCE_QUOTA_CODES = {
# INSTANCE FAMILY: [ON-DEMAND, SPOT]
'a': ['L-1216C47A', 'L-34B43A08'],
'c': ['L-1216C47A', 'L-34B43A08'],
@ -87,6 +87,119 @@ QUOTA_CODES = {
'hpc': ['L-F7808C92', '']
}
HOST_QUOTA_CODES = {
'a1': 'L-949445B0',
'c3': 'L-8D142A2E',
'c4': 'L-E4BF28E0',
'c5': 'L-81657574',
'c5a': 'L-03F01FD8',
'c5d': 'L-C93F66A2',
'c5n': 'L-20F13EBD',
'c6a': 'L-D75D2E84',
'c6g': 'L-A749B537',
'c6gd': 'L-545AED39',
'c6gn': 'L-5E3A299D',
'c6i': 'L-5FA3355A',
'c6id': 'L-1BBC5241',
'c6in': 'L-6C2C40CC',
'c7a': 'L-698B67E5',
'c7g': 'L-13B8FCE8',
'c7gd': 'L-EF58B059',
'c7gn': 'L-97677CE3',
'c7i': 'L-587AA6E3',
'd2': 'L-8B27377A',
'dl1': 'L-AD667A3D',
'f1': 'L-5C4CD236',
'g3': 'L-DE82EABA',
'g3s': 'L-9675FDCD',
'g4ad': 'L-FD8E9B9A',
'g4dn': 'L-CAE24619',
'g5': 'L-A6E7FE5E',
'g5g': 'L-4714FFEA',
'g6': 'L-B88B9D6B',
'gr6': 'L-E68C3AFF',
'h1': 'L-84391ECC',
'i2': 'L-6222C1B6',
'i3': 'L-8E60B0B1',
'i3en': 'L-77EE2B11',
'i4g': 'L-F62CBADB',
'i4i': 'L-0300530D',
'im4gn': 'L-93155D6F',
'inf': 'L-5480EFD2',
'inf2': 'L-E5BCF7B5',
'is4gen': 'L-CB4F5825',
'm3': 'L-3C82F907',
'm4': 'L-EF30B25E',
'm5': 'L-8B7BF662',
'm5a': 'L-B10F70D6',
'm5ad': 'L-74F41837',
'm5d': 'L-8CCBD91B',
'm5dn': 'L-DA07429F',
'm5n': 'L-24D7D4AD',
'm5zn': 'L-BD9BD803',
'm6a': 'L-80F2B67F',
'm6g': 'L-D50A37FA',
'm6gd': 'L-84FB37AA',
'm6i': 'L-D269BEFD',
'm6id': 'L-FDB0A352',
'm6idn': 'L-9721EDD9',
'm6in': 'L-D037CF10',
'm7a': 'L-4740F819',
'm7g': 'L-9126620E',
'm7gd': 'L-F8516154',
'm7i': 'L-30E31217',
'mac1': 'L-A8448DC5',
'mac2': 'L-5D8DADF5',
'mac2-m2': 'L-B90B5B66',
'mac2-m2pro': 'L-14F120D1',
'p2': 'L-2753CF59',
'p3': 'L-A0A19F79',
'p3dn': 'L-B601B3B6',
'p4d': 'L-86A789C3',
'p5': 'L-5136197D',
'r3': 'L-B7208018',
'r4': 'L-313524BA',
'r5': 'L-EA4FD6CF',
'r5a': 'L-8FE30D52',
'r5ad': 'L-EC7178B6',
'r5b': 'L-A2D59C67',
'r5d': 'L-8814B54F',
'r5dn': 'L-4AB14223',
'r5n': 'L-52EF324A',
'r6a': 'L-BC1589C5',
'r6g': 'L-B6D6065D',
'r6gd': 'L-EF284EFB',
'r6i': 'L-F13A970A',
'r6id': 'L-B89271A9',
'r6idn': 'L-C4EABC2C',
'r6in': 'L-EA99608B',
'r7a': 'L-4D15192B',
'r7g': 'L-67B8B4C7',
'r7gd': 'L-01137DCE',
'r7i': 'L-55E05032',
'r7iz': 'L-BC9FCC71',
't3': 'L-1586174D',
'trn1': 'L-5E4FB836',
'trn1n': 'L-39926A58',
'u-12tb1': 'L-D6994875',
'u-18tb1': 'L-5F7FD336',
'u-24tb1': 'L-FACBE655',
'u-3tb1': 'L-7F5506AB',
'u-6tb1': 'L-89870E8E',
'u-9tb1': 'L-98E1FFAC',
'u7in-16tb': 'L-75B9BECB',
'u7in-24tb': 'L-CA51381E',
'u7in-32tb': 'L-9D28191F',
'vt1': 'L-A68CFBF7',
'x1': 'L-DE3D9563',
'x1e': 'L-DEF8E115',
'x2gd': 'L-5CC9EA82',
'x2idn': 'L-A84ABF80',
'x2iedn': 'L-D0AA08B1',
'x2iezn': 'L-888B4496',
'z1d': 'L-F035E935',
}
VOLUME_QUOTA_CODES = {
'io1': dict(iops='L-B3A130E6', storage='L-FD252861'),
'io2': dict(iops='L-8D977E7E', storage='L-09BD8365'),
@ -104,9 +217,13 @@ SPOT = 1
class AwsInstance(statemachine.Instance):
def __init__(self, provider, instance, quota):
def __init__(self, provider, instance, host, quota):
super().__init__()
self.external_id = instance['InstanceId']
self.external_id = dict()
if instance:
self.external_id['instance'] = instance['InstanceId']
if host:
self.external_id['host'] = host['HostId']
self.metadata = tag_list_to_dict(instance.get('Tags'))
self.private_ipv4 = instance.get('PrivateIpAddress')
self.private_ipv6 = None
@ -131,6 +248,7 @@ class AwsInstance(statemachine.Instance):
class AwsResource(statemachine.Resource):
TYPE_HOST = 'host'
TYPE_INSTANCE = 'instance'
TYPE_AMI = 'ami'
TYPE_SNAPSHOT = 'snapshot'
@ -143,24 +261,52 @@ class AwsResource(statemachine.Resource):
class AwsDeleteStateMachine(statemachine.StateMachine):
VM_DELETING = 'deleting vm'
HOST_RELEASING_START = 'start releasing host'
HOST_RELEASING = 'releasing host'
INSTANCE_DELETING_START = 'start deleting instance'
INSTANCE_DELETING = 'deleting instance'
COMPLETE = 'complete'
def __init__(self, adapter, external_id, log):
self.log = log
super().__init__()
self.adapter = adapter
# Backwards compatible for old nodes where external_id is a
# str
if type(external_id) is str:
external_id = dict(instance=external_id)
self.external_id = external_id
def advance(self):
if self.state == self.START:
self.instance = self.adapter._deleteInstance(
self.external_id, self.log)
self.state = self.VM_DELETING
if 'instance' in self.external_id:
self.state = self.INSTANCE_DELETING_START
elif 'host' in self.external_id:
self.state = self.HOST_RELEASING_START
else:
self.state = self.COMPLETE
if self.state == self.VM_DELETING:
if self.state == self.INSTANCE_DELETING_START:
self.instance = self.adapter._deleteInstance(
self.external_id['instance'], self.log)
self.state = self.INSTANCE_DELETING
if self.state == self.INSTANCE_DELETING:
self.instance = self.adapter._refreshDelete(self.instance)
if self.instance is None:
if 'host' in self.external_id:
self.state = self.HOST_RELEASING_START
else:
self.state = self.COMPLETE
if self.state == self.HOST_RELEASING_START:
self.host = self.adapter._releaseHost(
self.external_id['host'], self.log)
self.state = self.HOST_RELEASING
if self.state == self.HOST_RELEASING:
self.host = self.adapter._refreshDelete(self.host)
if self.host is None:
self.state = self.COMPLETE
if self.state == self.COMPLETE:
@ -168,6 +314,10 @@ class AwsDeleteStateMachine(statemachine.StateMachine):
class AwsCreateStateMachine(statemachine.StateMachine):
HOST_ALLOCATING_START = 'start allocating host'
HOST_ALLOCATING_SUBMIT = 'submit allocating host'
HOST_ALLOCATING = 'allocating host'
INSTANCE_CREATING_START = 'start creating instance'
INSTANCE_CREATING_SUBMIT = 'submit creating instance'
INSTANCE_CREATING = 'creating instance'
COMPLETE = 'complete'
@ -194,12 +344,50 @@ class AwsCreateStateMachine(statemachine.StateMachine):
self.public_ipv6 = None
self.nic = None
self.instance = None
self.host = None
self.external_id = dict()
self.dedicated_host_id = None
def advance(self):
if self.state == self.START:
if self.label.dedicated_host:
self.state = self.HOST_ALLOCATING_START
else:
self.state = self.INSTANCE_CREATING_START
if self.state == self.HOST_ALLOCATING_START:
self.host_create_future = self.adapter._submitAllocateHost(
self.label,
self.tags, self.hostname, self.log)
self.state = self.HOST_ALLOCATING_SUBMIT
if self.state == self.HOST_ALLOCATING_SUBMIT:
host = self.adapter._completeAllocateHost(self.host_create_future)
if host is None:
return
self.host = host
self.external_id['host'] = host['HostId']
self.state = self.HOST_ALLOCATING
if self.state == self.HOST_ALLOCATING:
self.host = self.adapter._refresh(self.host)
state = self.host['State'].lower()
if state == 'available':
self.dedicated_host_id = self.host['HostId']
self.state = self.INSTANCE_CREATING_START
elif state in [
'permanent-failure', 'released',
'released-permanent-failure']:
raise exceptions.LaunchStatusException(
f"Host in {state} state")
else:
return
if self.state == self.INSTANCE_CREATING_START:
self.create_future = self.adapter._submitCreateInstance(
self.label, self.image_external_id,
self.tags, self.hostname, self.log)
self.tags, self.hostname, self.dedicated_host_id, self.log)
self.state = self.INSTANCE_CREATING_SUBMIT
if self.state == self.INSTANCE_CREATING_SUBMIT:
@ -207,7 +395,7 @@ class AwsCreateStateMachine(statemachine.StateMachine):
if instance is None:
return
self.instance = instance
self.external_id = instance['InstanceId']
self.external_id['instance'] = instance['InstanceId']
self.quota = self.adapter.getQuotaForLabel(self.label)
self.state = self.INSTANCE_CREATING
@ -225,7 +413,7 @@ class AwsCreateStateMachine(statemachine.StateMachine):
if self.state == self.COMPLETE:
self.complete = True
return AwsInstance(self.adapter.provider, self.instance,
self.quota)
self.host, self.quota)
class AwsAdapter(statemachine.Adapter):
@ -271,7 +459,8 @@ class AwsAdapter(statemachine.Adapter):
# by larger batches. That strikes a balance between
# responsiveness and efficiency. Reducing the overall number
# of requests leaves more time for create instance calls.
self.delete_queue = queue.Queue()
self.delete_host_queue = queue.Queue()
self.delete_instance_queue = queue.Queue()
self.delete_thread = threading.Thread(target=self._deleteThread)
self.delete_thread.daemon = True
self.delete_thread.start()
@ -311,6 +500,9 @@ class AwsAdapter(statemachine.Adapter):
# asynchronously update the cached values, meanwhile returning
# the previous cached data if available. This means every
# call after the first one is instantaneous.
self._listHosts = LazyExecutorTTLCache(
CACHE_TTL, self.api_executor)(
self._listHosts)
self._listInstances = LazyExecutorTTLCache(
CACHE_TTL, self.api_executor)(
self._listInstances)
@ -357,6 +549,16 @@ class AwsAdapter(statemachine.Adapter):
def listResources(self):
self._tagSnapshots()
self._tagAmis()
for host in self._listHosts():
try:
if host['State'].lower() in [
"released", "released-permanent-failure"]:
continue
except botocore.exceptions.ClientError:
continue
yield AwsResource(tag_list_to_dict(host.get('Tags')),
AwsResource.TYPE_HOST,
host['HostId'])
for instance in self._listInstances():
try:
if instance['State']['Name'].lower() == "terminated":
@ -403,6 +605,8 @@ class AwsAdapter(statemachine.Adapter):
def deleteResource(self, resource):
self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
if resource.type == AwsResource.TYPE_HOST:
self._releaseHost(resource.id, immediate=True)
if resource.type == AwsResource.TYPE_INSTANCE:
self._deleteInstance(resource.id, immediate=True)
if resource.type == AwsResource.TYPE_VOLUME:
@ -421,10 +625,25 @@ class AwsAdapter(statemachine.Adapter):
for instance in self._listInstances():
if instance['State']["Name"].lower() == "terminated":
continue
quota = self._getQuotaForInstanceType(
instance['InstanceType'],
SPOT if instance.get('InstanceLifecycle') == 'spot'
else ON_DEMAND)
# For now, we are optimistically assuming that when an
# instance is launched on a dedicated host, it is not
# counted against instance quota. That may be overly
# optimistic. If it is, then we will merge the two quotas
# below rather than switch.
# Additionally, we are using the instance as a proxy for
# the host. It would be more correct to also list hosts
# here to include hosts with no instances. But since our
# support for dedicated hosts is currently 1:1 with
# instances, this should be sufficient.
if instance['Placement'].get('HostId'):
# Dedicated host
quota = self._getQuotaForHostType(
instance['InstanceType'])
else:
quota = self._getQuotaForInstanceType(
instance['InstanceType'],
SPOT if instance.get('InstanceLifecycle') == 'spot'
else ON_DEMAND)
for attachment in instance['BlockDeviceMappings']:
volume_id = attachment['Ebs']['VolumeId']
volume = volumes.get(volume_id)
@ -435,20 +654,24 @@ class AwsAdapter(statemachine.Adapter):
continue
quota.add(self._getQuotaForVolume(volume))
yield AwsInstance(self.provider, instance, quota)
yield AwsInstance(self.provider, instance, None, quota)
def getQuotaLimits(self):
# Get the instance and volume types that this provider handles
instance_types = {}
host_types = set()
volume_types = set()
ec2_quotas = self._listEC2Quotas()
ebs_quotas = self._listEBSQuotas()
for pool in self.provider.pools.values():
for label in pool.labels.values():
if label.instance_type not in instance_types:
instance_types[label.instance_type] = set()
instance_types[label.instance_type].add(
SPOT if label.use_spot else ON_DEMAND)
if label.dedicated_host:
host_types.add(label.instance_type)
else:
if label.instance_type not in instance_types:
instance_types[label.instance_type] = set()
instance_types[label.instance_type].add(
SPOT if label.use_spot else ON_DEMAND)
if label.volume_type:
volume_types.add(label.volume_type)
args = dict(default=math.inf)
@ -466,6 +689,18 @@ class AwsAdapter(statemachine.Adapter):
code, instance_type)
continue
args[code] = ec2_quotas[code]
for host_type in host_types:
code = self._getQuotaCodeForHostType(host_type)
if code in args:
continue
if not code:
continue
if code not in ec2_quotas:
self.log.warning(
"AWS quota code %s for host type: %s not known",
code, host_type)
continue
args[code] = ec2_quotas[code]
for volume_type in volume_types:
vquota_codes = VOLUME_QUOTA_CODES.get(volume_type)
if not vquota_codes:
@ -490,9 +725,18 @@ class AwsAdapter(statemachine.Adapter):
return QuotaInformation(**args)
def getQuotaForLabel(self, label):
quota = self._getQuotaForInstanceType(
label.instance_type,
SPOT if label.use_spot else ON_DEMAND)
# For now, we are optimistically assuming that when an
# instance is launched on a dedicated host, it is not counted
# against instance quota. That may be overly optimistic. If
# it is, then we will merge the two quotas below rather than
# switch.
if label.dedicated_host:
quota = self._getQuotaForHostType(
label.instance_type)
else:
quota = self._getQuotaForInstanceType(
label.instance_type,
SPOT if label.use_spot else ON_DEMAND)
if label.volume_type:
quota.add(self._getQuotaForVolumeType(
label.volume_type,
@ -926,7 +1170,7 @@ class AwsAdapter(statemachine.Adapter):
m = self.instance_key_re.match(instance_type)
if m:
key = m.group(1)
code = QUOTA_CODES.get(key)
code = INSTANCE_QUOTA_CODES.get(key)
if code:
return code[market_type_option]
self.log.warning(
@ -962,6 +1206,28 @@ class AwsAdapter(statemachine.Adapter):
return QuotaInformation(**args)
host_key_re = re.compile(r'([a-z\d\-]+)\..*')
def _getQuotaCodeForHostType(self, host_type):
m = self.host_key_re.match(host_type)
if m:
key = m.group(1)
code = HOST_QUOTA_CODES.get(key)
if code:
return code
self.log.warning(
"Unknown quota code for host type: %s",
host_type)
return None
def _getQuotaForHostType(self, host_type):
code = self._getQuotaCodeForHostType(host_type)
args = dict(instances=1)
if code:
args[code] = 1
return QuotaInformation(**args)
def _getQuotaForVolume(self, volume):
volume_type = volume['VolumeType']
vquota_codes = VOLUME_QUOTA_CODES.get(volume_type, {})
@ -990,20 +1256,33 @@ class AwsAdapter(statemachine.Adapter):
InstanceTypes=[instance_type])
def _refresh(self, obj):
for instance in self._listInstances():
if instance['InstanceId'] == obj['InstanceId']:
return instance
if 'InstanceId' in obj:
for instance in self._listInstances():
if instance['InstanceId'] == obj['InstanceId']:
return instance
elif 'HostId' in obj:
for host in self._listHosts():
if host['HostId'] == obj['HostId']:
return host
return obj
def _refreshDelete(self, obj):
if obj is None:
return obj
for instance in self._listInstances():
if instance['InstanceId'] == obj['InstanceId']:
if instance['State']['Name'].lower() == "terminated":
return None
return instance
if 'InstanceId' in obj:
for instance in self._listInstances():
if instance['InstanceId'] == obj['InstanceId']:
if instance['State']['Name'].lower() == "terminated":
return None
return instance
elif 'HostId' in obj:
for host in self._listHosts():
if host['HostId'] == obj['HostId']:
if host['State'].lower() in [
'released', 'released-permanent-failure']:
return None
return host
return None
def _listServiceQuotas(self, service_code):
@ -1023,6 +1302,15 @@ class AwsAdapter(statemachine.Adapter):
def _listEBSQuotas(self):
return self._listServiceQuotas('ebs')
def _listHosts(self):
with self.non_mutating_rate_limiter(
self.log.debug, "Listed hosts"):
paginator = self.ec2_client.get_paginator('describe_hosts')
hosts = []
for page in paginator.paginate():
hosts.extend(page['Hosts'])
return hosts
def _listInstances(self):
with self.non_mutating_rate_limiter(
self.log.debug, "Listed instances"):
@ -1115,12 +1403,63 @@ class AwsAdapter(statemachine.Adapter):
resp = self.ec2_client.describe_images(ImageIds=[image_id])
return resp['Images'][0]
def _submitAllocateHost(self, label,
tags, hostname, log):
return self.create_executor.submit(
self._allocateHost,
label,
tags, hostname, log)
def _completeAllocateHost(self, future):
if not future.done():
return None
try:
return future.result()
except botocore.exceptions.ClientError as error:
if error.response['Error']['Code'] == 'HostLimitExceeded':
# Re-raise as a quota exception so that the
# statemachine driver resets quota.
raise exceptions.QuotaException(str(error))
if (error.response['Error']['Code'] ==
'InsufficientInstanceCapacity'):
# Re-raise as CapacityException so it would have
# "error.capacity" statsd_key, which can be handled
# differently than "error.unknown"
raise exceptions.CapacityException(str(error))
raise
def _allocateHost(self, label,
tags, hostname, log):
args = dict(
AutoPlacement='off',
AvailabilityZone=label.pool.az,
InstanceType=label.instance_type,
Quantity=1,
HostRecovery='off',
HostMaintenance='off',
TagSpecifications=[
{
'ResourceType': 'dedicated-host',
'Tags': tag_dict_to_list(tags),
},
]
)
with self.rate_limiter(log.debug, "Allocated host"):
log.debug("Allocating host %s", hostname)
resp = self.ec2_client.allocate_hosts(**args)
host_ids = resp['HostIds']
log.debug("Allocated host %s as host %s",
hostname, host_ids[0])
return dict(HostId=host_ids[0],
State='pending')
def _submitCreateInstance(self, label, image_external_id,
tags, hostname, log):
tags, hostname, dedicated_host_id, log):
return self.create_executor.submit(
self._createInstance,
label, image_external_id,
tags, hostname, log)
tags, hostname, dedicated_host_id, log)
def _completeCreateInstance(self, future):
if not future.done():
@ -1141,7 +1480,7 @@ class AwsAdapter(statemachine.Adapter):
raise
def _createInstance(self, label, image_external_id,
tags, hostname, log):
tags, hostname, dedicated_host_id, log):
if image_external_id:
image_id = image_external_id
else:
@ -1240,12 +1579,29 @@ class AwsAdapter(statemachine.Adapter):
'HttpEndpoint': 'enabled',
}
if dedicated_host_id:
placement = args.setdefault('Placement', {})
placement.update({
'Tenancy': 'host',
'HostId': dedicated_host_id,
'Affinity': 'host',
})
if label.pool.az:
placement = args.setdefault('Placement', {})
placement['AvailabilityZone'] = label.pool.az
with self.rate_limiter(log.debug, "Created instance"):
log.debug("Creating VM %s", hostname)
resp = self.ec2_client.run_instances(**args)
instances = resp['Instances']
log.debug("Created VM %s as instance %s",
hostname, instances[0]['InstanceId'])
if dedicated_host_id:
log.debug("Created VM %s as instance %s on host %s",
hostname, instances[0]['InstanceId'],
dedicated_host_id)
else:
log.debug("Created VM %s as instance %s",
hostname, instances[0]['InstanceId'])
return instances[0]
def _deleteThread(self):
@ -1256,28 +1612,62 @@ class AwsAdapter(statemachine.Adapter):
self.log.exception("Error in delete thread:")
time.sleep(5)
def _deleteThreadInner(self):
@staticmethod
def _getBatch(the_queue):
records = []
try:
records.append(self.delete_queue.get(block=True, timeout=10))
records.append(the_queue.get(block=True, timeout=10))
except queue.Empty:
return
return []
while True:
try:
records.append(self.delete_queue.get(block=False))
records.append(the_queue.get(block=False))
except queue.Empty:
break
# The terminate call has a limit of 1k, but AWS recommends
# smaller batches. We limit to 50 here.
if len(records) >= 50:
break
ids = []
for (del_id, log) in records:
ids.append(del_id)
log.debug(f"Deleting instance {del_id}")
count = len(ids)
with self.rate_limiter(log.debug, f"Deleted {count} instances"):
self.ec2_client.terminate_instances(InstanceIds=ids)
return records
def _deleteThreadInner(self):
records = self._getBatch(self.delete_instance_queue)
if records:
ids = []
for (del_id, log) in records:
ids.append(del_id)
log.debug(f"Deleting instance {del_id}")
count = len(ids)
with self.rate_limiter(log.debug, f"Deleted {count} instances"):
self.ec2_client.terminate_instances(InstanceIds=ids)
records = self._getBatch(self.delete_host_queue)
if records:
ids = []
for (del_id, log) in records:
ids.append(del_id)
log.debug(f"Releasing host {del_id}")
count = len(ids)
with self.rate_limiter(log.debug, f"Released {count} hosts"):
self.ec2_client.release_hosts(HostIds=ids)
def _releaseHost(self, external_id, log=None, immediate=False):
if log is None:
log = self.log
for host in self._listHosts():
if host['HostId'] == external_id:
break
else:
log.warning(f"Host not found when releasing {external_id}")
return None
if immediate:
with self.rate_limiter(log.debug, "Released host"):
log.debug(f"Deleting host {external_id}")
self.ec2_client.release_hosts(
HostIds=[host['HostId']])
else:
self.delete_host_queue.put((external_id, log))
return host
def _deleteInstance(self, external_id, log=None, immediate=False):
if log is None:
@ -1294,7 +1684,7 @@ class AwsAdapter(statemachine.Adapter):
self.ec2_client.terminate_instances(
InstanceIds=[instance['InstanceId']])
else:
self.delete_queue.put((external_id, log))
self.delete_instance_queue.put((external_id, log))
return instance
def _deleteVolume(self, external_id):

View File

@ -186,6 +186,14 @@ class AwsLabel(ConfigValue):
self.host_key_checking = self.pool.host_key_checking
self.use_spot = bool(label.get('use-spot', False))
self.imdsv2 = label.get('imdsv2', None)
self.dedicated_host = bool(label.get('dedicated-host', False))
if self.dedicated_host:
if self.use_spot:
raise Exception(
"Spot instances can not be used on dedicated hosts")
if not self.pool.az:
raise Exception(
"Availability-zone is required for dedicated hosts")
@staticmethod
def getSchema():
@ -209,6 +217,7 @@ class AwsLabel(ConfigValue):
'dynamic-tags': dict,
'use-spot': bool,
'imdsv2': v.Any(None, 'required', 'optional'),
'dedicated-host': bool,
}
@ -243,6 +252,7 @@ class AwsPool(ConfigPool):
self.max_resources = self.provider.max_resources.copy()
for k, val in pool_config.get('max-resources', {}).items():
self.max_resources[k] = val
self.az = pool_config.get('availability-zone')
@staticmethod
def getSchema():
@ -261,6 +271,7 @@ class AwsPool(ConfigPool):
'max-cores': int,
'max-ram': int,
'max-resources': {str: int},
'availability-zone': str,
})
return pool

View File

@ -314,8 +314,9 @@ class StateMachineNodeLauncher(stats.StatsReporter):
statsd_key = 'error.zksession'
self.manager.nodescan_worker.removeRequest(self.nodescan_request)
self.nodescan_request = None
except exceptions.QuotaException:
self.log.info("Aborting node %s due to quota failure", node.id)
except exceptions.QuotaException as e:
self.log.info("Aborting node %s due to quota failure: %s",
node.id, str(e))
node.state = zk.ABORTED
if self.state_machine:
node.external_id = self.state_machine.external_id
@ -1000,7 +1001,7 @@ class Instance:
* ready: bool (whether the instance is ready)
* deleted: bool (whether the instance is in a deleted state)
* external_id: str (the unique id of the instance)
* external_id: str or dict (the unique id of the instance)
* interface_ip: str
* metadata: dict
@ -1549,7 +1550,7 @@ class Adapter:
This method should return a new state machine object
initialized to delete the described instance.
:param str external_id: The external_id of the instance, as
:param str or dict external_id: The external_id of the instance, as
supplied by a creation StateMachine or an Instance.
:param log Logger: A logger instance for emitting annotated
logs related to the request.
@ -1682,7 +1683,7 @@ class Adapter:
"""Return the console log from the specified server
:param label ConfigLabel: The label config for the node
:param external_id str: The external id of the server
:param external_id str or dict: The external id of the server
"""
raise NotImplementedError()
@ -1690,6 +1691,6 @@ class Adapter:
"""Notify the adapter of a nodescan failure
:param label ConfigLabel: The label config for the node
:param external_id str: The external id of the server
:param external_id str or dict: The external id of the server
"""
pass

View File

@ -0,0 +1,37 @@
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}
chroot: {zookeeper_chroot}
zookeeper-tls:
ca: {zookeeper_ca}
cert: {zookeeper_cert}
key: {zookeeper_key}
labels:
- name: ubuntu
providers:
- name: ec2-us-west-2
driver: aws
region-name: us-west-2
cloud-images:
- name: ubuntu1404
image-id: ami-1e749f67
username: ubuntu
launch-retries: 1
pools:
- name: main
availability-zone: us-west-2a
max-servers: 10
subnet-id: {subnet_id}
security-group-id: {security_group_id}
node-attributes:
key1: value1
key2: value2
labels:
- name: ubuntu
cloud-image: ubuntu1404
instance-type: t3.medium
key-name: zuul
dedicated-host: True

View File

@ -26,6 +26,7 @@ import testtools
from nodepool import config as nodepool_config
from nodepool import tests
import nodepool.status
from nodepool.zk import zookeeper as zk
from nodepool.nodeutils import iterate_timeout
import nodepool.driver.statemachine
@ -49,9 +50,16 @@ class FakeAwsAdapter(AwsAdapter):
# when in fake mode so we need to intercept the
# run_instances call and validate the args we supply.
def _fake_run_instances(*args, **kwargs):
self.__testcase.run_instance_calls.append(kwargs)
self.__testcase.run_instances_calls.append(kwargs)
if self.__testcase.run_instances_exception:
raise self.__testcase.run_instances_exception
return self.ec2_client.run_instances_orig(*args, **kwargs)
def _fake_allocate_hosts(*args, **kwargs):
if self.__testcase.allocate_hosts_exception:
raise self.__testcase.allocate_hosts_exception
return self.ec2_client.allocate_hosts_orig(*args, **kwargs)
# The ImdsSupport parameter isn't handled by moto
def _fake_register_image(*args, **kwargs):
self.__testcase.register_image_calls.append(kwargs)
@ -65,6 +73,8 @@ class FakeAwsAdapter(AwsAdapter):
self.ec2_client.run_instances_orig = self.ec2_client.run_instances
self.ec2_client.run_instances = _fake_run_instances
self.ec2_client.allocate_hosts_orig = self.ec2_client.allocate_hosts
self.ec2_client.allocate_hosts = _fake_allocate_hosts
self.ec2_client.register_image_orig = self.ec2_client.register_image
self.ec2_client.register_image = _fake_register_image
self.ec2_client.import_snapshot = \
@ -161,7 +171,9 @@ class TestDriverAws(tests.DBTestCase):
CreateBucketConfiguration={'LocationConstraint': 'us-west-2'})
# A list of args to method calls for validation
self.run_instance_calls = []
self.run_instances_calls = []
self.run_instances_exception = None
self.allocate_hosts_exception = None
self.register_image_calls = []
# TEST-NET-3
@ -236,8 +248,8 @@ class TestDriverAws(tests.DBTestCase):
def requestNode(self, config_path, label):
# A helper method to perform a single node request
configfile = self.setup_config(config_path)
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
self.pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(self.pool)
req = zk.NodeRequest()
req.state = zk.REQUESTED
@ -583,12 +595,12 @@ class TestDriverAws(tests.DBTestCase):
# Like us-west-2x where x is random
self.assertTrue(len(node.az) == len('us-west-2x'))
instance = self.ec2.Instance(node.external_id)
instance = self.ec2.Instance(node.external_id['instance'])
response = instance.describe_attribute(Attribute='ebsOptimized')
self.assertFalse(response['EbsOptimized']['Value'])
self.assertFalse(
'MetadataOptions' in self.run_instance_calls[0])
'MetadataOptions' in self.run_instances_calls[0])
node.state = zk.USED
self.zk.storeNode(node)
@ -630,7 +642,7 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
instance = self.ec2.Instance(node.external_id)
instance = self.ec2.Instance(node.external_id['instance'])
response = instance.describe_attribute(
Attribute='userData')
self.assertIn('UserData', response)
@ -647,7 +659,8 @@ class TestDriverAws(tests.DBTestCase):
associations = self.ec2_client.\
describe_iam_instance_profile_associations()[
"IamInstanceProfileAssociations"]
self.assertEqual(node.external_id, associations[0]['InstanceId'])
self.assertEqual(node.external_id['instance'],
associations[0]['InstanceId'])
self.assertEqual(self.instance_profile_arn,
associations[0]['IamInstanceProfile']['Arn'])
@ -660,7 +673,8 @@ class TestDriverAws(tests.DBTestCase):
associations = self.ec2_client.\
describe_iam_instance_profile_associations()[
"IamInstanceProfileAssociations"]
self.assertEqual(node.external_id, associations[0]['InstanceId'])
self.assertEqual(node.external_id['instance'],
associations[0]['InstanceId'])
self.assertEqual(self.instance_profile_arn,
associations[0]['IamInstanceProfile']['Arn'])
@ -696,7 +710,7 @@ class TestDriverAws(tests.DBTestCase):
# Make sure we make the call to AWS as expected
self.assertEqual(
self.run_instance_calls[0]['NetworkInterfaces']
self.run_instances_calls[0]['NetworkInterfaces']
[0]['Ipv6AddressCount'], 1)
# This is like what we should get back from AWS, verify the
@ -712,7 +726,7 @@ class TestDriverAws(tests.DBTestCase):
instance['NetworkInterfaces'] = [iface]
provider = Dummy()
provider.region_name = 'us-west-2'
awsi = AwsInstance(provider, instance, None)
awsi = AwsInstance(provider, instance, None, None)
self.assertEqual(awsi.public_ipv4, '1.2.3.4')
self.assertEqual(awsi.private_ipv4, '10.0.0.1')
self.assertEqual(awsi.public_ipv6, 'fe80::dead:beef')
@ -725,7 +739,7 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
instance = self.ec2.Instance(node.external_id)
instance = self.ec2.Instance(node.external_id['instance'])
tag_list = instance.tags
self.assertIn({"Key": "has-tags", "Value": "true"}, tag_list)
self.assertIn({"Key": "Name", "Value": "np0000000000"}, tag_list)
@ -743,7 +757,7 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
instance = self.ec2.Instance(node.external_id)
instance = self.ec2.Instance(node.external_id['instance'])
tag_list = instance.tags
self.assertIn({"Key": "has-tags", "Value": "true"}, tag_list)
self.assertIn({"Key": "Name", "Value": "np0000000000"}, tag_list)
@ -773,7 +787,7 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
instance = self.ec2.Instance(node.external_id)
instance = self.ec2.Instance(node.external_id['instance'])
response = instance.describe_attribute(Attribute='ebsOptimized')
self.assertTrue(response['EbsOptimized']['Value'])
@ -785,10 +799,10 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.image_id, 'ubuntu1404')
self.assertEqual(
self.run_instance_calls[0]['MetadataOptions']['HttpTokens'],
self.run_instances_calls[0]['MetadataOptions']['HttpTokens'],
'required')
self.assertEqual(
self.run_instance_calls[0]['MetadataOptions']['HttpEndpoint'],
self.run_instances_calls[0]['MetadataOptions']['HttpEndpoint'],
'enabled')
def test_aws_invalid_instance_type(self):
@ -846,10 +860,10 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Iops'], 2000)
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200)
def test_aws_diskimage_image(self):
@ -889,10 +903,10 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Iops'], 2000)
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200)
def test_aws_diskimage_snapshot_imdsv2(self):
@ -936,10 +950,10 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Iops'], 2000)
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200)
def test_aws_diskimage_image_imdsv2(self):
@ -1235,7 +1249,106 @@ class TestDriverAws(tests.DBTestCase):
# Test creating a spot instance instead of an on-demand one.
req = self.requestNode('aws/aws-spot.yaml', 'ubuntu1404-spot')
node = self.assertSuccess(req)
instance = self.ec2.Instance(node.external_id)
instance = self.ec2.Instance(node.external_id['instance'])
self.assertEqual(instance.instance_lifecycle, 'spot')
# moto doesn't provide the spot_instance_request_id
# self.assertIsNotNone(instance.spot_instance_request_id)
def test_aws_dedicated_host(self):
req = self.requestNode('aws/aws-dedicated-host.yaml', 'ubuntu')
for _ in iterate_timeout(60, Exception,
"Node request state transition",
interval=1):
# Ensure that we can render the node list (and that our
# use of a dictionary for external_id does not cause an
# error).
node_list = nodepool.status.node_list(self.zk)
nodepool.status.output(node_list, 'pretty')
nodepool.status.output(node_list, 'json')
req = self.zk.getNodeRequest(req.id)
if req.state in (zk.FULFILLED,):
break
node = self.assertSuccess(req)
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
# Verify instance and host are created
reservations = self.ec2_client.describe_instances()['Reservations']
instances = [
i
for r in reservations
for i in r['Instances']
if i['State']['Name'] != 'terminated'
]
self.assertEqual(len(instances), 1)
hosts = self.ec2_client.describe_hosts()['Hosts']
hosts = [h for h in hosts if h['State'] != 'released']
self.assertEqual(len(hosts), 1)
node.state = zk.USED
self.zk.storeNode(node)
self.waitForNodeDeletion(node)
# verify instance and host are deleted
reservations = self.ec2_client.describe_instances()['Reservations']
instances = [
i
for r in reservations
for i in r['Instances']
if i['State']['Name'] != 'terminated'
]
self.assertEqual(len(instances), 0)
hosts = self.ec2_client.describe_hosts()['Hosts']
hosts = [h for h in hosts if h['State'] != 'released']
self.assertEqual(len(hosts), 0)
def test_aws_dedicated_host_instance_failure(self):
self.run_instances_exception = Exception("some failure")
req = self.requestNode('aws/aws-dedicated-host.yaml', 'ubuntu')
self.assertEqual(req.state, zk.FAILED)
# verify instance and host are deleted
provider = self.pool.getProviderManager('ec2-us-west-2')
for _ in iterate_timeout(60, Exception,
"Cloud cleanup",
interval=1):
if not (provider.launchers or provider.deleters):
break
reservations = self.ec2_client.describe_instances()['Reservations']
instances = [
i
for r in reservations
for i in r['Instances']
if i['State']['Name'] != 'terminated'
]
self.assertEqual(len(instances), 0)
hosts = self.ec2_client.describe_hosts()['Hosts']
hosts = [h for h in hosts if h['State'] != 'released']
self.assertEqual(len(hosts), 0)
def test_aws_dedicated_host_allocation_failure(self):
self.allocate_hosts_exception = Exception("some failure")
req = self.requestNode('aws/aws-dedicated-host.yaml', 'ubuntu')
self.assertEqual(req.state, zk.FAILED)
# verify instance and host are deleted
provider = self.pool.getProviderManager('ec2-us-west-2')
for _ in iterate_timeout(60, Exception,
"Cloud cleanup",
interval=1):
if not (provider.launchers or provider.deleters):
break
reservations = self.ec2_client.describe_instances()['Reservations']
instances = [
i
for r in reservations
for i in r['Instances']
if i['State']['Name'] != 'terminated'
]
self.assertEqual(len(instances), 0)
hosts = self.ec2_client.describe_hosts()['Hosts']
hosts = [h for h in hosts if h['State'] != 'released']
self.assertEqual(len(hosts), 0)

View File

@ -0,0 +1,9 @@
---
features:
- |
Limited support for AWS dedicated hosts is now available using the
:attr:`providers.[aws].pools.labels.dedicated-host` option. This
allows for a dedicated host to be allocated to serve a single
instance (the dedicated host cannot be used for multiple
instances). This enables Nodepool to launch certain instance
types which require dedicated hosts.