diff --git a/doc/source/aws.rst b/doc/source/aws.rst
index 633f89988..5f621d415 100644
--- a/doc/source/aws.rst
+++ b/doc/source/aws.rst
@@ -501,6 +501,13 @@ Selecting the ``aws`` driver adds the following options to the

          A unique name within the provider for this pool of resources.

+      .. attr:: availability-zone
+         :type: str
+
+         If provided, instances launched from this pool will be
+         assigned to the specified availability zone. If omitted, AWS
+         will select from the available zones.
+
       .. attr:: priority
          :type: int
          :default: 100
@@ -648,6 +655,23 @@ Selecting the ``aws`` driver adds the following options to the
             :attr:`providers.[aws].diskimages`. Mutually exclusive with
             :attr:`providers.[aws].pools.labels.cloud-image`

+         .. attr:: dedicated-host
+            :type: bool
+
+            If set to ``true``, an AWS dedicated host will be
+            allocated for the instance. Nodepool only supports
+            running a single instance on dedicated hosts, so it will
+            treat the host and the instance launched on it as a
+            matched pair. The host will not be used for any other
+            instances, and will be released when the associated
+            Nodepool node is deleted.
+
+            If this option is set, the
+            :attr:`providers.[aws].pools.labels.use-spot` option is
+            not available, and the
+            :attr:`providers.[aws].pools.availability-zone`
+            option is required.
+
          .. attr:: ebs-optimized
             :type: bool
             :default: False
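To make the two new options concrete, here is a hedged sketch of a pool that uses them together; the image, label, and instance-type values are placeholders (the test fixture added later in this change shows a complete working configuration):

```yaml
# Illustrative excerpt only; names and values are hypothetical.
pools:
  - name: main
    availability-zone: us-west-2a      # required by any dedicated-host label
    labels:
      - name: example-label
        cloud-image: example-image
        instance-type: m5zn.metal
        dedicated-host: True           # mutually exclusive with use-spot
```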
diff --git a/nodepool/driver/aws/adapter.py b/nodepool/driver/aws/adapter.py
index b015fef95..057bfb708 100644
--- a/nodepool/driver/aws/adapter.py
+++ b/nodepool/driver/aws/adapter.py
@@ -64,7 +64,7 @@ def tag_list_to_dict(taglist):

 # "Running On-Demand High Memory instances" was determined from
 # https://aws.amazon.com/ec2/instance-types/high-memory/
-QUOTA_CODES = {
+INSTANCE_QUOTA_CODES = {
     # INSTANCE FAMILY: [ON-DEMAND, SPOT]
     'a': ['L-1216C47A', 'L-34B43A08'],
     'c': ['L-1216C47A', 'L-34B43A08'],
@@ -87,6 +87,119 @@ QUOTA_CODES = {
     'hpc': ['L-F7808C92', '']
 }

+HOST_QUOTA_CODES = {
+    'a1': 'L-949445B0',
+    'c3': 'L-8D142A2E',
+    'c4': 'L-E4BF28E0',
+    'c5': 'L-81657574',
+    'c5a': 'L-03F01FD8',
+    'c5d': 'L-C93F66A2',
+    'c5n': 'L-20F13EBD',
+    'c6a': 'L-D75D2E84',
+    'c6g': 'L-A749B537',
+    'c6gd': 'L-545AED39',
+    'c6gn': 'L-5E3A299D',
+    'c6i': 'L-5FA3355A',
+    'c6id': 'L-1BBC5241',
+    'c6in': 'L-6C2C40CC',
+    'c7a': 'L-698B67E5',
+    'c7g': 'L-13B8FCE8',
+    'c7gd': 'L-EF58B059',
+    'c7gn': 'L-97677CE3',
+    'c7i': 'L-587AA6E3',
+    'd2': 'L-8B27377A',
+    'dl1': 'L-AD667A3D',
+    'f1': 'L-5C4CD236',
+    'g3': 'L-DE82EABA',
+    'g3s': 'L-9675FDCD',
+    'g4ad': 'L-FD8E9B9A',
+    'g4dn': 'L-CAE24619',
+    'g5': 'L-A6E7FE5E',
+    'g5g': 'L-4714FFEA',
+    'g6': 'L-B88B9D6B',
+    'gr6': 'L-E68C3AFF',
+    'h1': 'L-84391ECC',
+    'i2': 'L-6222C1B6',
+    'i3': 'L-8E60B0B1',
+    'i3en': 'L-77EE2B11',
+    'i4g': 'L-F62CBADB',
+    'i4i': 'L-0300530D',
+    'im4gn': 'L-93155D6F',
+    'inf': 'L-5480EFD2',
+    'inf2': 'L-E5BCF7B5',
+    'is4gen': 'L-CB4F5825',
+    'm3': 'L-3C82F907',
+    'm4': 'L-EF30B25E',
+    'm5': 'L-8B7BF662',
+    'm5a': 'L-B10F70D6',
+    'm5ad': 'L-74F41837',
+    'm5d': 'L-8CCBD91B',
+    'm5dn': 'L-DA07429F',
+    'm5n': 'L-24D7D4AD',
+    'm5zn': 'L-BD9BD803',
+    'm6a': 'L-80F2B67F',
+    'm6g': 'L-D50A37FA',
+    'm6gd': 'L-84FB37AA',
+    'm6i': 'L-D269BEFD',
+    'm6id': 'L-FDB0A352',
+    'm6idn': 'L-9721EDD9',
+    'm6in': 'L-D037CF10',
+    'm7a': 'L-4740F819',
+    'm7g': 'L-9126620E',
+    'm7gd': 'L-F8516154',
+    'm7i': 'L-30E31217',
+    'mac1': 'L-A8448DC5',
+    'mac2': 'L-5D8DADF5',
+    'mac2-m2': 'L-B90B5B66',
+    'mac2-m2pro': 'L-14F120D1',
+    'p2': 'L-2753CF59',
+    'p3': 'L-A0A19F79',
+    'p3dn': 'L-B601B3B6',
+    'p4d': 'L-86A789C3',
+    'p5': 'L-5136197D',
+    'r3': 'L-B7208018',
+    'r4': 'L-313524BA',
+    'r5': 'L-EA4FD6CF',
+    'r5a': 'L-8FE30D52',
+    'r5ad': 'L-EC7178B6',
+    'r5b': 'L-A2D59C67',
+    'r5d': 'L-8814B54F',
+    'r5dn': 'L-4AB14223',
+    'r5n': 'L-52EF324A',
+    'r6a': 'L-BC1589C5',
+    'r6g': 'L-B6D6065D',
+    'r6gd': 'L-EF284EFB',
+    'r6i': 'L-F13A970A',
+    'r6id': 'L-B89271A9',
+    'r6idn': 'L-C4EABC2C',
+    'r6in': 'L-EA99608B',
+    'r7a': 'L-4D15192B',
+    'r7g': 'L-67B8B4C7',
+    'r7gd': 'L-01137DCE',
+    'r7i': 'L-55E05032',
+    'r7iz': 'L-BC9FCC71',
+    't3': 'L-1586174D',
+    'trn1': 'L-5E4FB836',
+    'trn1n': 'L-39926A58',
+    'u-12tb1': 'L-D6994875',
+    'u-18tb1': 'L-5F7FD336',
+    'u-24tb1': 'L-FACBE655',
+    'u-3tb1': 'L-7F5506AB',
+    'u-6tb1': 'L-89870E8E',
+    'u-9tb1': 'L-98E1FFAC',
+    'u7in-16tb': 'L-75B9BECB',
+    'u7in-24tb': 'L-CA51381E',
+    'u7in-32tb': 'L-9D28191F',
+    'vt1': 'L-A68CFBF7',
+    'x1': 'L-DE3D9563',
+    'x1e': 'L-DEF8E115',
+    'x2gd': 'L-5CC9EA82',
+    'x2idn': 'L-A84ABF80',
+    'x2iedn': 'L-D0AA08B1',
+    'x2iezn': 'L-888B4496',
+    'z1d': 'L-F035E935',
+}
+
 VOLUME_QUOTA_CODES = {
     'io1': dict(iops='L-B3A130E6', storage='L-FD252861'),
     'io2': dict(iops='L-8D977E7E', storage='L-09BD8365'),
@@ -104,9 +217,13 @@ SPOT = 1


 class AwsInstance(statemachine.Instance):
-    def __init__(self, provider, instance, quota):
+    def __init__(self, provider, instance, host, quota):
         super().__init__()
-        self.external_id = instance['InstanceId']
+        self.external_id = dict()
+        if instance:
+            self.external_id['instance'] = instance['InstanceId']
+        if host:
+            self.external_id['host'] = host['HostId']
         self.metadata = tag_list_to_dict(instance.get('Tags'))
         self.private_ipv4 = instance.get('PrivateIpAddress')
         self.private_ipv6 = None
@@ -131,6 +248,7 @@ class AwsInstance(statemachine.Instance):


 class AwsResource(statemachine.Resource):
+    TYPE_HOST = 'host'
     TYPE_INSTANCE = 'instance'
     TYPE_AMI = 'ami'
     TYPE_SNAPSHOT = 'snapshot'
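Because `AwsInstance` now reports a dict-valued `external_id`, both shapes can be present in ZooKeeper during an upgrade, and the delete state machine below accepts either. A sketch of the two forms, with made-up ids:

```python
# Hypothetical ids, for illustration only.
old_external_id = 'i-0123456789abcdef0'            # node created before this change
new_external_id = {'instance': 'i-0123456789abcdef0',
                   'host': 'h-0123456789abcdef0'}  # 'host' only for dedicated hosts
```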
@@ -143,24 +261,52 @@


 class AwsDeleteStateMachine(statemachine.StateMachine):
-    VM_DELETING = 'deleting vm'
+    HOST_RELEASING_START = 'start releasing host'
+    HOST_RELEASING = 'releasing host'
+    INSTANCE_DELETING_START = 'start deleting instance'
+    INSTANCE_DELETING = 'deleting instance'
     COMPLETE = 'complete'

     def __init__(self, adapter, external_id, log):
         self.log = log
         super().__init__()
         self.adapter = adapter
+        # Backwards compatibility for old nodes where external_id is
+        # a str
+        if type(external_id) is str:
+            external_id = dict(instance=external_id)
         self.external_id = external_id

     def advance(self):
         if self.state == self.START:
-            self.instance = self.adapter._deleteInstance(
-                self.external_id, self.log)
-            self.state = self.VM_DELETING
+            if 'instance' in self.external_id:
+                self.state = self.INSTANCE_DELETING_START
+            elif 'host' in self.external_id:
+                self.state = self.HOST_RELEASING_START
+            else:
+                self.state = self.COMPLETE

-        if self.state == self.VM_DELETING:
+        if self.state == self.INSTANCE_DELETING_START:
+            self.instance = self.adapter._deleteInstance(
+                self.external_id['instance'], self.log)
+            self.state = self.INSTANCE_DELETING
+
+        if self.state == self.INSTANCE_DELETING:
             self.instance = self.adapter._refreshDelete(self.instance)
             if self.instance is None:
+                if 'host' in self.external_id:
+                    self.state = self.HOST_RELEASING_START
+                else:
+                    self.state = self.COMPLETE
+
+        if self.state == self.HOST_RELEASING_START:
+            self.host = self.adapter._releaseHost(
+                self.external_id['host'], self.log)
+            self.state = self.HOST_RELEASING
+
+        if self.state == self.HOST_RELEASING:
+            self.host = self.adapter._refreshDelete(self.host)
+            if self.host is None:
                 self.state = self.COMPLETE

         if self.state == self.COMPLETE:
@@ -168,6 +314,10 @@


 class AwsCreateStateMachine(statemachine.StateMachine):
+    HOST_ALLOCATING_START = 'start allocating host'
+    HOST_ALLOCATING_SUBMIT = 'submit allocating host'
+    HOST_ALLOCATING = 'allocating host'
+    INSTANCE_CREATING_START = 'start creating instance'
     INSTANCE_CREATING_SUBMIT = 'submit creating instance'
     INSTANCE_CREATING = 'creating instance'
     COMPLETE = 'complete'
@@ -194,12 +344,50 @@ class AwsCreateStateMachine(statemachine.StateMachine):
         self.public_ipv6 = None
         self.nic = None
         self.instance = None
+        self.host = None
+        self.external_id = dict()
+        self.dedicated_host_id = None

     def advance(self):
         if self.state == self.START:
+            if self.label.dedicated_host:
+                self.state = self.HOST_ALLOCATING_START
+            else:
+                self.state = self.INSTANCE_CREATING_START
+
+        if self.state == self.HOST_ALLOCATING_START:
+            self.host_create_future = self.adapter._submitAllocateHost(
+                self.label,
+                self.tags, self.hostname, self.log)
+            self.state = self.HOST_ALLOCATING_SUBMIT
+
+        if self.state == self.HOST_ALLOCATING_SUBMIT:
+            host = self.adapter._completeAllocateHost(self.host_create_future)
+            if host is None:
+                return
+            self.host = host
+            self.external_id['host'] = host['HostId']
+            self.state = self.HOST_ALLOCATING
+
+        if self.state == self.HOST_ALLOCATING:
+            self.host = self.adapter._refresh(self.host)
+
+            state = self.host['State'].lower()
+            if state == 'available':
+                self.dedicated_host_id = self.host['HostId']
+                self.state = self.INSTANCE_CREATING_START
+            elif state in [
+                    'permanent-failure', 'released',
+                    'released-permanent-failure']:
+                raise exceptions.LaunchStatusException(
+                    f"Host in {state} state")
+            else:
+                return
+
+        if self.state == self.INSTANCE_CREATING_START:
             self.create_future = self.adapter._submitCreateInstance(
                 self.label, self.image_external_id,
-                self.tags, self.hostname, self.log)
+                self.tags, self.hostname, self.dedicated_host_id, self.log)
             self.state = self.INSTANCE_CREATING_SUBMIT

         if self.state == self.INSTANCE_CREATING_SUBMIT:
@@ -207,7 +395,7 @@ class AwsCreateStateMachine(statemachine.StateMachine):
             if instance is None:
                 return
             self.instance = instance
-            self.external_id = instance['InstanceId']
+            self.external_id['instance'] = instance['InstanceId']
             self.quota = self.adapter.getQuotaForLabel(self.label)
             self.state = self.INSTANCE_CREATING
@@ -225,7 +413,7 @@ class AwsCreateStateMachine(statemachine.StateMachine):
         if self.state == self.COMPLETE:
             self.complete = True
             return AwsInstance(self.adapter.provider, self.instance,
-                               self.quota)
+                               self.host, self.quota)


 class AwsAdapter(statemachine.Adapter):
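The `HOST_ALLOCATING` branch above polls the host until AWS reports a usable or terminal state. Factored out as a standalone sketch (not part of the change itself), the decision looks like this:

```python
# A minimal sketch of the host-state check performed in HOST_ALLOCATING.
def host_ready(host):
    """Return True once available, False while pending; raise on a dead host."""
    state = host['State'].lower()
    if state == 'available':
        return True
    if state in ('permanent-failure', 'released',
                 'released-permanent-failure'):
        raise RuntimeError(f"Host in {state} state")
    return False
```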
@@ -271,7 +459,8 @@ class AwsAdapter(statemachine.Adapter):
         # by larger batches. That strikes a balance between
         # responsiveness and efficiency. Reducing the overall number
         # of requests leaves more time for create instance calls.
-        self.delete_queue = queue.Queue()
+        self.delete_host_queue = queue.Queue()
+        self.delete_instance_queue = queue.Queue()
         self.delete_thread = threading.Thread(target=self._deleteThread)
         self.delete_thread.daemon = True
         self.delete_thread.start()
@@ -311,6 +500,9 @@ class AwsAdapter(statemachine.Adapter):
         # asynchronously update the cached values, meanwhile returning
         # the previous cached data if available. This means every
         # call after the first one is instantaneous.
+        self._listHosts = LazyExecutorTTLCache(
+            CACHE_TTL, self.api_executor)(
+                self._listHosts)
         self._listInstances = LazyExecutorTTLCache(
             CACHE_TTL, self.api_executor)(
                 self._listInstances)
@@ -357,6 +549,16 @@ class AwsAdapter(statemachine.Adapter):
     def listResources(self):
         self._tagSnapshots()
         self._tagAmis()
+        for host in self._listHosts():
+            try:
+                if host['State'].lower() in [
+                        "released", "released-permanent-failure"]:
+                    continue
+            except botocore.exceptions.ClientError:
+                continue
+            yield AwsResource(tag_list_to_dict(host.get('Tags')),
+                              AwsResource.TYPE_HOST,
+                              host['HostId'])
         for instance in self._listInstances():
             try:
                 if instance['State']['Name'].lower() == "terminated":
@@ -403,6 +605,8 @@ class AwsAdapter(statemachine.Adapter):

     def deleteResource(self, resource):
         self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
+        if resource.type == AwsResource.TYPE_HOST:
+            self._releaseHost(resource.id, immediate=True)
         if resource.type == AwsResource.TYPE_INSTANCE:
             self._deleteInstance(resource.id, immediate=True)
         if resource.type == AwsResource.TYPE_VOLUME:
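`listResources` reports each live host with its tags converted by the module's tag helpers. For reference, the AWS tag-list/dict conversion amounts to the following sketch (the real helpers live near the top of adapter.py and may differ in detail):

```python
# Sketch of the tag helpers used above (tag_list_to_dict) and in
# _allocateHost further below (tag_dict_to_list).
def tag_list_to_dict(taglist):
    # AWS returns [{'Key': k, 'Value': v}, ...]; None means no tags.
    if taglist is None:
        return {}
    return {t['Key']: t['Value'] for t in taglist}

def tag_dict_to_list(tagdict):
    return [{'Key': k, 'Value': v} for k, v in tagdict.items()]
```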
@@ -421,10 +625,25 @@ class AwsAdapter(statemachine.Adapter):
         for instance in self._listInstances():
             if instance['State']["Name"].lower() == "terminated":
                 continue
-            quota = self._getQuotaForInstanceType(
-                instance['InstanceType'],
-                SPOT if instance.get('InstanceLifecycle') == 'spot'
-                else ON_DEMAND)
+            # For now, we are optimistically assuming that when an
+            # instance is launched on a dedicated host, it is not
+            # counted against instance quota. That may be overly
+            # optimistic. If it is, then we will merge the two quotas
+            # below rather than switch.
+            # Additionally, we are using the instance as a proxy for
+            # the host. It would be more correct to also list hosts
+            # here to include hosts with no instances. But since our
+            # support for dedicated hosts is currently 1:1 with
+            # instances, this should be sufficient.
+            if instance['Placement'].get('HostId'):
+                # Dedicated host
+                quota = self._getQuotaForHostType(
+                    instance['InstanceType'])
+            else:
+                quota = self._getQuotaForInstanceType(
+                    instance['InstanceType'],
+                    SPOT if instance.get('InstanceLifecycle') == 'spot'
+                    else ON_DEMAND)
             for attachment in instance['BlockDeviceMappings']:
                 volume_id = attachment['Ebs']['VolumeId']
                 volume = volumes.get(volume_id)
@@ -435,20 +654,24 @@ class AwsAdapter(statemachine.Adapter):
                 continue
             quota.add(self._getQuotaForVolume(volume))

-            yield AwsInstance(self.provider, instance, quota)
+            yield AwsInstance(self.provider, instance, None, quota)

     def getQuotaLimits(self):
         # Get the instance and volume types that this provider handles
         instance_types = {}
+        host_types = set()
         volume_types = set()
         ec2_quotas = self._listEC2Quotas()
         ebs_quotas = self._listEBSQuotas()
         for pool in self.provider.pools.values():
             for label in pool.labels.values():
-                if label.instance_type not in instance_types:
-                    instance_types[label.instance_type] = set()
-                instance_types[label.instance_type].add(
-                    SPOT if label.use_spot else ON_DEMAND)
+                if label.dedicated_host:
+                    host_types.add(label.instance_type)
+                else:
+                    if label.instance_type not in instance_types:
+                        instance_types[label.instance_type] = set()
+                    instance_types[label.instance_type].add(
+                        SPOT if label.use_spot else ON_DEMAND)
                 if label.volume_type:
                     volume_types.add(label.volume_type)
         args = dict(default=math.inf)
@@ -466,6 +689,18 @@ class AwsAdapter(statemachine.Adapter):
                     code, instance_type)
                 continue
             args[code] = ec2_quotas[code]
+        for host_type in host_types:
+            code = self._getQuotaCodeForHostType(host_type)
+            if code in args:
+                continue
+            if not code:
+                continue
+            if code not in ec2_quotas:
+                self.log.warning(
+                    "AWS quota code %s for host type: %s not known",
+                    code, host_type)
+                continue
+            args[code] = ec2_quotas[code]
         for volume_type in volume_types:
             vquota_codes = VOLUME_QUOTA_CODES.get(volume_type)
             if not vquota_codes:
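For a dedicated-host label, the quota arguments built by the helpers further below reduce to one host of the matching family. For example, an `m5zn.metal` label would produce roughly the following (a sketch using the `HOST_QUOTA_CODES` table above):

```python
# HOST_QUOTA_CODES['m5zn'] == 'L-BD9BD803'
args = dict(instances=1)
args['L-BD9BD803'] = 1
# ...which _getQuotaForHostType then wraps as QuotaInformation(**args).
```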
@@ -490,9 +725,18 @@ class AwsAdapter(statemachine.Adapter):
         return QuotaInformation(**args)

     def getQuotaForLabel(self, label):
-        quota = self._getQuotaForInstanceType(
-            label.instance_type,
-            SPOT if label.use_spot else ON_DEMAND)
+        # For now, we are optimistically assuming that when an
+        # instance is launched on a dedicated host, it is not counted
+        # against instance quota. That may be overly optimistic. If
+        # it is, then we will merge the two quotas below rather than
+        # switch.
+        if label.dedicated_host:
+            quota = self._getQuotaForHostType(
+                label.instance_type)
+        else:
+            quota = self._getQuotaForInstanceType(
+                label.instance_type,
+                SPOT if label.use_spot else ON_DEMAND)
         if label.volume_type:
             quota.add(self._getQuotaForVolumeType(
                 label.volume_type,
@@ -926,7 +1170,7 @@ class AwsAdapter(statemachine.Adapter):
         m = self.instance_key_re.match(instance_type)
         if m:
             key = m.group(1)
-            code = QUOTA_CODES.get(key)
+            code = INSTANCE_QUOTA_CODES.get(key)
             if code:
                 return code[market_type_option]
         self.log.warning(
@@ -962,6 +1206,28 @@ class AwsAdapter(statemachine.Adapter):

         return QuotaInformation(**args)

+    host_key_re = re.compile(r'([a-z\d\-]+)\..*')
+
+    def _getQuotaCodeForHostType(self, host_type):
+        m = self.host_key_re.match(host_type)
+        if m:
+            key = m.group(1)
+            code = HOST_QUOTA_CODES.get(key)
+            if code:
+                return code
+        self.log.warning(
+            "Unknown quota code for host type: %s",
+            host_type)
+        return None
+
+    def _getQuotaForHostType(self, host_type):
+        code = self._getQuotaCodeForHostType(host_type)
+        args = dict(instances=1)
+        if code:
+            args[code] = 1
+
+        return QuotaInformation(**args)
+
     def _getQuotaForVolume(self, volume):
         volume_type = volume['VolumeType']
         vquota_codes = VOLUME_QUOTA_CODES.get(volume_type, {})
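The `host_key_re` pattern extracts the full instance family, including digits and hyphens, from an instance type name, so multi-token families map correctly. A quick worked example:

```python
import re

host_key_re = re.compile(r'([a-z\d\-]+)\..*')
m = host_key_re.match('mac2-m2pro.metal')
print(m.group(1))  # -> 'mac2-m2pro', which looks up quota code L-14F120D1
```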
@@ -990,20 +1256,33 @@ class AwsAdapter(statemachine.Adapter):
             InstanceTypes=[instance_type])

     def _refresh(self, obj):
-        for instance in self._listInstances():
-            if instance['InstanceId'] == obj['InstanceId']:
-                return instance
+        if 'InstanceId' in obj:
+            for instance in self._listInstances():
+                if instance['InstanceId'] == obj['InstanceId']:
+                    return instance
+        elif 'HostId' in obj:
+            for host in self._listHosts():
+                if host['HostId'] == obj['HostId']:
+                    return host
         return obj

     def _refreshDelete(self, obj):
         if obj is None:
             return obj

-        for instance in self._listInstances():
-            if instance['InstanceId'] == obj['InstanceId']:
-                if instance['State']['Name'].lower() == "terminated":
-                    return None
-                return instance
+        if 'InstanceId' in obj:
+            for instance in self._listInstances():
+                if instance['InstanceId'] == obj['InstanceId']:
+                    if instance['State']['Name'].lower() == "terminated":
+                        return None
+                    return instance
+        elif 'HostId' in obj:
+            for host in self._listHosts():
+                if host['HostId'] == obj['HostId']:
+                    if host['State'].lower() in [
+                            'released', 'released-permanent-failure']:
+                        return None
+                    return host
         return None

     def _listServiceQuotas(self, service_code):
@@ -1023,6 +1302,15 @@ class AwsAdapter(statemachine.Adapter):
     def _listEBSQuotas(self):
         return self._listServiceQuotas('ebs')

+    def _listHosts(self):
+        with self.non_mutating_rate_limiter(
+                self.log.debug, "Listed hosts"):
+            paginator = self.ec2_client.get_paginator('describe_hosts')
+            hosts = []
+            for page in paginator.paginate():
+                hosts.extend(page['Hosts'])
+            return hosts
+
     def _listInstances(self):
         with self.non_mutating_rate_limiter(
                 self.log.debug, "Listed instances"):
@@ -1115,12 +1403,63 @@ class AwsAdapter(statemachine.Adapter):
         resp = self.ec2_client.describe_images(ImageIds=[image_id])
         return resp['Images'][0]

+    def _submitAllocateHost(self, label,
+                            tags, hostname, log):
+        return self.create_executor.submit(
+            self._allocateHost,
+            label,
+            tags, hostname, log)
+
+    def _completeAllocateHost(self, future):
+        if not future.done():
+            return None
+        try:
+            return future.result()
+        except botocore.exceptions.ClientError as error:
+            if error.response['Error']['Code'] == 'HostLimitExceeded':
+                # Re-raise as a quota exception so that the
+                # statemachine driver resets quota.
+                raise exceptions.QuotaException(str(error))
+            if (error.response['Error']['Code'] ==
+                    'InsufficientInstanceCapacity'):
+                # Re-raise as CapacityException so it will have the
+                # "error.capacity" statsd_key, which can be handled
+                # differently from "error.unknown"
+                raise exceptions.CapacityException(str(error))
+            raise
+
+    def _allocateHost(self, label,
+                      tags, hostname, log):
+        args = dict(
+            AutoPlacement='off',
+            AvailabilityZone=label.pool.az,
+            InstanceType=label.instance_type,
+            Quantity=1,
+            HostRecovery='off',
+            HostMaintenance='off',
+            TagSpecifications=[
+                {
+                    'ResourceType': 'dedicated-host',
+                    'Tags': tag_dict_to_list(tags),
+                },
+            ]
+        )
+
+        with self.rate_limiter(log.debug, "Allocated host"):
+            log.debug("Allocating host %s", hostname)
+            resp = self.ec2_client.allocate_hosts(**args)
+            host_ids = resp['HostIds']
+            log.debug("Allocated host %s as host %s",
+                      hostname, host_ids[0])
+            return dict(HostId=host_ids[0],
+                        State='pending')
+
     def _submitCreateInstance(self, label, image_external_id,
-                              tags, hostname, log):
+                              tags, hostname, dedicated_host_id, log):
         return self.create_executor.submit(
             self._createInstance,
             label, image_external_id,
-            tags, hostname, log)
+            tags, hostname, dedicated_host_id, log)

     def _completeCreateInstance(self, future):
         if not future.done():
@@ -1141,7 +1480,7 @@ class AwsAdapter(statemachine.Adapter):
             raise

     def _createInstance(self, label, image_external_id,
-                        tags, hostname, log):
+                        tags, hostname, dedicated_host_id, log):
         if image_external_id:
             image_id = image_external_id
         else:
@@ -1240,12 +1579,29 @@ class AwsAdapter(statemachine.Adapter):
                 'HttpEndpoint': 'enabled',
             }

+        if dedicated_host_id:
+            placement = args.setdefault('Placement', {})
+            placement.update({
+                'Tenancy': 'host',
+                'HostId': dedicated_host_id,
+                'Affinity': 'host',
+            })
+
+        if label.pool.az:
+            placement = args.setdefault('Placement', {})
+            placement['AvailabilityZone'] = label.pool.az
+
         with self.rate_limiter(log.debug, "Created instance"):
             log.debug("Creating VM %s", hostname)
             resp = self.ec2_client.run_instances(**args)
             instances = resp['Instances']
-            log.debug("Created VM %s as instance %s",
-                      hostname, instances[0]['InstanceId'])
+            if dedicated_host_id:
+                log.debug("Created VM %s as instance %s on host %s",
+                          hostname, instances[0]['InstanceId'],
+                          dedicated_host_id)
+            else:
+                log.debug("Created VM %s as instance %s",
+                          hostname, instances[0]['InstanceId'])
             return instances[0]

     def _deleteThread(self):
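Putting the placement pieces together: for a dedicated-host launch, the `run_instances` call above ends up carrying a `Placement` block like the following (illustrative values; the HostId comes from the allocated host):

```python
# Hypothetical values for illustration only.
placement = {
    'Tenancy': 'host',                 # dedicated-host tenancy
    'HostId': 'h-0123456789abcdef0',   # pin to the allocated host
    'Affinity': 'host',                # keep the instance on that host
    'AvailabilityZone': 'us-west-2a',  # from the pool's availability-zone
}
```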
@@ -1256,28 +1612,62 @@ class AwsAdapter(statemachine.Adapter):
                 self.log.exception("Error in delete thread:")
                 time.sleep(5)

-    def _deleteThreadInner(self):
+    @staticmethod
+    def _getBatch(the_queue):
         records = []
         try:
-            records.append(self.delete_queue.get(block=True, timeout=10))
+            records.append(the_queue.get(block=True, timeout=10))
         except queue.Empty:
-            return
+            return []
         while True:
             try:
-                records.append(self.delete_queue.get(block=False))
+                records.append(the_queue.get(block=False))
             except queue.Empty:
                 break
             # The terminate call has a limit of 1k, but AWS recommends
             # smaller batches. We limit to 50 here.
             if len(records) >= 50:
                 break
-        ids = []
-        for (del_id, log) in records:
-            ids.append(del_id)
-            log.debug(f"Deleting instance {del_id}")
-        count = len(ids)
-        with self.rate_limiter(log.debug, f"Deleted {count} instances"):
-            self.ec2_client.terminate_instances(InstanceIds=ids)
+        return records
+
+    def _deleteThreadInner(self):
+        records = self._getBatch(self.delete_instance_queue)
+        if records:
+            ids = []
+            for (del_id, log) in records:
+                ids.append(del_id)
+                log.debug(f"Deleting instance {del_id}")
+            count = len(ids)
+            with self.rate_limiter(log.debug, f"Deleted {count} instances"):
+                self.ec2_client.terminate_instances(InstanceIds=ids)
+
+        records = self._getBatch(self.delete_host_queue)
+        if records:
+            ids = []
+            for (del_id, log) in records:
+                ids.append(del_id)
+                log.debug(f"Releasing host {del_id}")
+            count = len(ids)
+            with self.rate_limiter(log.debug, f"Released {count} hosts"):
+                self.ec2_client.release_hosts(HostIds=ids)
+
+    def _releaseHost(self, external_id, log=None, immediate=False):
+        if log is None:
+            log = self.log
+        for host in self._listHosts():
+            if host['HostId'] == external_id:
+                break
+        else:
+            log.warning(f"Host not found when releasing {external_id}")
+            return None
+        if immediate:
+            with self.rate_limiter(log.debug, "Released host"):
+                log.debug(f"Releasing host {external_id}")
+                self.ec2_client.release_hosts(
+                    HostIds=[host['HostId']])
+        else:
+            self.delete_host_queue.put((external_id, log))
+        return host

     def _deleteInstance(self, external_id, log=None, immediate=False):
         if log is None:
@@ -1294,7 +1684,7 @@ class AwsAdapter(statemachine.Adapter):
                 self.ec2_client.terminate_instances(
                     InstanceIds=[instance['InstanceId']])
         else:
-            self.delete_queue.put((external_id, log))
+            self.delete_instance_queue.put((external_id, log))
         return instance

     def _deleteVolume(self, external_id):
diff --git a/nodepool/driver/aws/config.py b/nodepool/driver/aws/config.py
index 58a322afe..a9c8ed916 100644
--- a/nodepool/driver/aws/config.py
+++ b/nodepool/driver/aws/config.py
@@ -186,6 +186,14 @@ class AwsLabel(ConfigValue):
         self.host_key_checking = self.pool.host_key_checking
         self.use_spot = bool(label.get('use-spot', False))
         self.imdsv2 = label.get('imdsv2', None)
+        self.dedicated_host = bool(label.get('dedicated-host', False))
+        if self.dedicated_host:
+            if self.use_spot:
+                raise Exception(
+                    "Spot instances cannot be used on dedicated hosts")
+            if not self.pool.az:
+                raise Exception(
+                    "Availability-zone is required for dedicated hosts")

     @staticmethod
     def getSchema():
@@ -209,6 +217,7 @@ class AwsLabel(ConfigValue):
             'dynamic-tags': dict,
             'use-spot': bool,
             'imdsv2': v.Any(None, 'required', 'optional'),
+            'dedicated-host': bool,
         }


@@ -243,6 +252,7 @@ class AwsPool(ConfigPool):
             self.max_resources = self.provider.max_resources.copy()
             for k, val in pool_config.get('max-resources', {}).items():
                 self.max_resources[k] = val
+            self.az = pool_config.get('availability-zone')

     @staticmethod
     def getSchema():
@@ -261,6 +271,7 @@ class AwsPool(ConfigPool):
             'max-cores': int,
             'max-ram': int,
             'max-resources': {str: int},
+            'availability-zone': str,
         })
         return pool
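The validation added above fails fast at config load time. For instance, a label like the following (a hypothetical excerpt) would be rejected with "Availability-zone is required for dedicated hosts" because its pool omits `availability-zone`:

```yaml
# Invalid: dedicated-host without a pool-level availability-zone.
pools:
  - name: main
    labels:
      - name: example
        cloud-image: example-image
        instance-type: m5zn.metal
        dedicated-host: True
```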
diff --git a/nodepool/driver/statemachine.py b/nodepool/driver/statemachine.py
index bb1c1a2ec..a90ea0bfe 100644
--- a/nodepool/driver/statemachine.py
+++ b/nodepool/driver/statemachine.py
@@ -314,8 +314,9 @@ class StateMachineNodeLauncher(stats.StatsReporter):
                 statsd_key = 'error.zksession'
             self.manager.nodescan_worker.removeRequest(self.nodescan_request)
             self.nodescan_request = None
-        except exceptions.QuotaException:
-            self.log.info("Aborting node %s due to quota failure", node.id)
+        except exceptions.QuotaException as e:
+            self.log.info("Aborting node %s due to quota failure: %s",
+                          node.id, str(e))
             node.state = zk.ABORTED
             if self.state_machine:
                 node.external_id = self.state_machine.external_id
@@ -1000,7 +1001,7 @@ class Instance:

     * ready: bool (whether the instance is ready)
     * deleted: bool (whether the instance is in a deleted state)
-    * external_id: str (the unique id of the instance)
+    * external_id: str or dict (the unique id of the instance)
     * interface_ip: str
     * metadata: dict
@@ -1549,7 +1550,7 @@ class Adapter:
         This method should return a new state machine object
         initialized to delete the described instance.

-        :param str external_id: The external_id of the instance, as
+        :param str or dict external_id: The external_id of the instance, as
             supplied by a creation StateMachine or an Instance.
         :param log Logger: A logger instance for emitting annotated
             logs related to the request.
@@ -1682,7 +1683,7 @@ class Adapter:
         """Return the console log from the specified server

         :param label ConfigLabel: The label config for the node
-        :param external_id str: The external id of the server
+        :param external_id str or dict: The external id of the server
         """
         raise NotImplementedError()

@@ -1690,6 +1691,6 @@ class Adapter:
         """Notify the adapter of a nodescan failure

         :param label ConfigLabel: The label config for the node
-        :param external_id str: The external id of the server
+        :param external_id str or dict: The external id of the server
         """
         pass
diff --git a/nodepool/tests/fixtures/aws/aws-dedicated-host.yaml b/nodepool/tests/fixtures/aws/aws-dedicated-host.yaml
new file mode 100644
index 000000000..dc0b3ad03
--- /dev/null
+++ b/nodepool/tests/fixtures/aws/aws-dedicated-host.yaml
@@ -0,0 +1,37 @@
+zookeeper-servers:
+  - host: {zookeeper_host}
+    port: {zookeeper_port}
+    chroot: {zookeeper_chroot}
+
+zookeeper-tls:
+  ca: {zookeeper_ca}
+  cert: {zookeeper_cert}
+  key: {zookeeper_key}
+
+labels:
+  - name: ubuntu
+
+providers:
+  - name: ec2-us-west-2
+    driver: aws
+    region-name: us-west-2
+    cloud-images:
+      - name: ubuntu1404
+        image-id: ami-1e749f67
+        username: ubuntu
+    launch-retries: 1
+    pools:
+      - name: main
+        availability-zone: us-west-2a
+        max-servers: 10
+        subnet-id: {subnet_id}
+        security-group-id: {security_group_id}
+        node-attributes:
+          key1: value1
+          key2: value2
+        labels:
+          - name: ubuntu
+            cloud-image: ubuntu1404
+            instance-type: t3.medium
+            key-name: zuul
+            dedicated-host: True
diff --git a/nodepool/tests/unit/test_driver_aws.py b/nodepool/tests/unit/test_driver_aws.py
index 52d0f9772..b2ed241a1 100644
--- a/nodepool/tests/unit/test_driver_aws.py
+++ b/nodepool/tests/unit/test_driver_aws.py
@@ -26,6 +26,7 @@ import testtools

 from nodepool import config as nodepool_config
 from nodepool import tests
+import nodepool.status
 from nodepool.zk import zookeeper as zk
 from nodepool.nodeutils import iterate_timeout
 import nodepool.driver.statemachine
@@ -49,9 +50,16 @@ class FakeAwsAdapter(AwsAdapter):
         # when in fake mode so we need to intercept the
         # run_instances call and validate the args we supply.
        def _fake_run_instances(*args, **kwargs):
-            self.__testcase.run_instance_calls.append(kwargs)
+            self.__testcase.run_instances_calls.append(kwargs)
+            if self.__testcase.run_instances_exception:
+                raise self.__testcase.run_instances_exception
             return self.ec2_client.run_instances_orig(*args, **kwargs)

+        def _fake_allocate_hosts(*args, **kwargs):
+            if self.__testcase.allocate_hosts_exception:
+                raise self.__testcase.allocate_hosts_exception
+            return self.ec2_client.allocate_hosts_orig(*args, **kwargs)
+
         # The ImdsSupport parameter isn't handled by moto
         def _fake_register_image(*args, **kwargs):
             self.__testcase.register_image_calls.append(kwargs)
@@ -65,6 +73,8 @@ class FakeAwsAdapter(AwsAdapter):

         self.ec2_client.run_instances_orig = self.ec2_client.run_instances
         self.ec2_client.run_instances = _fake_run_instances
+        self.ec2_client.allocate_hosts_orig = self.ec2_client.allocate_hosts
+        self.ec2_client.allocate_hosts = _fake_allocate_hosts
         self.ec2_client.register_image_orig = self.ec2_client.register_image
         self.ec2_client.register_image = _fake_register_image
         self.ec2_client.import_snapshot = \
@@ -161,7 +171,9 @@ class TestDriverAws(tests.DBTestCase):
             CreateBucketConfiguration={'LocationConstraint': 'us-west-2'})

         # A list of args to method calls for validation
-        self.run_instance_calls = []
+        self.run_instances_calls = []
+        self.run_instances_exception = None
+        self.allocate_hosts_exception = None
         self.register_image_calls = []

         # TEST-NET-3
@@ -236,8 +248,8 @@ class TestDriverAws(tests.DBTestCase):
     def requestNode(self, config_path, label):
         # A helper method to perform a single node request
         configfile = self.setup_config(config_path)
-        pool = self.useNodepool(configfile, watermark_sleep=1)
-        self.startPool(pool)
+        self.pool = self.useNodepool(configfile, watermark_sleep=1)
+        self.startPool(self.pool)

         req = zk.NodeRequest()
         req.state = zk.REQUESTED
@@ -583,12 +595,12 @@ class TestDriverAws(tests.DBTestCase):
         # Like us-west-2x where x is random
         self.assertTrue(len(node.az) == len('us-west-2x'))

-        instance = self.ec2.Instance(node.external_id)
+        instance = self.ec2.Instance(node.external_id['instance'])
         response = instance.describe_attribute(Attribute='ebsOptimized')
         self.assertFalse(response['EbsOptimized']['Value'])

         self.assertFalse(
-            'MetadataOptions' in self.run_instance_calls[0])
+            'MetadataOptions' in self.run_instances_calls[0])

         node.state = zk.USED
         self.zk.storeNode(node)
@@ -630,7 +642,7 @@ class TestDriverAws(tests.DBTestCase):
         self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
         self.assertEqual(node.image_id, 'ubuntu1404')

-        instance = self.ec2.Instance(node.external_id)
+        instance = self.ec2.Instance(node.external_id['instance'])
         response = instance.describe_attribute(
             Attribute='userData')
         self.assertIn('UserData', response)
@@ -647,7 +659,8 @@ class TestDriverAws(tests.DBTestCase):
         associations = self.ec2_client.\
             describe_iam_instance_profile_associations()[
                 "IamInstanceProfileAssociations"]
-        self.assertEqual(node.external_id, associations[0]['InstanceId'])
+        self.assertEqual(node.external_id['instance'],
+                         associations[0]['InstanceId'])
         self.assertEqual(self.instance_profile_arn,
                          associations[0]['IamInstanceProfile']['Arn'])

@@ -660,7 +673,8 @@ class TestDriverAws(tests.DBTestCase):
         associations = self.ec2_client.\
             describe_iam_instance_profile_associations()[
                 "IamInstanceProfileAssociations"]
-        self.assertEqual(node.external_id, associations[0]['InstanceId'])
+        self.assertEqual(node.external_id['instance'],
+                         associations[0]['InstanceId'])
         self.assertEqual(self.instance_profile_arn,
                          associations[0]['IamInstanceProfile']['Arn'])

@@ -696,7 +710,7 @@ class TestDriverAws(tests.DBTestCase):

         # Make sure we make the call to AWS as expected
         self.assertEqual(
-            self.run_instance_calls[0]['NetworkInterfaces']
+            self.run_instances_calls[0]['NetworkInterfaces']
             [0]['Ipv6AddressCount'], 1)

         # This is like what we should get back from AWS, verify the
@@ -712,7 +726,7 @@ class TestDriverAws(tests.DBTestCase):
         instance['NetworkInterfaces'] = [iface]
         provider = Dummy()
         provider.region_name = 'us-west-2'
-        awsi = AwsInstance(provider, instance, None)
+        awsi = AwsInstance(provider, instance, None, None)
         self.assertEqual(awsi.public_ipv4, '1.2.3.4')
         self.assertEqual(awsi.private_ipv4, '10.0.0.1')
         self.assertEqual(awsi.public_ipv6, 'fe80::dead:beef')
@@ -725,7 +739,7 @@ class TestDriverAws(tests.DBTestCase):
         self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
         self.assertEqual(node.image_id, 'ubuntu1404')

-        instance = self.ec2.Instance(node.external_id)
+        instance = self.ec2.Instance(node.external_id['instance'])
         tag_list = instance.tags
         self.assertIn({"Key": "has-tags", "Value": "true"}, tag_list)
         self.assertIn({"Key": "Name", "Value": "np0000000000"}, tag_list)
@@ -743,7 +757,7 @@ class TestDriverAws(tests.DBTestCase):
         self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
         self.assertEqual(node.image_id, 'ubuntu1404')

-        instance = self.ec2.Instance(node.external_id)
+        instance = self.ec2.Instance(node.external_id['instance'])
         tag_list = instance.tags
         self.assertIn({"Key": "has-tags", "Value": "true"}, tag_list)
         self.assertIn({"Key": "Name", "Value": "np0000000000"}, tag_list)
@@ -773,7 +787,7 @@ class TestDriverAws(tests.DBTestCase):
         self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
         self.assertEqual(node.image_id, 'ubuntu1404')

-        instance = self.ec2.Instance(node.external_id)
+        instance = self.ec2.Instance(node.external_id['instance'])
         response = instance.describe_attribute(Attribute='ebsOptimized')
         self.assertTrue(response['EbsOptimized']['Value'])

@@ -785,10 +799,10 @@ class TestDriverAws(tests.DBTestCase):
         self.assertEqual(node.image_id, 'ubuntu1404')

         self.assertEqual(
-            self.run_instance_calls[0]['MetadataOptions']['HttpTokens'],
+            self.run_instances_calls[0]['MetadataOptions']['HttpTokens'],
             'required')
         self.assertEqual(
-            self.run_instance_calls[0]['MetadataOptions']['HttpEndpoint'],
+            self.run_instances_calls[0]['MetadataOptions']['HttpEndpoint'],
             'enabled')

     def test_aws_invalid_instance_type(self):
@@ -846,10 +860,10 @@ class TestDriverAws(tests.DBTestCase):
         self.assertEqual(node.attributes,
                          {'key1': 'value1', 'key2': 'value2'})
         self.assertEqual(
-            self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
+            self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
             ['Iops'], 2000)
         self.assertEqual(
-            self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
+            self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
             ['Throughput'], 200)

     def test_aws_diskimage_image(self):
@@ -889,10 +903,10 @@ class TestDriverAws(tests.DBTestCase):
         self.assertEqual(node.attributes,
                          {'key1': 'value1', 'key2': 'value2'})
         self.assertEqual(
-            self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
+            self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
             ['Iops'], 2000)
         self.assertEqual(
-            self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
+            self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
             ['Throughput'], 200)

     def test_aws_diskimage_snapshot_imdsv2(self):
@@ -936,10 +950,10 @@ class TestDriverAws(tests.DBTestCase):
         self.assertEqual(node.attributes,
                          {'key1': 'value1', 'key2': 'value2'})
         self.assertEqual(
-            self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
+            self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
             ['Iops'], 2000)
         self.assertEqual(
-            self.run_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
+            self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
             ['Throughput'], 200)

     def test_aws_diskimage_image_imdsv2(self):
@@ -1235,7 +1249,106 @@ class TestDriverAws(tests.DBTestCase):
         # Test creating a spot instances instead of an on-demand on.
         req = self.requestNode('aws/aws-spot.yaml', 'ubuntu1404-spot')
         node = self.assertSuccess(req)
-        instance = self.ec2.Instance(node.external_id)
+        instance = self.ec2.Instance(node.external_id['instance'])
         self.assertEqual(instance.instance_lifecycle, 'spot')
         # moto doesn't provide the spot_instance_request_id
         # self.assertIsNotNone(instance.spot_instance_request_id)
+
+    def test_aws_dedicated_host(self):
+        req = self.requestNode('aws/aws-dedicated-host.yaml', 'ubuntu')
+        for _ in iterate_timeout(60, Exception,
+                                 "Node request state transition",
+                                 interval=1):
+            # Ensure that we can render the node list (and that our
+            # use of a dictionary for external_id does not cause an
+            # error).
+            node_list = nodepool.status.node_list(self.zk)
+            nodepool.status.output(node_list, 'pretty')
+            nodepool.status.output(node_list, 'json')
+            req = self.zk.getNodeRequest(req.id)
+            if req.state in (zk.FULFILLED,):
+                break
+        node = self.assertSuccess(req)
+
+        self.assertEqual(node.host_keys, ['ssh-rsa FAKEKEY'])
+        self.assertEqual(node.image_id, 'ubuntu1404')
+
+        # Verify instance and host are created
+        reservations = self.ec2_client.describe_instances()['Reservations']
+        instances = [
+            i
+            for r in reservations
+            for i in r['Instances']
+            if i['State']['Name'] != 'terminated'
+        ]
+        self.assertEqual(len(instances), 1)
+        hosts = self.ec2_client.describe_hosts()['Hosts']
+        hosts = [h for h in hosts if h['State'] != 'released']
+        self.assertEqual(len(hosts), 1)
+
+        node.state = zk.USED
+        self.zk.storeNode(node)
+        self.waitForNodeDeletion(node)
+
+        # verify instance and host are deleted
+        reservations = self.ec2_client.describe_instances()['Reservations']
+        instances = [
+            i
+            for r in reservations
+            for i in r['Instances']
+            if i['State']['Name'] != 'terminated'
+        ]
+        self.assertEqual(len(instances), 0)
+        hosts = self.ec2_client.describe_hosts()['Hosts']
+        hosts = [h for h in hosts if h['State'] != 'released']
+        self.assertEqual(len(hosts), 0)
+
+    def test_aws_dedicated_host_instance_failure(self):
+        self.run_instances_exception = Exception("some failure")
+        req = self.requestNode('aws/aws-dedicated-host.yaml', 'ubuntu')
+        self.assertEqual(req.state, zk.FAILED)
+
+        # verify instance and host are deleted
+        provider = self.pool.getProviderManager('ec2-us-west-2')
+        for _ in iterate_timeout(60, Exception,
+                                 "Cloud cleanup",
+                                 interval=1):
+            if not (provider.launchers or provider.deleters):
+                break
+
+        reservations = self.ec2_client.describe_instances()['Reservations']
+        instances = [
+            i
+            for r in reservations
+            for i in r['Instances']
+            if i['State']['Name'] != 'terminated'
+        ]
+        self.assertEqual(len(instances), 0)
+        hosts = self.ec2_client.describe_hosts()['Hosts']
+        hosts = [h for h in hosts if h['State'] != 'released']
+        self.assertEqual(len(hosts), 0)
+
+    def test_aws_dedicated_host_allocation_failure(self):
+        self.allocate_hosts_exception = Exception("some failure")
+        req = self.requestNode('aws/aws-dedicated-host.yaml', 'ubuntu')
+        self.assertEqual(req.state, zk.FAILED)
+
+        # verify instance and host are deleted
+        provider = self.pool.getProviderManager('ec2-us-west-2')
+        for _ in iterate_timeout(60, Exception,
+                                 "Cloud cleanup",
+                                 interval=1):
+            if not (provider.launchers or provider.deleters):
+                break
+
+        reservations = self.ec2_client.describe_instances()['Reservations']
+        instances = [
+            i
+            for r in reservations
+            for i in r['Instances']
+            if i['State']['Name'] != 'terminated'
+        ]
+        self.assertEqual(len(instances), 0)
+        hosts = self.ec2_client.describe_hosts()['Hosts']
+        hosts = [h for h in hosts if h['State'] != 'released']
+        self.assertEqual(len(hosts), 0)
diff --git a/releasenotes/notes/aws-dedicated-hosts-5b68f1174d8f242c.yaml b/releasenotes/notes/aws-dedicated-hosts-5b68f1174d8f242c.yaml
new file mode 100644
index 000000000..e403e2e4f
--- /dev/null
+++ b/releasenotes/notes/aws-dedicated-hosts-5b68f1174d8f242c.yaml
@@ -0,0 +1,9 @@
+---
+features:
+  - |
+    Limited support for AWS dedicated hosts is now available using the
+    :attr:`providers.[aws].pools.labels.dedicated-host` option. This
+    allows for a dedicated host to be allocated to serve a single
+    instance (the dedicated host cannot be used for multiple
+    instances). This enables Nodepool to launch certain instance
+    types which require dedicated hosts.