Merge "Update AWS driver to use statemachine framework"

commit 2a85b6dd0a
@@ -2,13 +2,16 @@
 .. default-domain:: zuul

-AWS EC2 Driver
---------------
+AWS Driver
+----------

-Selecting the aws driver adds the following options to the :attr:`providers`
-section of the configuration.
+If using the AWS driver to upload diskimages, see
+`VM Import/Export service role`_ for information on configuring
+the required permissions in AWS. You must also create an S3 Bucket
+for use by Nodepool.
+
+.. note:: Quota support is not implemented.
+
+Selecting the ``aws`` driver adds the following options to the
+:attr:`providers` section of the configuration.

 .. attr-overview::
    :prefix: providers.[aws]
||||
@@ -86,17 +89,59 @@ section of the configuration.

 .. attr:: boot-timeout
    :type: int seconds
-   :default: 60
+   :default: 180

    Once an instance is active, how long to try connecting to the
    image via SSH. If the timeout is exceeded, the node launch is
    aborted and the instance deleted.

+.. attr:: launch-timeout
+   :type: int seconds
+   :default: 3600
+
+   The time to wait from issuing the command to create a new instance
+   until that instance is reported as "active". If the timeout is
+   exceeded, the node launch is aborted and the instance deleted.
+
 .. attr:: launch-retries
    :default: 3

    The number of times to retry launching a node before considering
-   the job failed.
+   the request failed.

+.. attr:: post-upload-hook
+   :type: string
+   :default: None
+
+   Filename of an optional script that can be called after an image has
+   been uploaded to a provider but before it is taken into use. This is
+   useful to perform last-minute validation tests before an image is
+   really used for build nodes. The script will be called as follows:
+
+   ``<SCRIPT> <PROVIDER> <EXTERNAL_IMAGE_ID> <LOCAL_IMAGE_FILENAME>``
+
+   If the script returns with result code 0, it is treated as successful;
+   otherwise it is treated as failed and the image gets deleted.
+
+.. attr:: object-storage
+
+   This section is only required when using Nodepool to upload
+   diskimages.
+
+   .. attr:: bucket-name
+
+      The name of a bucket to use for temporary storage of
+      diskimages while creating snapshots. The bucket must already
+      exist.
+
+.. attr:: image-format
+   :type: str
+   :default: raw
+
+   The image format that should be requested from diskimage-builder
+   and also specified to AWS when importing images. One of:
+   ``ova``, ``vhd``, ``vhdx``, ``vmdk``, ``raw`` (not all of which
+   are supported by diskimage-builder).
+
 .. attr:: cloud-images
    :type: list
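Taken together, the upload-related options above might be combined in a provider entry roughly like this (a sketch only; the provider, region, bucket, and image names are invented for illustration):

```yaml
providers:
  - name: ec2-us-west-2
    driver: aws
    region-name: us-west-2
    # Required only when Nodepool uploads diskimages to this provider:
    object-storage:
      bucket-name: nodepool-image-uploads
    image-format: raw
    diskimages:
      - name: debian-bullseye
```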
@@ -137,15 +182,19 @@ section of the configuration.

 .. attr:: image-id
    :type: str

-   If this is provided, it is used to select the image from the cloud
-   provider by ID.
+   If this is provided, it is used to select the image from the
+   cloud provider by ID. Either this field or
+   :attr:`providers.[aws].cloud-images.image-filters` must be
+   provided.

 .. attr:: image-filters
    :type: list

-   If provided, this is used to select an AMI by filters. If the filters
-   provided match more than one image, the most recent will be returned.
-   `image-filters` are not valid if `image-id` is also specified.
+   If provided, this is used to select an AMI by filters. If
+   the filters provided match more than one image, the most
+   recent will be returned. Either this field or
+   :attr:`providers.[aws].cloud-images.image-id` must be
+   provided.

    Each entry is a dictionary with the following keys
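As a sketch of the two mutually exclusive selection modes described above (the AMI ID and filter values are invented; the filter entry shape follows the EC2 `describe-images` name/values convention):

```yaml
cloud-images:
  # Select a specific AMI directly by ID:
  - name: focal-by-id
    image-id: ami-0123456789abcdef0
    username: ubuntu
  # Or select the most recent AMI matching filters:
  - name: focal-by-filter
    image-filters:
      - name: name
        values:
          - "ubuntu/images/*20.04*"
    username: ubuntu
```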
@@ -159,7 +208,7 @@ section of the configuration.
    :type: list
    :required:

-   A list of str values to filter on
+   A list of string values on which to filter.

 .. attr:: username
    :type: str
@@ -204,7 +253,93 @@ section of the configuration.
 - If the default shell is not Bourne compatible (sh), but instead
   e.g. ``csh`` or ``fish``, and the user is aware that there is a
   long-standing issue with ``ansible_shell_type`` in combination
-  with ``become``
+  with ``become``.

+.. attr:: diskimages
+   :type: list
+
+   Each entry in a provider's `diskimages` section must correspond
+   to an entry in :attr:`diskimages`. Such an entry indicates that
+   the corresponding diskimage should be uploaded for use in this
+   provider. Additionally, any nodes that are created using the
+   uploaded image will have the associated attributes (such as
+   flavor or metadata).
+
+   If an image is removed from this section, any previously uploaded
+   images will be deleted from the provider.
+
+   .. code-block:: yaml
+
+      diskimages:
+        - name: bionic
+          pause: False
+        - name: windows
+          connection-type: winrm
+          connection-port: 5986
+
+   Each entry is a dictionary with the following keys
+
+   .. attr:: name
+      :type: string
+      :required:
+
+      Identifier to refer to this image from the
+      :attr:`providers.[aws].pools.labels` and
+      :attr:`diskimages` sections.
+
+   .. attr:: pause
+      :type: bool
+      :default: False
+
+      When set to True, nodepool-builder will not upload the image
+      to the provider.
+
+   .. attr:: username
+      :type: str
+
+      The username that should be used when connecting to the node.
+
+   .. attr:: connection-type
+      :type: string
+
+      The connection type that a consumer should use when connecting
+      to the node. For most diskimages this is not
+      necessary. However, when creating Windows images this could be
+      ``winrm`` to enable access via ansible.
+
+   .. attr:: connection-port
+      :type: int
+      :default: 22 / 5986
+
+      The port that a consumer should use when connecting to the
+      node. For most diskimages this is not necessary. This defaults
+      to 22 for ssh and 5986 for winrm.
+
+   .. attr:: python-path
+      :type: str
+      :default: auto
+
+      The path of the default python interpreter. Used by Zuul to set
+      ``ansible_python_interpreter``. The special value ``auto`` will
+      direct Zuul to use inbuilt Ansible logic to select the
+      interpreter on Ansible >=2.8, and default to
+      ``/usr/bin/python2`` for earlier versions.
+
+   .. attr:: shell-type
+      :type: str
+      :default: sh
+
+      The shell type of the node's default shell executable. Used by Zuul
+      to set ``ansible_shell_type``. This setting should only be used:
+
+      - For a windows image with the experimental `connection-type` ``ssh``,
+        in which case ``cmd`` or ``powershell`` should be set
+        and reflect the node's ``DefaultShell`` configuration.
+      - If the default shell is not Bourne compatible (sh), but instead
+        e.g. ``csh`` or ``fish``, and the user is aware that there is a
+        long-standing issue with ``ansible_shell_type`` in combination
+        with ``become``.

 .. attr:: pools
    :type: list
@@ -238,7 +373,26 @@ section of the configuration.
    :type: bool
    :default: True

-   Specify if a public ip address shall be attached to nodes.
+   Deprecated alias for :attr:`providers.[aws].pools.public-ipv4`.

+.. attr:: public-ipv4
+   :type: bool
+   :default: True
+
+   Specify if a public IPv4 address shall be attached to nodes.
+
+.. attr:: public-ipv6
+   :type: bool
+   :default: True
+
+   Specify if a public IPv6 address shall be attached to nodes.
+
 .. attr:: use-internal-ip
    :type: bool
    :default: false

    If a public IP is attached but Nodepool should prefer the
    private IP, set this to true.

 .. attr:: host-key-checking
    :type: bool
@@ -269,9 +423,7 @@ section of the configuration.
    :type: str
    :required:

-   Identifier to refer this label.
-   Nodepool will use this to set the name of the instance unless
-   the name is specified as a tag.
+   Identifier to refer to this label.

 .. attr:: cloud-image
    :type: str
@@ -279,9 +431,20 @@ section of the configuration.

    Refers to the name of an externally managed image in the
    cloud that already exists on the provider. The value of
-   ``cloud-image`` should match the ``name`` of a previously
-   configured entry from the ``cloud-images`` section of the
-   provider. See :attr:`providers.[aws].cloud-images`.
+   ``cloud-image`` should match the ``name`` of a
+   previously configured entry from the ``cloud-images``
+   section of the provider. See
+   :attr:`providers.[aws].cloud-images`. Mutually
+   exclusive with
+   :attr:`providers.[aws].pools.labels.diskimage`

+.. attr:: diskimage
+   :type: str
+   :required:
+
+   Refers to provider's diskimages, see
+   :attr:`providers.[aws].diskimages`. Mutually exclusive
+   with :attr:`providers.[aws].pools.labels.cloud-image`

 .. attr:: ebs-optimized
    :type: bool
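Because ``cloud-image`` and ``diskimage`` are mutually exclusive, each label picks exactly one image source. A sketch of the two styles side by side (label, image, and key names are invented):

```yaml
labels:
  # Boot from an externally managed AMI defined in cloud-images:
  - name: external-image-node
    cloud-image: focal-by-id
    instance-type: t3.medium
    key-name: zuul
  # Boot from a diskimage built and uploaded by nodepool-builder:
  - name: built-image-node
    diskimage: debian-bullseye
    instance-type: t3.medium
    key-name: zuul
```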
@@ -343,9 +506,11 @@ section of the configuration.
    :type: dict
    :default: None

-   A dictionary of tags to add to the EC2 instances
+   A dictionary of tags to add to the EC2 instances.
+   Values must be supplied as strings.

 .. _`EBS volume type`: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSVolumeTypes.html
 .. _`AWS region`: https://docs.aws.amazon.com/general/latest/gr/rande.html
 .. _`Boto configuration`: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
 .. _`Boto describe images`: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.Client.describe_images
+.. _`VM Import/Export service role`: https://docs.aws.amazon.com/vm-import/latest/userguide/vmie_prereqs.html#vmimport-role
@@ -1,4 +1,5 @@
 # Copyright 2018 Red Hat
+# Copyright 2022 Acme Gating, LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,14 +15,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from nodepool.driver import Driver
+from nodepool.driver.statemachine import StateMachineDriver
 from nodepool.driver.aws.config import AwsProviderConfig
-from nodepool.driver.aws.provider import AwsProvider
+from nodepool.driver.aws.adapter import AwsAdapter


-class AwsDriver(Driver):
+class AwsDriver(StateMachineDriver):
     def getProviderConfig(self, provider):
         return AwsProviderConfig(self, provider)

-    def getProvider(self, provider_config):
-        return AwsProvider(provider_config)
+    def getAdapter(self, provider_config):
+        return AwsAdapter(provider_config)
637  nodepool/driver/aws/adapter.py  Normal file
@@ -0,0 +1,637 @@
|
||||
# Copyright 2018 Red Hat
|
||||
# Copyright 2022 Acme Gating, LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
import cachetools.func
|
||||
import urllib.parse
|
||||
import time
|
||||
import re
|
||||
|
||||
import boto3
|
||||
|
||||
from nodepool.driver.utils import QuotaInformation, RateLimiter
|
||||
from nodepool.driver import statemachine
|
||||
|
||||
|
||||
def tag_dict_to_list(tagdict):
|
||||
# TODO: validate tag values are strings in config and deprecate
|
||||
# non-string values.
|
||||
return [{"Key": k, "Value": str(v)} for k, v in tagdict.items()]
|
||||
|
||||
|
||||
def tag_list_to_dict(taglist):
|
||||
if taglist is None:
|
||||
return {}
|
||||
return {t["Key"]: t["Value"] for t in taglist}
|
||||
|
||||
|
||||
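
# For clarity (an editorial note, not part of the original change): the two
# helpers above convert between Nodepool's tag dicts and the EC2 API's
# list-of-Key/Value form, stringifying non-string values on the way out:
#
#   tag_dict_to_list({'Name': 'node', 'count': 3})
#   -> [{'Key': 'Name', 'Value': 'node'}, {'Key': 'count', 'Value': '3'}]
#   tag_list_to_dict(None)
#   -> {}
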
class AwsInstance(statemachine.Instance):
    def __init__(self, instance, quota):
        super().__init__()
        self.external_id = instance.id
        self.metadata = tag_list_to_dict(instance.tags)
        self.private_ipv4 = instance.private_ip_address
        self.private_ipv6 = None
        self.public_ipv4 = instance.public_ip_address
        self.public_ipv6 = None
        self.az = ''
        self.quota = quota

        for iface in instance.network_interfaces[:1]:
            if iface.ipv6_addresses:
                v6addr = iface.ipv6_addresses[0]
                self.public_ipv6 = v6addr['Ipv6Address']
        self.interface_ip = (self.public_ipv4 or self.public_ipv6 or
                             self.private_ipv4 or self.private_ipv6)

    def getQuotaInformation(self):
        return self.quota


class AwsResource(statemachine.Resource):
    def __init__(self, metadata, type, id):
        super().__init__(metadata)
        self.type = type
        self.id = id
class AwsDeleteStateMachine(statemachine.StateMachine):
    VM_DELETING = 'deleting vm'
    NIC_DELETING = 'deleting nic'
    PIP_DELETING = 'deleting pip'
    DISK_DELETING = 'deleting disk'
    COMPLETE = 'complete'

    def __init__(self, adapter, external_id):
        super().__init__()
        self.adapter = adapter
        self.external_id = external_id

    def advance(self):
        if self.state == self.START:
            self.instance = self.adapter._deleteInstance(
                self.external_id)
            self.state = self.VM_DELETING

        if self.state == self.VM_DELETING:
            self.instance = self.adapter._refreshDelete(self.instance)
            if self.instance is None:
                self.state = self.COMPLETE

        if self.state == self.COMPLETE:
            self.complete = True
class AwsCreateStateMachine(statemachine.StateMachine):
    INSTANCE_CREATING = 'creating instance'
    INSTANCE_RETRY = 'retrying instance creation'
    COMPLETE = 'complete'

    def __init__(self, adapter, hostname, label, image_external_id,
                 metadata, retries):
        super().__init__()
        self.adapter = adapter
        self.retries = retries
        self.attempts = 0
        self.image_external_id = image_external_id
        self.metadata = metadata
        self.tags = label.tags.copy() or {}
        self.tags.update(metadata)
        self.tags['Name'] = hostname
        self.hostname = hostname
        self.label = label
        self.public_ipv4 = None
        self.public_ipv6 = None
        self.nic = None
        self.instance = None

    def advance(self):
        if self.state == self.START:
            self.external_id = self.hostname

            self.instance = self.adapter._createInstance(
                self.label, self.image_external_id,
                self.tags, self.hostname)
            self.state = self.INSTANCE_CREATING

        if self.state == self.INSTANCE_CREATING:
            self.quota = self.adapter._getQuotaForInstanceType(
                self.instance.instance_type)
            self.instance = self.adapter._refresh(self.instance)

            if self.instance.state["Name"].lower() == "running":
                self.state = self.COMPLETE
            elif self.instance.state["Name"].lower() == "terminated":
                if self.attempts >= self.retries:
                    raise Exception("Too many retries")
                self.attempts += 1
                self.instance = self.adapter._deleteInstance(
                    self.external_id)
                self.state = self.INSTANCE_RETRY
            else:
                return

        if self.state == self.INSTANCE_RETRY:
            self.instance = self.adapter._refreshDelete(self.instance)
            if self.instance is None:
                self.state = self.START
            return

        if self.state == self.COMPLETE:
            self.complete = True
            return AwsInstance(self.instance, self.quota)
class AwsAdapter(statemachine.Adapter):
    log = logging.getLogger("nodepool.driver.aws.AwsAdapter")

    def __init__(self, provider_config):
        self.provider = provider_config
        # The standard rate limit; this might be 1 request per second
        self.rate_limiter = RateLimiter(self.provider.name,
                                        self.provider.rate)
        # Non-mutating requests can be made more often, at 10x the rate
        # of mutating requests by default.
        self.non_mutating_rate_limiter = RateLimiter(
            self.provider.name, self.provider.rate * 10.0)
        self.image_id_by_filter_cache = cachetools.TTLCache(
            maxsize=8192, ttl=(5 * 60))
        self.aws = boto3.Session(
            region_name=self.provider.region_name,
            profile_name=self.provider.profile_name)
        self.ec2 = self.aws.resource('ec2')
        self.ec2_client = self.aws.client("ec2")
        self.s3 = self.aws.resource('s3')
        self.s3_client = self.aws.client('s3')
        self.aws_quotas = self.aws.client("service-quotas")
        # In listResources, we reconcile AMIs which appear to be
        # imports but have no nodepool tags, however it's possible
        # that these aren't nodepool images.  If we determine that's
        # the case, we'll add their ids here so we don't waste our
        # time on that again.
        self.not_our_images = set()
        self.not_our_snapshots = set()

    def getCreateStateMachine(self, hostname, label,
                              image_external_id, metadata, retries):
        return AwsCreateStateMachine(self, hostname, label,
                                     image_external_id, metadata, retries)

    def getDeleteStateMachine(self, external_id):
        return AwsDeleteStateMachine(self, external_id)

    def listResources(self):
        self._tagAmis()
        self._tagSnapshots()
        for instance in self._listInstances():
            if instance.state["Name"].lower() == "terminated":
                continue
            yield AwsResource(tag_list_to_dict(instance.tags),
                              'instance', instance.id)
        for volume in self._listVolumes():
            if volume.state.lower() == "deleted":
                continue
            yield AwsResource(tag_list_to_dict(volume.tags),
                              'volume', volume.id)
        for ami in self._listAmis():
            if ami.state.lower() == "deleted":
                continue
            yield AwsResource(tag_list_to_dict(ami.tags),
                              'ami', ami.id)
        for snap in self._listSnapshots():
            if snap.state.lower() == "deleted":
                continue
            yield AwsResource(tag_list_to_dict(snap.tags),
                              'snapshot', snap.id)
        if self.provider.object_storage:
            for obj in self._listObjects():
                with self.non_mutating_rate_limiter:
                    tags = self.s3_client.get_object_tagging(
                        Bucket=obj.bucket_name, Key=obj.key)
                yield AwsResource(tag_list_to_dict(tags['TagSet']),
                                  'object', obj.key)
    def deleteResource(self, resource):
        self.log.info(f"Deleting leaked {resource.type}: {resource.id}")
        if resource.type == 'instance':
            self._deleteInstance(resource.id)
        if resource.type == 'volume':
            self._deleteVolume(resource.id)
        if resource.type == 'ami':
            self._deleteAmi(resource.id)
        if resource.type == 'snapshot':
            self._deleteSnapshot(resource.id)
        if resource.type == 'object':
            self._deleteObject(resource.id)

    def listInstances(self):
        for instance in self._listInstances():
            if instance.state["Name"].lower() == "terminated":
                continue
            quota = self._getQuotaForInstanceType(instance.instance_type)
            yield AwsInstance(instance, quota)

    def getQuotaLimits(self):
        with self.non_mutating_rate_limiter:
            response = self.aws_quotas.get_service_quota(
                ServiceCode='ec2',
                QuotaCode='L-1216C47A'
            )
            cores = response['Quota']['Value']
        return QuotaInformation(cores=cores,
                                default=math.inf)

    def getQuotaForLabel(self, label):
        return self._getQuotaForInstanceType(label.instance_type)
    def uploadImage(self, provider_image, image_name, filename,
                    image_format, metadata, md5, sha256):
        self.log.debug(f"Uploading image {image_name}")

        # Upload image to S3
        bucket_name = self.provider.object_storage['bucket-name']
        bucket = self.s3.Bucket(bucket_name)
        object_filename = f'{image_name}.{image_format}'
        extra_args = {'Tagging': urllib.parse.urlencode(metadata)}
        with open(filename, "rb") as fobj:
            with self.rate_limiter:
                bucket.upload_fileobj(fobj, object_filename,
                                      ExtraArgs=extra_args)

        # Import image as AMI
        self.log.debug(f"Importing {image_name}")
        import_image_task = self.ec2_client.import_image(
            Architecture=provider_image.architecture,
            DiskContainers=[
                {
                    'Format': image_format,
                    'UserBucket': {
                        'S3Bucket': bucket_name,
                        'S3Key': object_filename,
                    }
                },
            ],
            TagSpecifications=[
                {
                    'ResourceType': 'import-image-task',
                    'Tags': tag_dict_to_list(metadata),
                },
            ]
        )
        task_id = import_image_task['ImportTaskId']

        paginator = self.ec2_client.get_paginator(
            'describe_import_image_tasks')
        done = False
        while not done:
            time.sleep(30)
            with self.non_mutating_rate_limiter:
                for page in paginator.paginate(ImportTaskIds=[task_id]):
                    for task in page['ImportImageTasks']:
                        if task['Status'].lower() in ('completed', 'deleted'):
                            done = True
                            break

        self.log.debug(f"Deleting {image_name} from S3")
        with self.rate_limiter:
            self.s3.Object(bucket_name, object_filename).delete()

        if task['Status'].lower() != 'completed':
            raise Exception(f"Error uploading image: {task}")

        # Tag the AMI
        try:
            with self.non_mutating_rate_limiter:
                ami = self.ec2.Image(task['ImageId'])
            with self.rate_limiter:
                ami.create_tags(Tags=task['Tags'])
        except Exception:
            self.log.exception("Error tagging AMI:")

        # Tag the snapshot
        try:
            with self.non_mutating_rate_limiter:
                snap = self.ec2.Snapshot(
                    task['SnapshotDetails'][0]['SnapshotId'])
            with self.rate_limiter:
                snap.create_tags(Tags=task['Tags'])
        except Exception:
            self.log.exception("Error tagging snapshot:")

        self.log.debug(f"Upload of {image_name} complete as {task['ImageId']}")
        # Last task returned from paginator above
        return task['ImageId']

    def deleteImage(self, external_id):
        self.log.debug(f"Deleting image {external_id}")

    # Local implementation below
    def _tagAmis(self):
        # There is no way to tag imported AMIs, so this routine
        # "eventually" tags them.  We look for any AMIs without tags
        # which correspond to import tasks, and we copy the tags from
        # those import tasks to the AMI.
        for ami in self._listAmis():
            if (ami.name.startswith('import-ami-') and
                    not ami.tags and
                    ami.id not in self.not_our_images):
                # This image was imported but has no tags, which means
                # it's either not a nodepool image, or it's a new one
                # which doesn't have tags yet.  Copy over any tags
                # from the import task; otherwise, mark it as an image
                # we can ignore in future runs.
                task = self._getImportImageTask(ami.name)
                tags = tag_list_to_dict(task.get('Tags'))
                if (tags.get('nodepool_provider_name') == self.provider.name):
                    # Copy over tags
                    self.log.debug(
                        f"Copying tags from import task {ami.name} to AMI")
                    with self.rate_limiter:
                        ami.create_tags(Tags=task['Tags'])
                else:
                    self.not_our_images.add(ami.id)

    def _tagSnapshots(self):
        # See comments for _tagAmis
        for snap in self._listSnapshots():
            if ('import-ami-' in snap.description and
                    not snap.tags and
                    snap.id not in self.not_our_snapshots):

                match = re.match(r'.*?(import-ami-\w*)', snap.description)
                if not match:
                    self.not_our_snapshots.add(snap.id)
                    continue
                task_id = match.group(1)
                task = self._getImportImageTask(task_id)
                tags = tag_list_to_dict(task.get('Tags'))
                if (tags.get('nodepool_provider_name') == self.provider.name):
                    # Copy over tags
                    self.log.debug(
                        f"Copying tags from import task {task_id} to snapshot")
                    with self.rate_limiter:
                        snap.create_tags(Tags=task['Tags'])
                else:
                    self.not_our_snapshots.add(snap.id)
    def _getImportImageTask(self, task_id):
        paginator = self.ec2_client.get_paginator(
            'describe_import_image_tasks')
        with self.non_mutating_rate_limiter:
            for page in paginator.paginate(ImportTaskIds=[task_id]):
                for task in page['ImportImageTasks']:
                    # Return the first and only task
                    return task

    def _getQuotaForInstanceType(self, instance_type):
        itype = self._getInstanceType(instance_type)
        cores = itype['InstanceTypes'][0]['VCpuInfo']['DefaultCores']
        ram = itype['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']
        return QuotaInformation(cores=cores,
                                ram=ram,
                                instances=1)

    @cachetools.func.lru_cache(maxsize=None)
    def _getInstanceType(self, instance_type):
        with self.non_mutating_rate_limiter:
            self.log.debug(
                f"Getting information for instance type {instance_type}")
            return self.ec2_client.describe_instance_types(
                InstanceTypes=[instance_type])

    def _refresh(self, obj):
        for instance in self._listInstances():
            if instance.id == obj.id:
                return instance

    def _refreshDelete(self, obj):
        if obj is None:
            return obj

        for instance in self._listInstances():
            if instance.id == obj.id:
                if instance.state["Name"].lower() == "terminated":
                    return None
                return instance
        return None
    @cachetools.func.ttl_cache(maxsize=1, ttl=10)
    def _listInstances(self):
        with self.non_mutating_rate_limiter:
            return self.ec2.instances.all()

    @cachetools.func.ttl_cache(maxsize=1, ttl=10)
    def _listVolumes(self):
        with self.non_mutating_rate_limiter:
            return self.ec2.volumes.all()

    @cachetools.func.ttl_cache(maxsize=1, ttl=10)
    def _listAmis(self):
        with self.non_mutating_rate_limiter:
            return self.ec2.images.filter(Owners=['self'])

    @cachetools.func.ttl_cache(maxsize=1, ttl=10)
    def _listSnapshots(self):
        with self.non_mutating_rate_limiter:
            return self.ec2.snapshots.filter(OwnerIds=['self'])

    @cachetools.func.ttl_cache(maxsize=1, ttl=10)
    def _listObjects(self):
        bucket_name = self.provider.object_storage.get('bucket-name')
        if not bucket_name:
            return []

        bucket = self.s3.Bucket(bucket_name)
        with self.non_mutating_rate_limiter:
            return bucket.objects.all()
    def _getLatestImageIdByFilters(self, image_filters):
        # Normally we would decorate this method, but our cache key is
        # complex, so we serialize it to JSON and manage the cache
        # ourselves.
        cache_key = json.dumps(image_filters)
        val = self.image_id_by_filter_cache.get(cache_key)
        if val:
            return val

        with self.non_mutating_rate_limiter:
            res = self.ec2_client.describe_images(
                Filters=image_filters
            ).get("Images")

        images = sorted(
            res,
            key=lambda k: k["CreationDate"],
            reverse=True
        )

        if not images:
            raise Exception(
                "No cloud-image (AMI) matches supplied image filters")
        else:
            val = images[0].get("ImageId")
            self.image_id_by_filter_cache[cache_key] = val
            return val

    def _getImageId(self, cloud_image):
        image_id = cloud_image.image_id
        image_filters = cloud_image.image_filters

        if image_filters is not None:
            return self._getLatestImageIdByFilters(image_filters)

        return image_id

    @cachetools.func.lru_cache(maxsize=None)
    def _getImage(self, image_id):
        with self.non_mutating_rate_limiter:
            return self.ec2.Image(image_id)
    def _createInstance(self, label, image_external_id,
                        tags, hostname):
        if image_external_id:
            image_id = image_external_id
        else:
            image_id = self._getImageId(label.cloud_image)

        args = dict(
            ImageId=image_id,
            MinCount=1,
            MaxCount=1,
            KeyName=label.key_name,
            EbsOptimized=label.ebs_optimized,
            InstanceType=label.instance_type,
            NetworkInterfaces=[{
                'AssociatePublicIpAddress': label.pool.public_ipv4,
                'DeviceIndex': 0}],
            TagSpecifications=[
                {
                    'ResourceType': 'instance',
                    'Tags': tag_dict_to_list(tags),
                },
                {
                    'ResourceType': 'volume',
                    'Tags': tag_dict_to_list(tags),
                },
            ]
        )

        if label.pool.security_group_id:
            args['NetworkInterfaces'][0]['Groups'] = [
                label.pool.security_group_id
            ]
        if label.pool.subnet_id:
            args['NetworkInterfaces'][0]['SubnetId'] = label.pool.subnet_id

        if label.pool.public_ipv6:
            args['NetworkInterfaces'][0]['Ipv6AddressCount'] = 1

        if label.userdata:
            args['UserData'] = label.userdata

        if label.iam_instance_profile:
            if 'name' in label.iam_instance_profile:
                args['IamInstanceProfile'] = {
                    'Name': label.iam_instance_profile['name']
                }
            elif 'arn' in label.iam_instance_profile:
                args['IamInstanceProfile'] = {
                    'Arn': label.iam_instance_profile['arn']
                }

        # Default block device mapping parameters are embedded in AMIs.
        # We might need to supply our own mapping before launching the
        # instance.  We basically want to make sure DeleteOnTermination
        # is true and be able to set the volume type and size.
        image = self._getImage(image_id)
        # TODO: Flavors can also influence whether or not the VM spawns with a
        # volume -- we basically need to ensure DeleteOnTermination is true.
        # However, leaked volume detection may mitigate this.
        if hasattr(image, 'block_device_mappings'):
            bdm = image.block_device_mappings
            mapping = bdm[0]
            if 'Ebs' in mapping:
                mapping['Ebs']['DeleteOnTermination'] = True
                if label.volume_size:
                    mapping['Ebs']['VolumeSize'] = label.volume_size
                if label.volume_type:
                    mapping['Ebs']['VolumeType'] = label.volume_type
                # If the AMI is a snapshot, we cannot supply an "encrypted"
                # parameter
                if 'Encrypted' in mapping['Ebs']:
                    del mapping['Ebs']['Encrypted']
                args['BlockDeviceMappings'] = [mapping]

        with self.rate_limiter:
            self.log.debug(f"Creating VM {hostname}")
            instances = self.ec2.create_instances(**args)
            return self.ec2.Instance(instances[0].id)
def _deleteInstance(self, external_id):
|
||||
for instance in self._listInstances():
|
||||
if instance.id == external_id:
|
||||
break
|
||||
else:
|
||||
self.log.warning(f"Instance not found when deleting {external_id}")
|
||||
return None
|
||||
with self.rate_limiter:
|
||||
self.log.debug(f"Deleting instance {external_id}")
|
||||
instance.terminate()
|
||||
return instance
|
||||
|
||||
def _deleteVolume(self, external_id):
|
||||
for volume in self._listVolumes():
|
||||
if volume.id == external_id:
|
||||
break
|
||||
else:
|
||||
self.log.warning(f"Volume not found when deleting {external_id}")
|
||||
return None
|
||||
with self.rate_limiter:
|
||||
self.log.debug(f"Deleting volume {external_id}")
|
||||
volume.delete()
|
||||
return volume
|
||||
|
||||
def _deleteAmi(self, external_id):
|
||||
for ami in self._listAmis():
|
||||
if ami.id == external_id:
|
||||
break
|
||||
else:
|
||||
self.log.warning(f"AMI not found when deleting {external_id}")
|
||||
return None
|
||||
with self.rate_limiter:
|
||||
self.log.debug(f"Deleting AMI {external_id}")
|
||||
ami.deregister()
|
||||
return ami
|
||||
|
||||
def _deleteSnapshot(self, external_id):
|
||||
for snap in self._listSnapshots():
|
||||
if snap.id == external_id:
|
||||
break
|
||||
else:
|
||||
self.log.warning(f"Snapshot not found when deleting {external_id}")
|
||||
return None
|
||||
with self.rate_limiter:
|
||||
self.log.debug(f"Deleting Snapshot {external_id}")
|
||||
snap.delete()
|
||||
return snap
|
||||
|
||||
def _deleteObject(self, external_id):
|
||||
bucket_name = self.provider.object_storage.get('bucket-name')
|
||||
with self.rate_limiter:
|
||||
self.log.debug(f"Deleting object {external_id}")
|
||||
self.s3.Object(bucket_name, external_id).delete()
|
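The `_delete*` helpers above all rely on Python's `for`/`else` idiom: the `else` branch runs only when the loop finishes without hitting `break`, which makes it a natural "not found" handler. A minimal standalone sketch of the pattern (the names here are illustrative, not from the driver):

```python
class Thing:
    """Stand-in for a boto3 resource with an .id attribute."""
    def __init__(self, id):
        self.id = id


def find_by_id(items, external_id):
    """Return the item whose .id matches external_id, or None if absent."""
    for item in items:
        if item.id == external_id:
            break
    else:
        # The loop completed without break: nothing matched.
        return None
    return item


things = [Thing('a'), Thing('b')]
print(find_by_id(things, 'b').id)   # prints "b"
print(find_by_id(things, 'zzz'))    # prints "None"
```

Compared with a found-flag variable, the `else` clause keeps the not-found handling adjacent to the loop that produced it.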
@@ -1,4 +1,5 @@
# Copyright 2018 Red Hat
# Copyright 2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,180 +22,149 @@ from nodepool.driver import ConfigValue
from nodepool.driver import ProviderConfig


class ProviderCloudImage(ConfigValue):
    def __init__(self):
        self.name = None
        self.image_id = None
        self.username = None
        self.connection_type = None
        self.connection_port = None

    def __repr__(self):
        return "<ProviderCloudImage %s>" % self.name

    @property
    def external_name(self):
        '''Human readable version of external.'''
        return self.image_id or self.name


class ProviderLabel(ConfigValue):
    ignore_equality = ['pool']

    def __init__(self):
        self.name = None
        self.cloud_image = None
        self.ebs_optimized = None
        self.instance_type = None
        self.key_name = None
        self.volume_size = None
        self.volume_type = None
        self.userdata = None
        self.iam_instance_profile = None
        # The ProviderPool object that owns this label.
        self.pool = None
        self.tags = None

    def __repr__(self):
        return "<ProviderLabel %s>" % self.name


class ProviderPool(ConfigPool):
    ignore_equality = ['provider']

    def __init__(self):
        self.name = None
        self.max_cores = None
        self.max_ram = None
        self.subnet_id = None
        self.security_group_id = None
        self.public_ip = True
        self.host_key_checking = True
        self.labels = None
        # The ProviderConfig object that owns this pool.
        self.provider = None

        # Initialize base class attributes
        super().__init__()

    def load(self, pool_config, full_config, provider):
        super().load(pool_config)
        self.name = pool_config['name']
        self.provider = provider

        self.security_group_id = pool_config.get('security-group-id')
        self.subnet_id = pool_config.get('subnet-id')
        self.host_key_checking = bool(
            pool_config.get('host-key-checking', True))
        self.public_ip = bool(pool_config.get('public-ip-address', True))

        for label in pool_config.get('labels', []):
            pl = ProviderLabel()
            pl.name = label['name']
            pl.pool = self
            self.labels[pl.name] = pl
            cloud_image_name = label.get('cloud-image', None)
            if cloud_image_name:
                cloud_image = self.provider.cloud_images.get(
                    cloud_image_name, None)
                if not cloud_image:
                    raise ValueError(
                        "cloud-image %s does not exist in provider %s"
                        " but is referenced in label %s" %
                        (cloud_image_name, self.name, pl.name))
            else:
                cloud_image = None
            pl.cloud_image = cloud_image
            pl.ebs_optimized = bool(label.get('ebs-optimized', False))
            pl.instance_type = label['instance-type']
            pl.key_name = label['key-name']
            pl.volume_type = label.get('volume-type')
            pl.volume_size = label.get('volume-size')
            pl.userdata = label.get('userdata', None)
            pl.iam_instance_profile = label.get('iam-instance-profile', None)
            pl.tags = [
                {
                    "Key": k,
                    "Value": str(v)
                } for k, v in label.get('tags', {}).items()
            ]
            full_config.labels[label['name']].pools.append(self)

    def __repr__(self):
        return "<ProviderPool %s>" % self.name


class AwsProviderConfig(ProviderConfig):
    def __init__(self, driver, provider):
        self.driver_object = driver
        self.__pools = {}
        self.profile_name = None
        self.region_name = None
        self.boot_timeout = None
        self.launch_retries = None
        self.cloud_images = {}
        super().__init__(provider)

    @property
    def pools(self):
        return self.__pools

    @property
    def manage_images(self):
        # Currently we have no image management for AWS. This should
        # be updated if that changes.
        return False

    @staticmethod
    def reset():
        pass

    def load(self, config):
        self.profile_name = self.provider.get('profile-name')
        self.region_name = self.provider.get('region-name')
        self.boot_timeout = self.provider.get('boot-timeout', 60)
        self.launch_retries = self.provider.get('launch-retries', 3)

class AwsProviderCloudImage(ConfigValue):
    def __init__(self, image):
        default_port_mapping = {
            'ssh': 22,
            'winrm': 5986,
        }
        # TODO: diskimages
        self.name = image['name']
        self.username = image['username']
        self.image_id = image.get('image-id')
        self.python_path = image.get('python-path')
        self.shell_type = image.get('shell-type')
        self.connection_type = image.get('connection-type', 'ssh')
        self.connection_port = image.get(
            'connection-port',
            default_port_mapping.get(self.connection_type, 22))

        for image in self.provider.get('cloud-images', []):
            i = ProviderCloudImage()
            i.name = image['name']
            i.image_id = image.get('image-id', None)
            image_filters = image.get("image-filters", None)
            if image_filters is not None:
                # ensure 'name' and 'values' keys are capitalized for boto
                def capitalize_keys(image_filter):
                    return {
                        k.capitalize(): v for (k, v) in image_filter.items()
                    }

        image_filters = image.get("image-filters", None)
        if image_filters is not None:
            # ensure 'name' and 'values' keys are capitalized for boto
            def capitalize_keys(image_filter):
                return {
                    k.capitalize(): v for (k, v) in image_filter.items()
                }
            image_filters = [capitalize_keys(f) for f in image_filters]
        self.image_filters = image_filters

                image_filters = [capitalize_keys(f) for f in image_filters]
                i.image_filters = image_filters

    @property
    def external_name(self):
        '''Human readable version of external.'''
        return (self.image_id or self.name)

            i.username = image.get('username', None)
            i.python_path = image.get('python-path', 'auto')
            i.shell_type = image.get('shell-type', None)
            i.connection_type = image.get('connection-type', 'ssh')
            i.connection_port = image.get(
                'connection-port',
                default_port_mapping.get(i.connection_type, 22))
            self.cloud_images[i.name] = i

    @staticmethod
    def getSchema():
        image_filters = {
            v.Any('Name', 'name'): str,
            v.Any('Values', 'values'): [str]
        }

        for pool in self.provider.get('pools', []):
            pp = ProviderPool()
            pp.load(pool, config, self)
            self.pools[pp.name] = pp

    def getSchema(self):
        pool_label = {
        return v.All({
            v.Required('name'): str,
            v.Exclusive('cloud-image', 'label-image'): str,
            v.Required('username'): str,
            v.Exclusive('image-id', 'spec'): str,
            v.Exclusive('image-filters', 'spec'): [image_filters],
            'connection-type': str,
            'connection-port': int,
            'python-path': str,
            'shell-type': str,
        }, {
            v.Required(
                v.Any('image-id', 'image-filters'),
                msg=('Provide either '
                     '"image-filters", or "image-id" keys')
            ): object,
            object: object,
        })


class AwsProviderDiskImage(ConfigValue):
    def __init__(self, image_type, image, diskimage):
        default_port_mapping = {
            'ssh': 22,
            'winrm': 5986,
        }
        self.name = image['name']
        diskimage.image_types.add(image_type)
        self.pause = bool(image.get('pause', False))
        self.python_path = image.get('python-path')
        self.shell_type = image.get('shell-type')
        self.username = image.get('username')
        self.connection_type = image.get('connection-type', 'ssh')
        self.connection_port = image.get(
            'connection-port',
            default_port_mapping.get(self.connection_type, 22))
        self.meta = {}
        self.architecture = image.get('architecture', 'x86_64')

    @property
    def external_name(self):
        '''Human readable version of external.'''
        return self.name

    @staticmethod
    def getSchema():
        return {
            v.Required('name'): str,
            'username': str,
            'pause': bool,
            'connection-type': str,
            'connection-port': int,
            'python-path': str,
            'shell-type': str,
        }


class AwsLabel(ConfigValue):
    ignore_equality = ['pool']

    def __init__(self, label, provider_config, provider_pool):
        self.name = label['name']
        self.pool = provider_pool

        cloud_image_name = label.get('cloud-image', None)
        if cloud_image_name:
            cloud_image = provider_config.cloud_images.get(
                cloud_image_name, None)
            if not cloud_image:
                raise ValueError(
                    "cloud-image %s does not exist in provider %s"
                    " but is referenced in label %s" %
                    (cloud_image_name, provider_config.name, self.name))
            self.cloud_image = cloud_image
        else:
            self.cloud_image = None

        diskimage_name = label.get('diskimage')
        if diskimage_name:
            diskimage = provider_config.diskimages.get(
                diskimage_name, None)
            if not diskimage:
                raise ValueError(
                    "diskimage %s does not exist in provider %s"
                    " but is referenced in label %s" %
                    (diskimage_name, provider_config.name, self.name))
            self.diskimage = diskimage
        else:
            self.diskimage = None

        self.ebs_optimized = bool(label.get('ebs-optimized', False))
        self.instance_type = label['instance-type']
        self.key_name = label.get('key-name')
        self.volume_type = label.get('volume-type')
        self.volume_size = label.get('volume-size')
        self.userdata = label.get('userdata', None)
        self.iam_instance_profile = label.get('iam-instance-profile', None)
        self.tags = label.get('tags', {})

    @staticmethod
    def getSchema():
        return {
            v.Required('name'): str,
            v.Exclusive('cloud-image', 'image'): str,
            v.Exclusive('diskimage', 'image'): str,
            v.Required('instance-type'): str,
            v.Required('key-name'): str,
            'ebs-optimized': bool,
@@ -208,41 +178,133 @@ class AwsProviderConfig(ProviderConfig):
            'tags': dict,
        }


class AwsPool(ConfigPool):
    ignore_equality = ['provider']

    def __init__(self, provider_config, pool_config):
        super().__init__()
        self.provider = provider_config
        self.load(pool_config)

    def load(self, pool_config):
        super().load(pool_config)
        self.name = pool_config['name']
        self.security_group_id = pool_config.get('security-group-id')
        self.subnet_id = pool_config.get('subnet-id')
        self.public_ipv4 = pool_config.get(
            'public-ipv4', self.provider.public_ipv4)
        self.public_ipv6 = pool_config.get(
            'public-ipv6', self.provider.public_ipv6)
        # TODO: Deprecate public-ip-address
        self.public_ipv4 = pool_config.get(
            'public-ip-address', self.public_ipv4)
        self.use_internal_ip = pool_config.get(
            'use-internal-ip', self.provider.use_internal_ip)
        self.host_key_checking = pool_config.get(
            'host-key-checking', self.provider.host_key_checking)

    @staticmethod
    def getSchema():
        aws_label = AwsLabel.getSchema()

        pool = ConfigPool.getCommonSchemaDict()
        pool.update({
            v.Required('name'): str,
            v.Required('labels'): [pool_label],
            'host-key-checking': bool,
            v.Required('labels'): [aws_label],
            'security-group-id': str,
            'subnet-id': str,
            'public-ip-address': bool,
            'public-ipv4': bool,
            'public-ipv6': bool,
            'host-key-checking': bool,
        })
        return pool

        image_filters = {
            v.Any('Name', 'name'): str,
            v.Any('Values', 'values'): [str]
        }

        provider_cloud_images = {
            'name': str,
            'connection-type': str,
            'connection-port': int,
            'shell-type': str,
            'image-id': str,
            "image-filters": [image_filters],
            'username': str,
            'python-path': str,


class AwsProviderConfig(ProviderConfig):
    def __init__(self, driver, provider):
        super().__init__(provider)
        self._pools = {}
        self.rate = None
        self.launch_retries = None
        self.profile_name = None
        self.region_name = None
        self.boot_timeout = None
        self.launch_retries = None
        self.cloud_images = {}
        self.diskimages = {}

    @property
    def pools(self):
        return self._pools

    @property
    def manage_images(self):
        return True

    @staticmethod
    def reset():
        pass

    def load(self, config):
        self.profile_name = self.provider.get('profile-name')
        self.region_name = self.provider.get('region-name')

        self.rate = self.provider.get('rate', 1)
        self.launch_retries = self.provider.get('launch-retries', 3)
        self.launch_timeout = self.provider.get('launch-timeout', 3600)
        self.boot_timeout = self.provider.get('boot-timeout', 180)
        self.use_internal_ip = self.provider.get('use-internal-ip', False)
        self.host_key_checking = self.provider.get('host-key-checking', True)
        self.public_ipv4 = self.provider.get('public-ipv4', True)
        self.public_ipv6 = self.provider.get('public-ipv6', False)
        self.object_storage = self.provider.get('object-storage')
        self.image_type = self.provider.get('image-format', 'raw')
        self.image_name_format = '{image_name}-{timestamp}'
        self.post_upload_hook = self.provider.get('post-upload-hook')

        self.cloud_images = {}
        for image in self.provider.get('cloud-images', []):
            i = AwsProviderCloudImage(image)
            self.cloud_images[i.name] = i

        self.diskimages = {}
        for image in self.provider.get('diskimages', []):
            diskimage = config.diskimages[image['name']]
            i = AwsProviderDiskImage(self.image_type, image, diskimage)
            self.diskimages[i.name] = i

        for pool in self.provider.get('pools', []):
            pp = AwsPool(self, pool)
            self._pools[pp.name] = pp

            for label in pool.get('labels', []):
                pl = AwsLabel(label, self, pp)
                pp.labels[pl.name] = pl
                config.labels[pl.name].pools.append(pp)

    def getSchema(self):
        pool = AwsPool.getSchema()
        provider_cloud_images = AwsProviderCloudImage.getSchema()
        provider_diskimages = AwsProviderDiskImage.getSchema()
        object_storage = {
            v.Required('bucket-name'): str,
        }

        provider = ProviderConfig.getCommonSchemaDict()
        provider.update({
            v.Required('pools'): [pool],
            v.Required('region-name'): str,
            'rate': v.Any(int, float),
            'profile-name': str,
            'cloud-images': [provider_cloud_images],
            'diskimages': [provider_diskimages],
            'hostname-format': str,
            'boot-timeout': int,
            'launch-retries': int,
            'object-storage': object_storage,
            'image-format': v.Any('ova', 'vhd', 'vhdx', 'vmdk', 'raw'),
        })
        return v.Schema(provider)
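`AwsPool.load` above resolves each setting by letting a pool-level value override the provider-level default (e.g. `pool_config.get('host-key-checking', self.provider.host_key_checking)`). The precedence rule can be sketched with plain dicts; the names and values below are illustrative, not taken from a real config:

```python
# Provider-level defaults, mirroring AwsProviderConfig.load().
PROVIDER_DEFAULTS = {
    'host-key-checking': True,
    'public-ipv4': True,
    'public-ipv6': False,
}


def resolve(pool_config, provider_config):
    """Pool-level settings win; anything unset falls back to the provider."""
    return {key: pool_config.get(key, provider_config[key])
            for key in provider_config}


pool = {'public-ipv6': True}  # the pool overrides only one setting
resolved = resolve(pool, PROVIDER_DEFAULTS)
print(resolved)
# prints {'host-key-checking': True, 'public-ipv4': True, 'public-ipv6': True}
```

The same two-level fallback applies to `use-internal-ip` and the deprecated `public-ip-address` alias for `public-ipv4`.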
37 nodepool/tests/fixtures/aws-bad-config-images.yaml vendored Normal file
@@ -0,0 +1,37 @@
zookeeper-servers:
  - host: {zookeeper_host}
    port: {zookeeper_port}
    chroot: {zookeeper_chroot}

tenant-resource-limits:
  - tenant-name: tenant-1
    max-cores: 1024

labels:
  - name: ubuntu1404-bad-config

providers:
  - name: ec2-us-west-2
    driver: aws
    region-name: us-west-2
    cloud-images:
      - name: ubuntu1404-bad-config
        image-id: ami-1e749f67
        image-filters:
          - name: name
            values:
              - ubuntu*
        username: ubuntu
    pools:
      - name: main
        max-servers: 1
        subnet-id: null
        security-group-id: null
        node-attributes:
          key1: value1
          key2: value2
        labels:
          - name: ubuntu1404-bad-config
            cloud-image: ubuntu1404-bad-config
            instance-type: t3.medium
            key-name: zuul
12 nodepool/tests/fixtures/aws.yaml vendored
@@ -12,7 +12,6 @@ labels:
  - name: ubuntu1404-bad-ami-name
  - name: ubuntu1404-by-filters
  - name: ubuntu1404-by-capitalized-filters
  - name: ubuntu1404-bad-config
  - name: ubuntu1404-ebs-optimized
  - name: ubuntu1404-non-host-key-checking
  - name: ubuntu1404-private-ip
@@ -46,13 +45,6 @@ providers:
            Values:
              - ubuntu*
        username: ubuntu
      - name: ubuntu1404-bad-config
        image-id: ami-1e749f67
        image-filters:
          - name: name
            values:
              - ubuntu*
        username: ubuntu
      - name: ubuntu1404-with-shell-type
        image-id: ami-1e749f67
        username: ubuntu
@@ -95,10 +87,6 @@ providers:
          cloud-image: ubuntu1404-by-capitalized-filters
          instance-type: t3.medium
          key-name: zuul
        - name: ubuntu1404-bad-config
          cloud-image: ubuntu1404-bad-config
          instance-type: t3.medium
          key-name: zuul
        - name: ubuntu1404-userdata
          cloud-image: ubuntu1404
          instance-type: t3.medium
@@ -24,6 +24,7 @@ from unittest.mock import patch
import boto3
from moto import mock_ec2
import yaml
import testtools

from nodepool import config as nodepool_config
from nodepool import tests
@@ -39,7 +40,7 @@ class TestDriverAws(tests.DBTestCase):
                30, Exception, 'wait for provider'):
            try:
                provider_manager = nodepool.getProviderManager(provider)
                if provider_manager.ec2 is not None:
                if provider_manager.adapter.ec2 is not None:
                    break
            except Exception:
                pass
@@ -124,18 +125,28 @@
            self.assertEqual(
                False,
                interfaces[0].get('AssociatePublicIpAddress'))
            return provider_manager.ec2.create_instances_orig(
            return provider_manager.adapter.ec2.create_instances_orig(
                *args, **kwargs)

        provider_manager.ec2.create_instances_orig = \
            provider_manager.ec2.create_instances
        provider_manager.ec2.create_instances = _fake_create_instances
        provider_manager.adapter.ec2.create_instances_orig =\
            provider_manager.adapter.ec2.create_instances
        provider_manager.adapter.ec2.create_instances =\
            _fake_create_instances

        # moto does not mock service-quotas, so we do it ourselves:

        def _fake_get_service_quota(*args, **kwargs):
            # This is a simple fake that only returns the number
            # of cores.
            return {'Quota': {'Value': 100}}
        provider_manager.adapter.aws_quotas.get_service_quota =\
            _fake_get_service_quota

        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.tenant_name = 'tenant-1'
        req.node_types.append(label)
        with patch('nodepool.driver.aws.handler.nodescan') as nodescan:
        with patch('nodepool.driver.statemachine.nodescan') as nodescan:
            nodescan.return_value = 'MOCK KEY'
            self.zk.storeNodeRequest(req)

@@ -221,8 +232,10 @@
            is_valid_config=False)

    def test_ec2_machine_bad_config(self):
        self._test_ec2_machine('ubuntu1404-bad-config',
                               is_valid_config=False)
        # This fails config schema validation
        with testtools.ExpectedException(ValueError,
                                         ".*?could not be validated.*?"):
            self.setup_config('aws-bad-config-images.yaml')

    def test_ec2_machine_non_host_key_checking(self):
        self._test_ec2_machine('ubuntu1404-non-host-key-checking',
@@ -249,13 +262,15 @@
            tags=[
                {"Key": "has-tags", "Value": "true"},
                {"Key": "Name",
                 "Value": "ubuntu1404-with-tags"}
                 "Value": "np0000000000"}
            ])

    def test_ec2_machine_name_tag(self):
        # This ignores the Name value in the configuration, but still
        # succeeds.
        self._test_ec2_machine('ubuntu1404-with-name-tag',
                               tags=[
                                   {"Key": "Name", "Value": "different-name"}
                                   {"Key": "Name", "Value": "np0000000000"}
                               ])

    def test_ec2_machine_shell_type(self):
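The updated tests patch `nodepool.driver.statemachine.nodescan` with `unittest.mock.patch`, replacing the host-key scan with a canned value. The general pattern, shown here against an illustrative local function rather than nodepool itself, is to patch the name in the module where it is looked up:

```python
from unittest.mock import patch


def get_host_key():
    # Stand-in for a function that would normally reach the network.
    raise RuntimeError("would contact the network")


def check_node():
    # Looks up get_host_key at call time, so a patch is visible here.
    return get_host_key()


# Patch the attribute on this module for the duration of the block.
with patch(f'{__name__}.get_host_key') as fake:
    fake.return_value = 'MOCK KEY'
    result = check_node()

print(result)  # prints "MOCK KEY"
```

Outside the `with` block the original function is restored, so other tests are unaffected.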
38 releasenotes/notes/aws-statemachine-64fe9beeef9c24b1.yaml Normal file
@@ -0,0 +1,38 @@
---
prelude: >
  The AWS driver has been updated to achieve parity with other
  Nodepool drivers.
features:
  - |
    The AWS driver now supports rate limiting. It utilizes a two-tier
    rate limiting system to match AWS's request token buckets. The
    rate specified in the config file is used as the rate for mutating
    requests. Non-mutating requests will have their rate limited to
    10 times that amount.
  - |
    The AWS driver now supports quota. AWS only provides a quota
    value for the number of cores.
  - |
    The AWS driver now supports diskimage uploads.
  - |
    The AWS driver uses a new state machine framework within Nodepool
    with significant caching in order to improve performance at scale.
  - |
    The AWS driver now supports IPv6 addresses.
upgrade:
  - |
    The AWS driver will now ignore the "Name" tag if specified.
    Instead, it matches the behavior of other Nodepool drivers and
    sets the instance name to the Nodepool hostname (which is derived
    from the node name; e.g., "np0000000001").
deprecations:
  - |
    In AWS providers, the ``public-ip-address`` setting is deprecated.
    Use ``public-ipv4`` or ``public-ipv6`` instead.
  - |
    In AWS providers, specifying image filter values as non-string
    values is deprecated. The current behavior is that Nodepool
    coerces non-string values (such as ``true`` or integers) into
    strings, but a later version of Nodepool will produce an error.
    Please update config files to use literal (quoted if necessary)
    YAML strings.
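The two-tier limiter described in the features above (mutating calls at the configured rate, non-mutating calls at ten times that rate) can be sketched as a pair of minimum-interval context managers. This is an illustrative sketch of the idea, not the driver's actual implementation:

```python
import threading
import time


class RateLimiter:
    """Context manager enforcing a minimum interval between entries."""

    def __init__(self, rate_per_sec):
        self.interval = 1.0 / rate_per_sec
        self.last = 0.0
        self.lock = threading.Lock()

    def __enter__(self):
        with self.lock:
            now = time.monotonic()
            wait = self.last + self.interval - now
            if wait > 0:
                time.sleep(wait)
            self.last = time.monotonic()

    def __exit__(self, *exc):
        return False


rate = 2.0  # mutating requests per second, as set in the provider config
mutating = RateLimiter(rate)
non_mutating = RateLimiter(rate * 10)  # reads allowed at 10x the rate

with mutating:
    pass  # e.g. a create-instance call
with non_mutating:
    pass  # e.g. a describe/list call
```

Separating the two buckets keeps cheap describe calls from starving the much smaller budget for mutating requests.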