Merge "Add support for AWS dedicated hosts"

This commit is contained in:
Zuul 2024-06-27 23:14:46 +00:00 committed by Gerrit Code Review
commit cbe9e0a070
7 changed files with 664 additions and 79 deletions

View File

@ -501,6 +501,13 @@ Selecting the ``aws`` driver adds the following options to the
A unique name within the provider for this pool of resources.
.. attr:: availability-zone
:type: str
If provided, instances launched from this pool will be
assigned to the specified availability zone. If omitted, AWS
will select from the available zones.
.. attr:: priority
:type: int
:default: 100
@ -648,6 +655,23 @@ Selecting the ``aws`` driver adds the following options to the
:attr:`providers.[aws].diskimages`. Mutually exclusive
with :attr:`providers.[aws].pools.labels.cloud-image`
.. attr:: dedicated-host
:type: bool
If set to ``true``, an AWS dedicated host will be
allocated for the instance. Nodepool only supports
running a single instance on dedicated hosts, so it will
treat the host and the instance launched on it as a
matched pair. The host will not be used for any other
instances, and will be released when the associated
Nodepool node is deleted.
If this option is set, the
:attr:`providers.[aws].pools.labels.use-spot` option is
not available, and
:attr:`providers.[aws].pools.availability-zone`
option is required.
.. attr:: ebs-optimized
:type: bool
:default: False

View File

@ -64,7 +64,7 @@ def tag_list_to_dict(taglist):
# "Running On-Demand High Memory instances" was determined from
# https://aws.amazon.com/ec2/instance-types/high-memory/
QUOTA_CODES = {
INSTANCE_QUOTA_CODES = {
# INSTANCE FAMILY: [ON-DEMAND, SPOT]
'a': ['L-1216C47A', 'L-34B43A08'],
'c': ['L-1216C47A', 'L-34B43A08'],
@ -87,6 +87,119 @@ QUOTA_CODES = {
'hpc': ['L-F7808C92', '']
}
# Service-quota codes for AWS *dedicated hosts*, keyed by instance
# family (the portion of the instance type before the first '.').
# Unlike INSTANCE_QUOTA_CODES, each family has a single quota code:
# dedicated hosts have no on-demand/spot distinction.
# NOTE(review): codes appear to come from the EC2 service-quotas
# console ("Running Dedicated <family> Hosts") — verify against the
# AWS Service Quotas reference when adding new families.
HOST_QUOTA_CODES = {
    'a1': 'L-949445B0',
    'c3': 'L-8D142A2E',
    'c4': 'L-E4BF28E0',
    'c5': 'L-81657574',
    'c5a': 'L-03F01FD8',
    'c5d': 'L-C93F66A2',
    'c5n': 'L-20F13EBD',
    'c6a': 'L-D75D2E84',
    'c6g': 'L-A749B537',
    'c6gd': 'L-545AED39',
    'c6gn': 'L-5E3A299D',
    'c6i': 'L-5FA3355A',
    'c6id': 'L-1BBC5241',
    'c6in': 'L-6C2C40CC',
    'c7a': 'L-698B67E5',
    'c7g': 'L-13B8FCE8',
    'c7gd': 'L-EF58B059',
    'c7gn': 'L-97677CE3',
    'c7i': 'L-587AA6E3',
    'd2': 'L-8B27377A',
    'dl1': 'L-AD667A3D',
    'f1': 'L-5C4CD236',
    'g3': 'L-DE82EABA',
    'g3s': 'L-9675FDCD',
    'g4ad': 'L-FD8E9B9A',
    'g4dn': 'L-CAE24619',
    'g5': 'L-A6E7FE5E',
    'g5g': 'L-4714FFEA',
    'g6': 'L-B88B9D6B',
    'gr6': 'L-E68C3AFF',
    'h1': 'L-84391ECC',
    'i2': 'L-6222C1B6',
    'i3': 'L-8E60B0B1',
    'i3en': 'L-77EE2B11',
    'i4g': 'L-F62CBADB',
    'i4i': 'L-0300530D',
    'im4gn': 'L-93155D6F',
    'inf': 'L-5480EFD2',
    'inf2': 'L-E5BCF7B5',
    'is4gen': 'L-CB4F5825',
    'm3': 'L-3C82F907',
    'm4': 'L-EF30B25E',
    'm5': 'L-8B7BF662',
    'm5a': 'L-B10F70D6',
    'm5ad': 'L-74F41837',
    'm5d': 'L-8CCBD91B',
    'm5dn': 'L-DA07429F',
    'm5n': 'L-24D7D4AD',
    'm5zn': 'L-BD9BD803',
    'm6a': 'L-80F2B67F',
    'm6g': 'L-D50A37FA',
    'm6gd': 'L-84FB37AA',
    'm6i': 'L-D269BEFD',
    'm6id': 'L-FDB0A352',
    'm6idn': 'L-9721EDD9',
    'm6in': 'L-D037CF10',
    'm7a': 'L-4740F819',
    'm7g': 'L-9126620E',
    'm7gd': 'L-F8516154',
    'm7i': 'L-30E31217',
    'mac1': 'L-A8448DC5',
    'mac2': 'L-5D8DADF5',
    'mac2-m2': 'L-B90B5B66',
    'mac2-m2pro': 'L-14F120D1',
    'p2': 'L-2753CF59',
    'p3': 'L-A0A19F79',
    'p3dn': 'L-B601B3B6',
    'p4d': 'L-86A789C3',
    'p5': 'L-5136197D',
    'r3': 'L-B7208018',
    'r4': 'L-313524BA',
    'r5': 'L-EA4FD6CF',
    'r5a': 'L-8FE30D52',
    'r5ad': 'L-EC7178B6',
    'r5b': 'L-A2D59C67',
    'r5d': 'L-8814B54F',
    'r5dn': 'L-4AB14223',
    'r5n': 'L-52EF324A',
    'r6a': 'L-BC1589C5',
    'r6g': 'L-B6D6065D',
    'r6gd': 'L-EF284EFB',
    'r6i': 'L-F13A970A',
    'r6id': 'L-B89271A9',
    'r6idn': 'L-C4EABC2C',
    'r6in': 'L-EA99608B',
    'r7a': 'L-4D15192B',
    'r7g': 'L-67B8B4C7',
    'r7gd': 'L-01137DCE',
    'r7i': 'L-55E05032',
    'r7iz': 'L-BC9FCC71',
    't3': 'L-1586174D',
    'trn1': 'L-5E4FB836',
    'trn1n': 'L-39926A58',
    'u-12tb1': 'L-D6994875',
    'u-18tb1': 'L-5F7FD336',
    'u-24tb1': 'L-FACBE655',
    'u-3tb1': 'L-7F5506AB',
    'u-6tb1': 'L-89870E8E',
    'u-9tb1': 'L-98E1FFAC',
    'u7in-16tb': 'L-75B9BECB',
    'u7in-24tb': 'L-CA51381E',
    'u7in-32tb': 'L-9D28191F',
    'vt1': 'L-A68CFBF7',
    'x1': 'L-DE3D9563',
    'x1e': 'L-DEF8E115',
    'x2gd': 'L-5CC9EA82',
    'x2idn': 'L-A84ABF80',
    'x2iedn': 'L-D0AA08B1',
    'x2iezn': 'L-888B4496',
    'z1d': 'L-F035E935',
}
VOLUME_QUOTA_CODES = {
'io1': dict(iops='L-B3A130E6', storage='L-FD252861'),
'io2': dict(iops='L-8D977E7E', storage='L-09BD8365'),
@ -104,9 +217,13 @@ SPOT = 1
class AwsInstance(statemachine.Instance):
def __init__(self, provider, instance, quota):
def __init__(self, provider, instance, host, quota):
super().__init__()
self.external_id = instance['InstanceId']
self.external_id = dict()
if instance:
self.external_id['instance'] = instance['InstanceId']
if host:
self.external_id['host'] = host['HostId']
self.metadata = tag_list_to_dict(instance.get('Tags'))
self.private_ipv4 = instance.get('PrivateIpAddress')
self.private_ipv6 = None
@ -131,6 +248,7 @@ class AwsInstance(statemachine.Instance):
class AwsResource(statemachine.Resource):
TYPE_HOST = 'host'
TYPE_INSTANCE = 'instance'
TYPE_AMI = 'ami'
TYPE_SNAPSHOT = 'snapshot'
@ -143,24 +261,52 @@ class AwsResource(statemachine.Resource):
class AwsDeleteStateMachine(statemachine.StateMachine):
VM_DELETING = 'deleting vm'
HOST_RELEASING_START = 'start releasing host'
HOST_RELEASING = 'releasing host'
INSTANCE_DELETING_START = 'start deleting instance'
INSTANCE_DELETING = 'deleting instance'
COMPLETE = 'complete'
def __init__(self, adapter, external_id, log):
self.log = log
super().__init__()
self.adapter = adapter
# Backwards compatible for old nodes where external_id is a
# str
if type(external_id) is str:
external_id = dict(instance=external_id)
self.external_id = external_id
def advance(self):
if self.state == self.START:
self.instance = self.adapter._deleteInstance(
self.external_id, self.log)
self.state = self.VM_DELETING
if 'instance' in self.external_id:
self.state = self.INSTANCE_DELETING_START
elif 'host' in self.external_id:
self.state = self.HOST_RELEASING_START
else:
self.state = self.COMPLETE
if self.state == self.VM_DELETING:
if self.state == self.INSTANCE_DELETING_START:
self.instance = self.adapter._deleteInstance(
self.external_id['instance'], self.log)
self.state = self.INSTANCE_DELETING
if self.state == self.INSTANCE_DELETING:
self.instance = self.adapter._refreshDelete(self.instance)
if self.instance is None:
if 'host' in self.external_id:
self.state = self.HOST_RELEASING_START
else:
self.state = self.COMPLETE
if self.state == self.HOST_RELEASING_START:
self.host = self.adapter._releaseHost(
self.external_id['host'], self.log)
self.state = self.HOST_RELEASING
if self.state == self.HOST_RELEASING:
self.host = self.adapter._refreshDelete(self.host)
if self.host is None:
self.state = self.COMPLETE
if self.state == self.COMPLETE:
@ -168,6 +314,10 @@ class AwsDeleteStateMachine(statemachine.StateMachine):
class AwsCreateStateMachine(statemachine.StateMachine):
HOST_ALLOCATING_START = 'start allocating host'
HOST_ALLOCATING_SUBMIT = 'submit allocating host'
HOST_ALLOCATING = 'allocating host'
INSTANCE_CREATING_START = 'start creating instance'
INSTANCE_CREATING_SUBMIT = 'submit creating instance'
INSTANCE_CREATING = 'creating instance'
COMPLETE = 'complete'
@ -194,12 +344,50 @@ class AwsCreateStateMachine(statemachine.StateMachine):
self.public_ipv6 = None
self.nic = None
self.instance = None
self.host = None
self.external_id = dict()
self.dedicated_host_id = None
def advance(self):
if self.state == self.START:
if self.label.dedicated_host:
self.state = self.HOST_ALLOCATING_START
else:
self.state = self.INSTANCE_CREATING_START
if self.state == self.HOST_ALLOCATING_START:
self.host_create_future = self.adapter._submitAllocateHost(
self.label,
self.tags, self.hostname, self.log)
self.state = self.HOST_ALLOCATING_SUBMIT
if self.state == self.HOST_ALLOCATING_SUBMIT:
host = self.adapter._completeAllocateHost(self.host_create_future)
if host is None:
return
self.host = host
self.external_id['host'] = host['HostId']
self.state = self.HOST_ALLOCATING
if self.state == self.HOST_ALLOCATING:
self.host = self.adapter._refresh(self.host)
state = self.host['State'].lower()
if state == 'available':
self.dedicated_host_id = self.host['HostId']
self.state = self.INSTANCE_CREATING_START
elif state in [
'permanent-failure', 'released',
'released-permanent-failure']:
raise exceptions.LaunchStatusException(
f"Host in {state} state")
else:
return
if self.state == self.INSTANCE_CREATING_START:
self.create_future = self.adapter._submitCreateInstance(
self.label, self.image_external_id,
self.tags, self.hostname, self.log)
self.tags, self.hostname, self.dedicated_host_id, self.log)
self.state = self.INSTANCE_CREATING_SUBMIT
if self.state == self.INSTANCE_CREATING_SUBMIT:
@ -207,7 +395,7 @@ class AwsCreateStateMachine(statemachine.StateMachine):
if instance is None:
return
self.instance = instance
self.external_id = instance['InstanceId']
self.external_id['instance'] = instance['InstanceId']
self.quota = self.adapter.getQuotaForLabel(self.label)
self.state = self.INSTANCE_CREATING
@ -225,7 +413,7 @@ class AwsCreateStateMachine(statemachine.StateMachine):
if self.state == self.COMPLETE:
self.complete = True
return AwsInstance(self.adapter.provider, self.instance,
self.quota)
self.host, self.quota)
class AwsAdapter(statemachine.Adapter):
@ -271,7 +459,8 @@ class AwsAdapter(statemachine.Adapter):
# by larger batches. That strikes a balance between
# responsiveness and efficiency. Reducing the overall number
# of requests leaves more time for create instance calls.
self.delete_queue = queue.Queue()
self.delete_host_queue = queue.Queue()
self.delete_instance_queue = queue.Queue()
self.delete_thread = threading.Thread(target=self._deleteThread)
self.delete_thread.daemon = True
self.delete_thread.start()
@ -311,6 +500,9 @@ class AwsAdapter(statemachine.Adapter):
# asynchronously update the cached values, meanwhile returning
# the previous cached data if available. This means every
# call after the first one is instantaneous.
self._listHosts = LazyExecutorTTLCache(
CACHE_TTL, self.api_executor)(
self._listHosts)
self._listInstances = LazyExecutorTTLCache(
CACHE_TTL, self.api_executor)(
self._listInstances)
@ -357,6 +549,16 @@ class AwsAdapter(statemachine.Adapter):
def listResources(self):
self._tagSnapshots()
self._tagAmis()
for host in self._listHosts():
try:
if host['State'].lower() in [
"released", "released-permanent-failure"]:
continue
except botocore.exceptions.ClientError:
continue
yield AwsResource(tag_list_to_dict(host.get('Tags')),
AwsResource.TYPE_HOST,
host['HostId'])
for instance in self._listInstances():
try:
if instance['State']['Name'].lower() == "terminated":
@ -403,6 +605,8 @@ class AwsAdapter(statemachine.Adapter):
def deleteResource(self, resource):
self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
if resource.type == AwsResource.TYPE_HOST:
self._releaseHost(resource.id, immediate=True)
if resource.type == AwsResource.TYPE_INSTANCE:
self._deleteInstance(resource.id, immediate=True)
if resource.type == AwsResource.TYPE_VOLUME:
@ -421,10 +625,25 @@ class AwsAdapter(statemachine.Adapter):
for instance in self._listInstances():
if instance['State']["Name"].lower() == "terminated":
continue
quota = self._getQuotaForInstanceType(
instance['InstanceType'],
SPOT if instance.get('InstanceLifecycle') == 'spot'
else ON_DEMAND)
# For now, we are optimistically assuming that when an
# instance is launched on a dedicated host, it is not
# counted against instance quota. That may be overly
# optimistic. If it is, then we will merge the two quotas
# below rather than switch.
# Additionally, we are using the instance as a proxy for
# the host. It would be more correct to also list hosts
# here to include hosts with no instances. But since our
# support for dedicated hosts is currently 1:1 with
# instances, this should be sufficient.
if instance['Placement'].get('HostId'):
# Dedicated host
quota = self._getQuotaForHostType(
instance['InstanceType'])
else:
quota = self._getQuotaForInstanceType(
instance['InstanceType'],
SPOT if instance.get('InstanceLifecycle') == 'spot'
else ON_DEMAND)
for attachment in instance['BlockDeviceMappings']:
volume_id = attachment['Ebs']['VolumeId']
volume = volumes.get(volume_id)
@ -435,20 +654,24 @@ class AwsAdapter(statemachine.Adapter):
continue
quota.add(self._getQuotaForVolume(volume))
yield AwsInstance(self.provider, instance, quota)
yield AwsInstance(self.provider, instance, None, quota)
def getQuotaLimits(self):
# Get the instance and volume types that this provider handles
instance_types = {}
host_types = set()
volume_types = set()
ec2_quotas = self._listEC2Quotas()
ebs_quotas = self._listEBSQuotas()
for pool in self.provider.pools.values():
for label in pool.labels.values():
if label.instance_type not in instance_types:
instance_types[label.instance_type] = set()
instance_types[label.instance_type].add(
SPOT if label.use_spot else ON_DEMAND)
if label.dedicated_host:
host_types.add(label.instance_type)
else:
if label.instance_type not in instance_types:
instance_types[label.instance_type] = set()
instance_types[label.instance_type].add(
SPOT if label.use_spot else ON_DEMAND)
if label.volume_type:
volume_types.add(label.volume_type)
args = dict(default=math.inf)
@ -466,6 +689,18 @@ class AwsAdapter(statemachine.Adapter):
code, instance_type)
continue
args[code] = ec2_quotas[code]
for host_type in host_types:
code = self._getQuotaCodeForHostType(host_type)
if code in args:
continue
if not code:
continue
if code not in ec2_quotas:
self.log.warning(
"AWS quota code %s for host type: %s not known",
code, host_type)
continue
args[code] = ec2_quotas[code]
for volume_type in volume_types:
vquota_codes = VOLUME_QUOTA_CODES.get(volume_type)
if not vquota_codes:
@ -490,9 +725,18 @@ class AwsAdapter(statemachine.Adapter):
return QuotaInformation(**args)
def getQuotaForLabel(self, label):
quota = self._getQuotaForInstanceType(
label.instance_type,
SPOT if label.use_spot else ON_DEMAND)
# For now, we are optimistically assuming that when an
# instance is launched on a dedicated host, it is not counted
# against instance quota. That may be overly optimistic. If
# it is, then we will merge the two quotas below rather than
# switch.
if label.dedicated_host:
quota = self._getQuotaForHostType(
label.instance_type)
else:
quota = self._getQuotaForInstanceType(
label.instance_type,
SPOT if label.use_spot else ON_DEMAND)
if label.volume_type:
quota.add(self._getQuotaForVolumeType(
label.volume_type,
@ -926,7 +1170,7 @@ class AwsAdapter(statemachine.Adapter):
m = self.instance_key_re.match(instance_type)
if m:
key = m.group(1)
code = QUOTA_CODES.get(key)
code = INSTANCE_QUOTA_CODES.get(key)
if code:
return code[market_type_option]
self.log.warning(
@ -962,6 +1206,28 @@ class AwsAdapter(statemachine.Adapter):
return QuotaInformation(**args)
host_key_re = re.compile(r'([a-z\d\-]+)\..*')
def _getQuotaCodeForHostType(self, host_type):
    # Map an instance type (e.g. "m5.large") to the service-quota
    # code for its dedicated-host family, or None (with a warning)
    # when the family is not in HOST_QUOTA_CODES.
    match = self.host_key_re.match(host_type)
    if match:
        quota_code = HOST_QUOTA_CODES.get(match.group(1))
        if quota_code:
            return quota_code
    self.log.warning(
        "Unknown quota code for host type: %s",
        host_type)
    return None
def _getQuotaForHostType(self, host_type):
    # Build the QuotaInformation consumed by one dedicated host of
    # the given type: one instance slot plus, when known, one unit
    # of the family's host quota code.
    quota_args = {'instances': 1}
    quota_code = self._getQuotaCodeForHostType(host_type)
    if quota_code:
        quota_args[quota_code] = 1
    return QuotaInformation(**quota_args)
def _getQuotaForVolume(self, volume):
volume_type = volume['VolumeType']
vquota_codes = VOLUME_QUOTA_CODES.get(volume_type, {})
@ -990,20 +1256,33 @@ class AwsAdapter(statemachine.Adapter):
InstanceTypes=[instance_type])
def _refresh(self, obj):
for instance in self._listInstances():
if instance['InstanceId'] == obj['InstanceId']:
return instance
if 'InstanceId' in obj:
for instance in self._listInstances():
if instance['InstanceId'] == obj['InstanceId']:
return instance
elif 'HostId' in obj:
for host in self._listHosts():
if host['HostId'] == obj['HostId']:
return host
return obj
def _refreshDelete(self, obj):
if obj is None:
return obj
for instance in self._listInstances():
if instance['InstanceId'] == obj['InstanceId']:
if instance['State']['Name'].lower() == "terminated":
return None
return instance
if 'InstanceId' in obj:
for instance in self._listInstances():
if instance['InstanceId'] == obj['InstanceId']:
if instance['State']['Name'].lower() == "terminated":
return None
return instance
elif 'HostId' in obj:
for host in self._listHosts():
if host['HostId'] == obj['HostId']:
if host['State'].lower() in [
'released', 'released-permanent-failure']:
return None
return host
return None
def _listServiceQuotas(self, service_code):
@ -1023,6 +1302,15 @@ class AwsAdapter(statemachine.Adapter):
def _listEBSQuotas(self):
return self._listServiceQuotas('ebs')
def _listHosts(self):
    # Return all dedicated hosts in the region by paginating the EC2
    # describe_hosts API.  Wrapped by LazyExecutorTTLCache at adapter
    # init, so callers normally get cached results; the non-mutating
    # rate limiter throttles the underlying API calls.
    with self.non_mutating_rate_limiter(
            self.log.debug, "Listed hosts"):
        paginator = self.ec2_client.get_paginator('describe_hosts')
        hosts = []
        for page in paginator.paginate():
            hosts.extend(page['Hosts'])
        return hosts
def _listInstances(self):
with self.non_mutating_rate_limiter(
self.log.debug, "Listed instances"):
@ -1115,12 +1403,63 @@ class AwsAdapter(statemachine.Adapter):
resp = self.ec2_client.describe_images(ImageIds=[image_id])
return resp['Images'][0]
def _submitAllocateHost(self, label,
                        tags, hostname, log):
    # Run _allocateHost asynchronously on the create executor; the
    # returned future is polled via _completeAllocateHost from the
    # create state machine.
    return self.create_executor.submit(
        self._allocateHost,
        label,
        tags, hostname, log)
def _completeAllocateHost(self, future):
    # Poll a pending host-allocation future.  Returns None while it
    # is still running, otherwise the _allocateHost result dict.
    # Translates AWS client errors into nodepool exception types so
    # the statemachine driver can react appropriately.
    if not future.done():
        return None
    try:
        return future.result()
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'HostLimitExceeded':
            # Re-raise as a quota exception so that the
            # statemachine driver resets quota.
            raise exceptions.QuotaException(str(error))
        if (error.response['Error']['Code'] ==
                'InsufficientInstanceCapacity'):
            # Re-raise as CapacityException so it would have
            # "error.capacity" statsd_key, which can be handled
            # differently than "error.unknown"
            raise exceptions.CapacityException(str(error))
        raise
def _allocateHost(self, label,
                  tags, hostname, log):
    # Allocate a single dedicated host for the given label and return
    # a minimal host dict (HostId + synthetic 'pending' state); the
    # real state is obtained later via _refresh/_listHosts.
    args = dict(
        # Disable auto-placement: nodepool runs exactly one instance
        # per host and targets it explicitly by HostId.
        AutoPlacement='off',
        # The pool's availability-zone option is required for
        # dedicated-host labels (validated in config), so this is
        # always set.
        AvailabilityZone=label.pool.az,
        InstanceType=label.instance_type,
        Quantity=1,
        HostRecovery='off',
        HostMaintenance='off',
        TagSpecifications=[
            {
                'ResourceType': 'dedicated-host',
                'Tags': tag_dict_to_list(tags),
            },
        ]
    )
    with self.rate_limiter(log.debug, "Allocated host"):
        log.debug("Allocating host %s", hostname)
        resp = self.ec2_client.allocate_hosts(**args)
        host_ids = resp['HostIds']
        log.debug("Allocated host %s as host %s",
                  hostname, host_ids[0])
        return dict(HostId=host_ids[0],
                    State='pending')
def _submitCreateInstance(self, label, image_external_id,
tags, hostname, log):
tags, hostname, dedicated_host_id, log):
return self.create_executor.submit(
self._createInstance,
label, image_external_id,
tags, hostname, log)
tags, hostname, dedicated_host_id, log)
def _completeCreateInstance(self, future):
if not future.done():
@ -1141,7 +1480,7 @@ class AwsAdapter(statemachine.Adapter):
raise
def _createInstance(self, label, image_external_id,
tags, hostname, log):
tags, hostname, dedicated_host_id, log):
if image_external_id:
image_id = image_external_id
else:
@ -1240,12 +1579,29 @@ class AwsAdapter(statemachine.Adapter):
'HttpEndpoint': 'enabled',
}
if dedicated_host_id:
placement = args.setdefault('Placement', {})
placement.update({
'Tenancy': 'host',
'HostId': dedicated_host_id,
'Affinity': 'host',
})
if label.pool.az:
placement = args.setdefault('Placement', {})
placement['AvailabilityZone'] = label.pool.az
with self.rate_limiter(log.debug, "Created instance"):
log.debug("Creating VM %s", hostname)
resp = self.ec2_client.run_instances(**args)
instances = resp['Instances']
log.debug("Created VM %s as instance %s",
hostname, instances[0]['InstanceId'])
if dedicated_host_id:
log.debug("Created VM %s as instance %s on host %s",
hostname, instances[0]['InstanceId'],
dedicated_host_id)
else:
log.debug("Created VM %s as instance %s",
hostname, instances[0]['InstanceId'])
return instances[0]
def _deleteThread(self):
@ -1256,28 +1612,62 @@ class AwsAdapter(statemachine.Adapter):
self.log.exception("Error in delete thread:")
time.sleep(5)
def _deleteThreadInner(self):
@staticmethod
def _getBatch(the_queue):
records = []
try:
records.append(self.delete_queue.get(block=True, timeout=10))
records.append(the_queue.get(block=True, timeout=10))
except queue.Empty:
return
return []
while True:
try:
records.append(self.delete_queue.get(block=False))
records.append(the_queue.get(block=False))
except queue.Empty:
break
# The terminate call has a limit of 1k, but AWS recommends
# smaller batches. We limit to 50 here.
if len(records) >= 50:
break
ids = []
for (del_id, log) in records:
ids.append(del_id)
log.debug(f"Deleting instance {del_id}")
count = len(ids)
with self.rate_limiter(log.debug, f"Deleted {count} instances"):
self.ec2_client.terminate_instances(InstanceIds=ids)
return records
def _deleteThreadInner(self):
    # One pass of the background delete thread: drain a batch of
    # queued instance terminations, then a batch of queued host
    # releases, issuing one batched AWS call for each.
    records = self._getBatch(self.delete_instance_queue)
    if records:
        ids = []
        for (del_id, log) in records:
            ids.append(del_id)
            log.debug(f"Deleting instance {del_id}")
        count = len(ids)
        # 'log' is the logger from the last record in the batch; it is
        # only used for the rate-limiter's summary message.
        with self.rate_limiter(log.debug, f"Deleted {count} instances"):
            self.ec2_client.terminate_instances(InstanceIds=ids)
    records = self._getBatch(self.delete_host_queue)
    if records:
        ids = []
        for (del_id, log) in records:
            ids.append(del_id)
            log.debug(f"Releasing host {del_id}")
        count = len(ids)
        with self.rate_limiter(log.debug, f"Released {count} hosts"):
            self.ec2_client.release_hosts(HostIds=ids)
def _releaseHost(self, external_id, log=None, immediate=False):
    # Release the dedicated host identified by external_id (a HostId).
    # With immediate=True the release_hosts call is made inline (used
    # for leaked-resource cleanup); otherwise the id is queued for the
    # batched background delete thread.  Returns the (cached) host
    # record, or None if the host is unknown.
    if log is None:
        log = self.log
    # _listHosts is TTL-cached, so this lookup is cheap.
    for host in self._listHosts():
        if host['HostId'] == external_id:
            break
    else:
        log.warning(f"Host not found when releasing {external_id}")
        return None
    if immediate:
        with self.rate_limiter(log.debug, "Released host"):
            log.debug(f"Deleting host {external_id}")
            self.ec2_client.release_hosts(
                HostIds=[host['HostId']])
    else:
        self.delete_host_queue.put((external_id, log))
    return host
def _deleteInstance(self, external_id, log=None, immediate=False):
if log is None:
@ -1294,7 +1684,7 @@ class AwsAdapter(statemachine.Adapter):
self.ec2_client.terminate_instances(
InstanceIds=[instance['InstanceId']])
else:
self.delete_queue.put((external_id, log))
self.delete_instance_queue.put((external_id, log))
return instance
def _deleteVolume(self, external_id):

View File

@ -186,6 +186,14 @@ class AwsLabel(ConfigValue):
self.host_key_checking = self.pool.host_key_checking
self.use_spot = bool(label.get('use-spot', False))
self.imdsv2 = label.get('imdsv2', None)
self.dedicated_host = bool(label.get('dedicated-host', False))
if self.dedicated_host:
if self.use_spot:
raise Exception(
"Spot instances can not be used on dedicated hosts")
if not self.pool.az:
raise Exception(
"Availability-zone is required for dedicated hosts")
@staticmethod
def getSchema():
@ -209,6 +217,7 @@ class AwsLabel(ConfigValue):
'dynamic-tags': dict,
'use-spot': bool,
'imdsv2': v.Any(None, 'required', 'optional'),
'dedicated-host': bool,
}
@ -243,6 +252,7 @@ class AwsPool(ConfigPool):
self.max_resources = self.provider.max_resources.copy()
for k, val in pool_config.get('max-resources', {}).items():
self.max_resources[k] = val
self.az = pool_config.get('availability-zone')
@staticmethod
def getSchema():
@ -261,6 +271,7 @@ class AwsPool(ConfigPool):
'max-cores': int,
'max-ram': int,
'max-resources': {str: int},
'availability-zone': str,
})
return pool

View File

@ -314,8 +314,9 @@ class StateMachineNodeLauncher(stats.StatsReporter):
statsd_key = 'error.zksession'
self.manager.nodescan_worker.removeRequest(self.nodescan_request)
self.nodescan_request = None
except exceptions.QuotaException:
self.log.info("Aborting node %s due to quota failure", node.id)
except exceptions.QuotaException as e:
self.log.info("Aborting node %s due to quota failure: %s",
node.id, str(e))
node.state = zk.ABORTED
if self.state_machine:
node.external_id = self.state_machine.external_id
@ -1000,7 +1001,7 @@ class Instance:
* ready: bool (whether the instance is ready)
* deleted: bool (whether the instance is in a deleted state)
* external_id: str (the unique id of the instance)
* external_id: str or dict (the unique id of the instance)
* interface_ip: str
* metadata: dict
@ -1549,7 +1550,7 @@ class Adapter:
This method should return a new state machine object
initialized to delete the described instance.
:param str external_id: The external_id of the instance, as
:param str or dict external_id: The external_id of the instance, as
supplied by a creation StateMachine or an Instance.
:param log Logger: A logger instance for emitting annotated
logs related to the request.
@ -1682,7 +1683,7 @@ class Adapter:
"""Return the console log from the specified server
:param label ConfigLabel: The label config for the node
:param external_id str: The external id of the server
:param external_id str or dict: The external id of the server
"""
raise NotImplementedError()
@ -1690,6 +1691,6 @@ class Adapter:
"""Notify the adapter of a nodescan failure
:param label ConfigLabel: The label config for the node
:param external_id str: The external id of the server
:param external_id str or dict: The external id of the server
"""
pass

View File

@ -0,0 +1,37 @@
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}
chroot: {zookeeper_chroot}
zookeeper-tls:
ca: {zookeeper_ca}
cert: {zookeeper_cert}
key: {zookeeper_key}
labels:
- name: ubuntu
providers:
- name: ec2-us-west-2
driver: aws
region-name: us-west-2
cloud-images:
- name: ubuntu1404
image-id: ami-1e749f67
username: ubuntu
launch-retries: 1
pools:
- name: main
availability-zone: us-west-2a
max-servers: 10
subnet-id: {subnet_id}
security-group-id: {security_group_id}
node-attributes:
key1: value1
key2: value2
labels:
- name: ubuntu
cloud-image: ubuntu1404
instance-type: t3.medium
key-name: zuul
dedicated-host: True

View File

@ -26,6 +26,7 @@ import testtools
from nodepool import config as nodepool_config
from nodepool import tests
import nodepool.status
from nodepool.zk import zookeeper as zk
from nodepool.nodeutils import iterate_timeout
import nodepool.driver.statemachine
@ -49,9 +50,16 @@ class FakeAwsAdapter(AwsAdapter):
# when in fake mode so we need to intercept the
# run_instances call and validate the args we supply.
def _fake_run_instances(*args, **kwargs):
self.__testcase.run_instance_calls.append(kwargs)
self.__testcase.run_instances_calls.append(kwargs)
if self.__testcase.run_instances_exception:
raise self.__testcase.run_instances_exception
return self.ec2_client.run_instances_orig(*args, **kwargs)
def _fake_allocate_hosts(*args, **kwargs):
if self.__testcase.allocate_hosts_exception:
raise self.__testcase.allocate_hosts_exception
return self.ec2_client.allocate_hosts_orig(*args, **kwargs)
# The ImdsSupport parameter isn't handled by moto
def _fake_register_image(*args, **kwargs):
self.__testcase.register_image_calls.append(kwargs)
@ -65,6 +73,8 @@ class FakeAwsAdapter(AwsAdapter):
self.ec2_client.run_instances_orig = self.ec2_client.run_instances
self.ec2_client.run_instances = _fake_run_instances
self.ec2_client.allocate_hosts_orig = self.ec2_client.allocate_hosts
self.ec2_client.allocate_hosts = _fake_allocate_hosts
self.ec2_client.register_image_orig = self.ec2_client.register_image
self.ec2_client.register_image = _fake_register_image
self.ec2_client.import_snapshot = \
@ -161,7 +171,9 @@ class TestDriverAws(tests.DBTestCase):
CreateBucketConfiguration={'LocationConstraint': 'us-west-2'})
# A list of args to method calls for validation
self.run_instance_calls = []
self.run_instances_calls = []
self.run_instances_exception = None
self.allocate_hosts_exception = None
self.register_image_calls = []
# TEST-NET-3
@ -236,8 +248,8 @@ class TestDriverAws(tests.DBTestCase):
def requestNode(self, config_path, label):
# A helper method to perform a single node request
configfile = self.setup_config(config_path)
pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(pool)
self.pool = self.useNodepool(configfile, watermark_sleep=1)
self.startPool(self.pool)
req = zk.NodeRequest()
req.state = zk.REQUESTED
@ -583,12 +595,12 @@ class TestDriverAws(tests.DBTestCase):
# Like us-west-2x where x is random
self.assertTrue(len(node.az) == len('us-west-2x'))
instance = self.ec2.Instance(node.external_id)
instance = self.ec2.Instance(node.external_id['instance'])
response = instance.describe_attribute(Attribute='ebsOptimized')
self.assertFalse(response['EbsOptimized']['Value'])
self.assertFalse(
'MetadataOptions' in self.run_instance_calls[0])
'MetadataOptions' in self.run_instances_calls[0])
node.state = zk.USED
self.zk.storeNode(node)
@ -630,7 +642,7 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
instance = self.ec2.Instance(node.external_id)
instance = self.ec2.Instance(node.external_id['instance'])
response = instance.describe_attribute(
Attribute='userData')
self.assertIn('UserData', response)
@ -647,7 +659,8 @@ class TestDriverAws(tests.DBTestCase):
associations = self.ec2_client.\
describe_iam_instance_profile_associations()[
"IamInstanceProfileAssociations"]
self.assertEqual(node.external_id, associations[0]['InstanceId'])
self.assertEqual(node.external_id['instance'],
associations[0]['InstanceId'])
self.assertEqual(self.instance_profile_arn,
associations[0]['IamInstanceProfile']['Arn'])
@ -660,7 +673,8 @@ class TestDriverAws(tests.DBTestCase):
associations = self.ec2_client.\
describe_iam_instance_profile_associations()[
"IamInstanceProfileAssociations"]
self.assertEqual(node.external_id, associations[0]['InstanceId'])
self.assertEqual(node.external_id['instance'],
associations[0]['InstanceId'])
self.assertEqual(self.instance_profile_arn,
associations[0]['IamInstanceProfile']['Arn'])
@ -696,7 +710,7 @@ class TestDriverAws(tests.DBTestCase):
# Make sure we make the call to AWS as expected
self.assertEqual(
self.run_instance_calls[0]['NetworkInterfaces']
self.run_instances_calls[0]['NetworkInterfaces']
[0]['Ipv6AddressCount'], 1)
# This is like what we should get back from AWS, verify the
@ -712,7 +726,7 @@ class TestDriverAws(tests.DBTestCase):
instance['NetworkInterfaces'] = [iface]
provider = Dummy()
provider.region_name = 'us-west-2'
awsi = AwsInstance(provider, instance, None)
awsi = AwsInstance(provider, instance, None, None)
self.assertEqual(awsi.public_ipv4, '1.2.3.4')
self.assertEqual(awsi.private_ipv4, '10.0.0.1')
self.assertEqual(awsi.public_ipv6, 'fe80::dead:beef')
@ -725,7 +739,7 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
instance = self.ec2.Instance(node.external_id)
instance = self.ec2.Instance(node.external_id['instance'])
tag_list = instance.tags
self.assertIn({"Key": "has-tags", "Value": "true"}, tag_list)
self.assertIn({"Key": "Name", "Value": "np0000000000"}, tag_list)
@ -743,7 +757,7 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
instance = self.ec2.Instance(node.external_id)
instance = self.ec2.Instance(node.external_id['instance'])
tag_list = instance.tags
self.assertIn({"Key": "has-tags", "Value": "true"}, tag_list)
self.assertIn({"Key": "Name", "Value": "np0000000000"}, tag_list)
@ -773,7 +787,7 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
self.assertEqual(node.image_id, 'ubuntu1404')
instance = self.ec2.Instance(node.external_id)
instance = self.ec2.Instance(node.external_id['instance'])
response = instance.describe_attribute(Attribute='ebsOptimized')
self.assertTrue(response['EbsOptimized']['Value'])
@ -785,10 +799,10 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.image_id, 'ubuntu1404')
self.assertEqual(
self.run_instance_calls[0]['MetadataOptions']['HttpTokens'],
self.run_instances_calls[0]['MetadataOptions']['HttpTokens'],
'required')
self.assertEqual(
self.run_instance_calls[0]['MetadataOptions']['HttpEndpoint'],
self.run_instances_calls[0]['MetadataOptions']['HttpEndpoint'],
'enabled')
def test_aws_invalid_instance_type(self):
@ -846,10 +860,10 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Iops'], 2000)
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200)
def test_aws_diskimage_image(self):
@ -889,10 +903,10 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Iops'], 2000)
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200)
def test_aws_diskimage_snapshot_imdsv2(self):
@ -936,10 +950,10 @@ class TestDriverAws(tests.DBTestCase):
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Iops'], 2000)
self.assertEqual(
self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200)
def test_aws_diskimage_image_imdsv2(self):
@ -1235,7 +1249,106 @@ class TestDriverAws(tests.DBTestCase):
# Test creating a spot instances instead of an on-demand on.
req = self.requestNode('aws/aws-spot.yaml', 'ubuntu1404-spot')
node = self.assertSuccess(req)
instance = self.ec2.Instance(node.external_id)
instance = self.ec2.Instance(node.external_id['instance'])
self.assertEqual(instance.instance_lifecycle, 'spot')
# moto doesn't provide the spot_instance_request_id
# self.assertIsNotNone(instance.spot_instance_request_id)
def test_aws_dedicated_host(self):
    """End-to-end happy path for the dedicated-host label.

    Requesting a node with ``dedicated-host: true`` should allocate one
    AWS dedicated host plus exactly one instance on it; deleting the
    node should terminate the instance and release the host.
    """
    req = self.requestNode('aws/aws-dedicated-host.yaml', 'ubuntu')
    # Poll rather than block: on each pass, also exercise the status
    # renderers so a regression in external_id handling surfaces here.
    for _ in iterate_timeout(60, Exception,
                             "Node request state transition",
                             interval=1):
        # Ensure that we can render the node list (and that our
        # use of a dictionary for external_id does not cause an
        # error).
        node_list = nodepool.status.node_list(self.zk)
        nodepool.status.output(node_list, 'pretty')
        nodepool.status.output(node_list, 'json')
        req = self.zk.getNodeRequest(req.id)
        if req.state in (zk.FULFILLED,):
            break
    node = self.assertSuccess(req)
    self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
    self.assertEqual(node.image_id, 'ubuntu1404')
    # Verify instance and host are created
    reservations = self.ec2_client.describe_instances()['Reservations']
    # Filter out terminated instances: moto keeps them visible in
    # describe_instances output after termination.
    instances = [
        i
        for r in reservations
        for i in r['Instances']
        if i['State']['Name'] != 'terminated'
    ]
    self.assertEqual(len(instances), 1)
    hosts = self.ec2_client.describe_hosts()['Hosts']
    # Likewise, released hosts remain listed; count only live ones.
    hosts = [h for h in hosts if h['State'] != 'released']
    self.assertEqual(len(hosts), 1)
    # Mark the node used so the launcher's cleanup path deletes it.
    node.state = zk.USED
    self.zk.storeNode(node)
    self.waitForNodeDeletion(node)
    # verify instance and host are deleted
    reservations = self.ec2_client.describe_instances()['Reservations']
    instances = [
        i
        for r in reservations
        for i in r['Instances']
        if i['State']['Name'] != 'terminated'
    ]
    self.assertEqual(len(instances), 0)
    hosts = self.ec2_client.describe_hosts()['Hosts']
    hosts = [h for h in hosts if h['State'] != 'released']
    self.assertEqual(len(hosts), 0)
def test_aws_dedicated_host_instance_failure(self):
    """A failed instance launch on a dedicated host cleans up both.

    When run_instances raises after the host was allocated, the
    request must fail and neither a live instance nor an allocated
    host may be left behind.
    """
    # Make the instance launch (not the host allocation) blow up.
    self.run_instances_exception = Exception("some failure")
    req = self.requestNode('aws/aws-dedicated-host.yaml', 'ubuntu')
    self.assertEqual(req.state, zk.FAILED)
    # Wait for the provider's launch/delete workers to drain before
    # inspecting the cloud state.
    provider = self.pool.getProviderManager('ec2-us-west-2')
    for _ in iterate_timeout(60, Exception,
                             "Cloud cleanup",
                             interval=1):
        if not provider.launchers and not provider.deleters:
            break
    # No non-terminated instances should remain.
    live_instances = []
    for reservation in self.ec2_client.describe_instances()['Reservations']:
        for instance in reservation['Instances']:
            if instance['State']['Name'] != 'terminated':
                live_instances.append(instance)
    self.assertEqual(len(live_instances), 0)
    # The dedicated host allocated for the failed launch must be released.
    live_hosts = [host
                  for host in self.ec2_client.describe_hosts()['Hosts']
                  if host['State'] != 'released']
    self.assertEqual(len(live_hosts), 0)
def test_aws_dedicated_host_allocation_failure(self):
    """A failed host allocation leaves no instance or host behind.

    When allocate_hosts itself raises, the request must fail and the
    cleanup path must leave the cloud with no live instances and no
    allocated hosts.
    """
    # Fail at the host-allocation step, before any instance exists.
    self.allocate_hosts_exception = Exception("some failure")
    req = self.requestNode('aws/aws-dedicated-host.yaml', 'ubuntu')
    self.assertEqual(req.state, zk.FAILED)
    # Let any in-flight launcher/deleter threads finish first.
    provider = self.pool.getProviderManager('ec2-us-west-2')
    for _ in iterate_timeout(60, Exception,
                             "Cloud cleanup",
                             interval=1):
        if not (provider.launchers or provider.deleters):
            break
    # Collect every instance that is not terminated; expect none.
    reservations = self.ec2_client.describe_instances()['Reservations']
    remaining = [instance
                 for reservation in reservations
                 for instance in reservation['Instances']
                 if instance['State']['Name'] != 'terminated']
    self.assertEqual(len(remaining), 0)
    # Expect no hosts outside the 'released' state either.
    unreleased = [host
                  for host in self.ec2_client.describe_hosts()['Hosts']
                  if host['State'] != 'released']
    self.assertEqual(len(unreleased), 0)

View File

@ -0,0 +1,9 @@
---
features:
- |
Limited support for AWS dedicated hosts is now available using the
:attr:`providers.[aws].pools.labels.dedicated-host` option. This
allows a dedicated host to be allocated to serve a single
instance (the dedicated host cannot be shared among multiple
instances). This enables Nodepool to launch certain instance
types that require dedicated hosts.