Add import_image support to AWS

In I9478c0050777bf35e1201395bd34b9d01b8d5795 we switched from using the
import_image method to import_snapshot in the AWS driver.  This method
is faster and more like other drivers in Nodepool.  However, some operating
systems (such as Windows, RHEL or SLES) require licensing metadata
associated with an AMI which is not available to be set when we register
an AMI from a snapshot.  For these systems, the only viable way to upload
images is with the import_image method.

This change restores the previous method as an option, but keeps the
"snapshot" method as the default.

Change-Id: I81daabebbc9dbe968d8aaf65e6b70f5cdfdd01bf
This commit is contained in:
James E. Blair 2023-01-30 14:22:26 -08:00
parent ad7bf9aaeb
commit fdc093a8de
7 changed files with 523 additions and 64 deletions

View File

@ -403,13 +403,44 @@ Selecting the ``aws`` driver adds the following options to the
:default: gp2 :default: gp2
The root `EBS volume type`_ for the image. The root `EBS volume type`_ for the image.
Only used with the
:value:`providers.[aws].diskimages.import-method.snapshot`
import method.
.. attr:: volume-size .. attr:: volume-size
:type: int :type: int
The size of the root EBS volume, in GiB, for the image. If The size of the root EBS volume, in GiB, for the image. If
omitted, the volume size reported for the imported snapshot omitted, the volume size reported for the imported snapshot
will be used. will be used. Only used with the
:value:`providers.[aws].diskimages.import-method.snapshot`
import method.
.. attr:: import-method
:default: snapshot
The method to use when importing the image.
.. value:: snapshot
This method uploads the image file to AWS as a snapshot
and then registers an AMI directly from the snapshot.
This is faster compared to the `image` method and may be
used with operating systems and versions that AWS does not
otherwise support. However, it is incompatible with some
operating systems which require special licensing or other
metadata in AWS.
.. value:: image
This method uploads the image file to AWS and performs an
"image import" on the file. This causes AWS to boot the
image in a temporary VM and then take a snapshot of that
VM which is then used as the basis of the AMI. This is
slower compared to the `snapshot` method and may only be
used with operating systems and versions which AWS already
supports. This may be necessary in order to use Windows
images.
.. attr:: iops .. attr:: iops
:type: int :type: int

View File

@ -403,8 +403,23 @@ class AwsAdapter(statemachine.Adapter):
bucket.upload_fileobj(fobj, object_filename, bucket.upload_fileobj(fobj, object_filename,
ExtraArgs=extra_args) ExtraArgs=extra_args)
if provider_image.import_method == 'image':
image_id = self._uploadImageImage(
provider_image, image_name, filename,
image_format, metadata, md5, sha256,
bucket_name, object_filename)
else:
image_id = self._uploadImageSnapshot(
provider_image, image_name, filename,
image_format, metadata, md5, sha256,
bucket_name, object_filename)
return image_id
def _uploadImageSnapshot(self, provider_image, image_name, filename,
image_format, metadata, md5, sha256,
bucket_name, object_filename):
# Import snapshot # Import snapshot
self.log.debug(f"Importing {image_name}") self.log.debug(f"Importing {image_name} as snapshot")
with self.rate_limiter: with self.rate_limiter:
import_snapshot_task = self.ec2_client.import_snapshot( import_snapshot_task = self.ec2_client.import_snapshot(
DiskContainer={ DiskContainer={
@ -491,9 +506,74 @@ class AwsAdapter(statemachine.Adapter):
self.log.debug(f"Upload of {image_name} complete as " self.log.debug(f"Upload of {image_name} complete as "
f"{register_response['ImageId']}") f"{register_response['ImageId']}")
# Last task returned from paginator above
return register_response['ImageId'] return register_response['ImageId']
def _uploadImageImage(self, provider_image, image_name, filename,
image_format, metadata, md5, sha256,
bucket_name, object_filename):
# Import image as AMI
self.log.debug(f"Importing {image_name} as AMI")
with self.rate_limiter:
import_image_task = self.ec2_client.import_image(
Architecture=provider_image.architecture,
DiskContainers=[{
'Format': image_format,
'UserBucket': {
'S3Bucket': bucket_name,
'S3Key': object_filename,
},
}],
TagSpecifications=[
{
'ResourceType': 'import-image-task',
'Tags': tag_dict_to_list(metadata),
},
]
)
task_id = import_image_task['ImportTaskId']
paginator = self.ec2_client.get_paginator(
'describe_import_image_tasks')
done = False
while not done:
time.sleep(self.IMAGE_UPLOAD_SLEEP)
with self.non_mutating_rate_limiter:
for page in paginator.paginate(ImportTaskIds=[task_id]):
for task in page['ImportImageTasks']:
if task['Status'].lower() in ('completed', 'deleted'):
done = True
break
self.log.debug(f"Deleting {image_name} from S3")
with self.rate_limiter:
self.s3.Object(bucket_name, object_filename).delete()
if task['Status'].lower() != 'completed':
raise Exception(f"Error uploading image: {task}")
# Tag the AMI
try:
with self.non_mutating_rate_limiter:
ami = self.ec2.Image(task['ImageId'])
with self.rate_limiter:
ami.create_tags(Tags=task['Tags'])
except Exception:
self.log.exception("Error tagging AMI:")
# Tag the snapshot
try:
with self.non_mutating_rate_limiter:
snap = self.ec2.Snapshot(
task['SnapshotDetails'][0]['SnapshotId'])
with self.rate_limiter:
snap.create_tags(Tags=task['Tags'])
except Exception:
self.log.exception("Error tagging snapshot:")
self.log.debug(f"Upload of {image_name} complete as {task['ImageId']}")
# Last task returned from paginator above
return task['ImageId']
def deleteImage(self, external_id): def deleteImage(self, external_id):
snaps = set() snaps = set()
self.log.debug(f"Deleting image {external_id}") self.log.debug(f"Deleting image {external_id}")
@ -512,8 +592,8 @@ class AwsAdapter(statemachine.Adapter):
def _tagAmis(self): def _tagAmis(self):
# There is no way to tag imported AMIs, so this routine # There is no way to tag imported AMIs, so this routine
# "eventually" tags them. We look for any AMIs without tags # "eventually" tags them. We look for any AMIs without tags
# and we copy the tags from the associated snapshot import # and we copy the tags from the associated snapshot or image
# task. # import task.
to_examine = [] to_examine = []
for ami in self._listAmis(): for ami in self._listAmis():
if ami.id in self.not_our_images: if ami.id in self.not_our_images:
@ -523,11 +603,27 @@ class AwsAdapter(statemachine.Adapter):
continue continue
except (botocore.exceptions.ClientError, AttributeError): except (botocore.exceptions.ClientError, AttributeError):
continue continue
# This has no tags, which means it's either not a nodepool # This has no tags, which means it's either not a nodepool
# image, or it's a new one which doesn't have tags yet. # image, or it's a new one which doesn't have tags yet.
# Copy over any tags from the snapshot import task, if ami.name.startswith('import-ami-'):
# otherwise, mark it as an image we can ignore in future task = self._getImportImageTask(ami.name)
# runs. if task:
# This was an import image (not snapshot) so let's
# try to find tags from the import task.
tags = tag_list_to_dict(task.get('Tags'))
if (tags.get('nodepool_provider_name') ==
self.provider.name):
# Copy over tags
self.log.debug(
f"Copying tags from import task {ami.name} to AMI")
with self.rate_limiter:
ami.create_tags(Tags=task['Tags'])
continue
# This may have been a snapshot import; try to copy over
# any tags from the snapshot import task, otherwise, mark
# it as an image we can ignore in future runs.
if len(ami.block_device_mappings) < 1: if len(ami.block_device_mappings) < 1:
self.not_our_images.add(ami.id) self.not_our_images.add(ami.id)
continue continue
@ -574,13 +670,39 @@ class AwsAdapter(statemachine.Adapter):
# See comments for _tagAmis # See comments for _tagAmis
to_examine = [] to_examine = []
for snap in self._listSnapshots(): for snap in self._listSnapshots():
if snap.id in self.not_our_snapshots:
continue
try: try:
if (snap.id not in self.not_our_snapshots and if snap.tags:
not snap.tags): continue
to_examine.append(snap)
except botocore.exceptions.ClientError: except botocore.exceptions.ClientError:
# We may have cached a snapshot that doesn't exist # We may have cached a snapshot that doesn't exist
continue continue
if 'import-ami' in snap.description:
match = re.match(r'.*?(import-ami-\w*)', snap.description)
task = None
if match:
task_id = match.group(1)
task = self._getImportImageTask(task_id)
if task:
# This was an import image (not snapshot) so let's
# try to find tags from the import task.
tags = tag_list_to_dict(task.get('Tags'))
if (tags.get('nodepool_provider_name') ==
self.provider.name):
# Copy over tags
self.log.debug(
f"Copying tags from import task {task_id}"
" to snapshot")
with self.rate_limiter:
snap.create_tags(Tags=task['Tags'])
continue
# This may have been a snapshot import; try to copy over
# any tags from the snapshot import task.
to_examine.append(snap)
if not to_examine: if not to_examine:
return return
@ -610,6 +732,16 @@ class AwsAdapter(statemachine.Adapter):
else: else:
self.not_our_snapshots.add(snap.id) self.not_our_snapshots.add(snap.id)
def _getImportImageTask(self, task_id):
paginator = self.ec2_client.get_paginator(
'describe_import_image_tasks')
with self.non_mutating_rate_limiter:
for page in paginator.paginate(ImportTaskIds=[task_id]):
for task in page['ImportImageTasks']:
# Return the first and only task
return task
return None
def _listImportSnapshotTasks(self): def _listImportSnapshotTasks(self):
paginator = self.ec2_client.get_paginator( paginator = self.ec2_client.get_paginator(
'describe_import_snapshot_tasks') 'describe_import_snapshot_tasks')

View File

@ -104,6 +104,7 @@ class AwsProviderDiskImage(ConfigValue):
self.ena_support = image.get('ena-support', True) self.ena_support = image.get('ena-support', True)
self.volume_size = image.get('volume-size', None) self.volume_size = image.get('volume-size', None)
self.volume_type = image.get('volume-type', 'gp2') self.volume_type = image.get('volume-type', 'gp2')
self.import_method = image.get('import-method', 'snapshot')
self.iops = image.get('iops', None) self.iops = image.get('iops', None)
self.throughput = image.get('throughput', None) self.throughput = image.get('throughput', None)
@ -126,6 +127,7 @@ class AwsProviderDiskImage(ConfigValue):
'ena-support': bool, 'ena-support': bool,
'volume-size': int, 'volume-size': int,
'volume-type': str, 'volume-type': str,
'import-method': v.Any('snapshot', 'image'),
'iops': int, 'iops': int,
'throughput': int, 'throughput': int,
'tags': dict, 'tags': dict,

View File

@ -0,0 +1,66 @@
elements-dir: .
images-dir: '{images_dir}'
build-log-dir: '{build_log_dir}'
build-log-retention: 1
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}
chroot: {zookeeper_chroot}
zookeeper-tls:
ca: {zookeeper_ca}
cert: {zookeeper_cert}
key: {zookeeper_key}
tenant-resource-limits:
- tenant-name: tenant-1
max-cores: 1024
labels:
- name: diskimage
providers:
- name: ec2-us-west-2
driver: aws
rate: 2
region-name: us-west-2
object-storage:
bucket-name: nodepool
diskimages:
- name: fake-image
tags:
provider_metadata: provider
import-method: image
iops: 1000
throughput: 100
pools:
- name: main
max-servers: 1
subnet-id: {subnet_id}
security-group-id: {security_group_id}
node-attributes:
key1: value1
key2: value2
labels:
- name: diskimage
diskimage: fake-image
instance-type: t3.medium
key-name: zuul
iops: 2000
throughput: 200
diskimages:
- name: fake-image
elements:
- fedora-minimal
- vm
release: 21
dib-cmd: nodepool/tests/fake-image-create
env-vars:
TMPDIR: /opt/dib_tmp
DIB_IMAGE_CACHE: /opt/dib_cache
DIB_CLOUD_IMAGES: http://download.fedoraproject.org/pub/fedora/linux/releases/test/21-Beta/Cloud/Images/x86_64/
BASE_IMAGE_FILE: Fedora-Cloud-Base-20141029-21_Beta.x86_64.qcow2
metadata:
diskimage_metadata: diskimage

View File

@ -19,7 +19,7 @@ import uuid
import boto3 import boto3
def make_stage_1(task_id, user_bucket, tags): def make_import_snapshot_stage_1(task_id, user_bucket, tags):
return { return {
'Architecture': 'x86_64', 'Architecture': 'x86_64',
'ImportTaskId': f'import-snap-{task_id}', 'ImportTaskId': f'import-snap-{task_id}',
@ -34,7 +34,7 @@ def make_stage_1(task_id, user_bucket, tags):
} }
def make_stage_2(task_id, snap_id, task): def make_import_snapshot_stage_2(task_id, snap_id, task):
# Make a unique snapshot id that's different than the task id. # Make a unique snapshot id that's different than the task id.
return { return {
'ImportTaskId': f'import-snap-{task_id}', 'ImportTaskId': f'import-snap-{task_id}',
@ -49,7 +49,43 @@ def make_stage_2(task_id, snap_id, task):
} }
class ImportTaskPaginator: def make_import_image_stage_1(task_id, user_bucket, tags):
return {
'Architecture': 'x86_64',
'ImportTaskId': f'import-ami-{task_id}',
'Progress': '19',
'SnapshotDetails': [{'DiskImageSize': 355024384.0,
'Format': 'VMDK',
'Status': 'active',
'UserBucket': user_bucket}],
'Status': 'active',
'StatusMessage': 'converting',
'Tags': tags,
}
def make_import_image_stage_2(task_id, image_id, snap_id, task):
# Make a unique snapshot id that's different than the task id.
return {
'Architecture': 'x86_64',
'BootMode': 'legacy_bios',
'ImageId': image_id,
'ImportTaskId': f'import-ami-{task_id}',
'LicenseType': 'BYOL',
'Platform': 'Linux',
'SnapshotDetails': [{'DeviceName': '/dev/sda1',
'DiskImageSize': 355024384.0,
'Format': 'VMDK',
'SnapshotId': snap_id,
'Status': 'completed',
'UserBucket':
task['SnapshotDetails'][0]['UserBucket']}],
'Status': 'completed',
'Tags': task['Tags'],
}
class ImportSnapshotTaskPaginator:
log = logging.getLogger("nodepool.FakeAws") log = logging.getLogger("nodepool.FakeAws")
def __init__(self, fake): def __init__(self, fake):
@ -57,6 +93,7 @@ class ImportTaskPaginator:
def paginate(self, **kw): def paginate(self, **kw):
tasks = list(self.fake.tasks.values()) tasks = list(self.fake.tasks.values())
tasks = [t for t in tasks if 'import-snap' in t['ImportTaskId']]
if 'ImportTaskIds' in kw: if 'ImportTaskIds' in kw:
tasks = [t for t in tasks tasks = [t for t in tasks
if t['ImportTaskId'] in kw['ImportTaskIds']] if t['ImportTaskId'] in kw['ImportTaskIds']]
@ -70,6 +107,28 @@ class ImportTaskPaginator:
return ret return ret
class ImportImageTaskPaginator:
log = logging.getLogger("nodepool.FakeAws")
def __init__(self, fake):
self.fake = fake
def paginate(self, **kw):
tasks = list(self.fake.tasks.values())
tasks = [t for t in tasks if 'import-ami' in t['ImportTaskId']]
if 'ImportTaskIds' in kw:
tasks = [t for t in tasks
if t['ImportTaskId'] in kw['ImportTaskIds']]
# A page of tasks
ret = [{'ImportImageTasks': tasks}]
# Move the task along
for task in tasks:
if task['Status'] != 'completed':
self.fake.finish_import_image(task)
return ret
class FakeAws: class FakeAws:
log = logging.getLogger("nodepool.FakeAws") log = logging.getLogger("nodepool.FakeAws")
@ -80,7 +139,7 @@ class FakeAws:
def import_snapshot(self, *args, **kw): def import_snapshot(self, *args, **kw):
task_id = uuid.uuid4().hex task_id = uuid.uuid4().hex
task = make_stage_1( task = make_import_snapshot_stage_1(
task_id, task_id,
kw['DiskContainer']['UserBucket'], kw['DiskContainer']['UserBucket'],
kw['TagSpecifications'][0]['Tags']) kw['TagSpecifications'][0]['Tags'])
@ -98,10 +157,48 @@ class FakeAws:
VolumeId=volume['VolumeId'], VolumeId=volume['VolumeId'],
)["SnapshotId"] )["SnapshotId"]
t2 = make_stage_2(task_id, snap_id, task) t2 = make_import_snapshot_stage_2(task_id, snap_id, task)
self.tasks[task_id] = t2 self.tasks[task_id] = t2
return snap_id return snap_id
def import_image(self, *args, **kw):
task_id = uuid.uuid4().hex
task = make_import_image_stage_1(
task_id,
kw['DiskContainers'][0]['UserBucket'],
kw['TagSpecifications'][0]['Tags'])
self.tasks[task_id] = task
return task
def finish_import_image(self, task):
task_id = task['ImportTaskId'].split('-')[-1]
# Make an AMI to simulate the import finishing
reservation = self.ec2_client.run_instances(
ImageId="ami-12c6146b", MinCount=1, MaxCount=1)
instance = reservation["Instances"][0]
instance_id = instance["InstanceId"]
response = self.ec2_client.create_image(
InstanceId=instance_id,
Name=f'import-ami-{task_id}',
)
image_id = response["ImageId"]
self.ec2_client.describe_images(ImageIds=[image_id])["Images"][0]
volume = self.ec2_client.create_volume(
Size=80,
AvailabilityZone='us-west-2')
snap_id = self.ec2_client.create_snapshot(
VolumeId=volume['VolumeId'],
Description=f'imported volume import-ami-{task_id}',
)["SnapshotId"]
t2 = make_import_image_stage_2(task_id, image_id, snap_id, task)
self.tasks[task_id] = t2
return (image_id, snap_id)
def change_snapshot_id(self, task, snapshot_id): def change_snapshot_id(self, task, snapshot_id):
# Given a task, update its snapshot id; the moto # Given a task, update its snapshot id; the moto
# register_image mock doesn't honor the snapshot_id we pass # register_image mock doesn't honor the snapshot_id we pass
@ -110,8 +207,10 @@ class FakeAws:
self.tasks[task_id]['SnapshotTaskDetail']['SnapshotId'] = snapshot_id self.tasks[task_id]['SnapshotTaskDetail']['SnapshotId'] = snapshot_id
def get_paginator(self, name): def get_paginator(self, name):
if name == 'describe_import_image_tasks':
return ImportImageTaskPaginator(self)
if name == 'describe_import_snapshot_tasks': if name == 'describe_import_snapshot_tasks':
return ImportTaskPaginator(self) return ImportSnapshotTaskPaginator(self)
raise NotImplementedError() raise NotImplementedError()
def _listAmis(self): def _listAmis(self):

View File

@ -60,6 +60,8 @@ class FakeAwsAdapter(AwsAdapter):
self.ec2.create_instances = _fake_create_instances self.ec2.create_instances = _fake_create_instances
self.ec2_client.import_snapshot = \ self.ec2_client.import_snapshot = \
self.__testcase.fake_aws.import_snapshot self.__testcase.fake_aws.import_snapshot
self.ec2_client.import_image = \
self.__testcase.fake_aws.import_image
self.ec2_client.get_paginator = \ self.ec2_client.get_paginator = \
self.__testcase.fake_aws.get_paginator self.__testcase.fake_aws.get_paginator
@ -594,7 +596,7 @@ class TestDriverAws(tests.DBTestCase):
response = instance.describe_attribute(Attribute='ebsOptimized') response = instance.describe_attribute(Attribute='ebsOptimized')
self.assertTrue(response['EbsOptimized']['Value']) self.assertTrue(response['EbsOptimized']['Value'])
def test_aws_diskimage(self): def test_aws_diskimage_snapshot(self):
configfile = self.setup_config('aws/diskimage.yaml') configfile = self.setup_config('aws/diskimage.yaml')
self.useBuilder(configfile) self.useBuilder(configfile)
@ -636,6 +638,48 @@ class TestDriverAws(tests.DBTestCase):
self.create_instance_calls[0]['BlockDeviceMappings'][0]['Ebs'] self.create_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200) ['Throughput'], 200)
def test_aws_diskimage_image(self):
configfile = self.setup_config('aws/diskimage-import-image.yaml')
self.useBuilder(configfile)
image = self.waitForImage('ec2-us-west-2', 'fake-image')
self.assertEqual(image.username, 'zuul')
ec2_image = self.ec2.Image(image.external_id)
self.assertEqual(ec2_image.state, 'available')
self.assertTrue({'Key': 'diskimage_metadata', 'Value': 'diskimage'}
in ec2_image.tags)
self.assertTrue({'Key': 'provider_metadata', 'Value': 'provider'}
in ec2_image.tags)
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
req = zk.NodeRequest()
req.state = zk.REQUESTED
req.node_types.append('diskimage')
self.zk.storeNodeRequest(req)
req = self.waitForNodeRequest(req)
self.assertEqual(req.state, zk.FULFILLED)
self.assertNotEqual(req.nodes, [])
node = self.zk.getNode(req.nodes[0])
self.assertEqual(node.allocated_to, req.id)
self.assertEqual(node.state, zk.READY)
self.assertIsNotNone(node.launcher)
self.assertEqual(node.connection_type, 'ssh')
self.assertEqual(node.shell_type, None)
self.assertEqual(node.attributes,
{'key1': 'value1', 'key2': 'value2'})
self.assertEqual(
self.create_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Iops'], 2000)
self.assertEqual(
self.create_instance_calls[0]['BlockDeviceMappings'][0]['Ebs']
['Throughput'], 200)
def test_aws_diskimage_removal(self): def test_aws_diskimage_removal(self):
configfile = self.setup_config('aws/diskimage.yaml') configfile = self.setup_config('aws/diskimage.yaml')
self.useBuilder(configfile) self.useBuilder(configfile)
@ -645,17 +689,19 @@ class TestDriverAws(tests.DBTestCase):
self.waitForBuildDeletion('fake-image', '0000000001') self.waitForBuildDeletion('fake-image', '0000000001')
def test_aws_resource_cleanup(self): def test_aws_resource_cleanup(self):
# This tests everything except the image imports
# Start by setting up leaked resources # Start by setting up leaked resources
instance_tags = [ instance_tags = [
{'Key': 'nodepool_node_id', 'Value': '0000000042'}, {'Key': 'nodepool_node_id', 'Value': '0000000042'},
{'Key': 'nodepool_pool_name', 'Value': 'main'}, {'Key': 'nodepool_pool_name', 'Value': 'main'},
{'Key': 'nodepool_provider_name', 'Value': 'ec2-us-west-2'} {'Key': 'nodepool_provider_name', 'Value': 'ec2-us-west-2'}
] ]
image_tags = [
{'Key': 'nodepool_build_id', 'Value': '0000000042'}, s3_tags = {
{'Key': 'nodepool_upload_id', 'Value': '0000000042'}, 'nodepool_build_id': '0000000042',
{'Key': 'nodepool_provider_name', 'Value': 'ec2-us-west-2'} 'nodepool_upload_id': '0000000042',
] 'nodepool_provider_name': 'ec2-us-west-2',
}
reservation = self.ec2_client.run_instances( reservation = self.ec2_client.run_instances(
ImageId="ami-12c6146b", MinCount=1, MaxCount=1, ImageId="ami-12c6146b", MinCount=1, MaxCount=1,
@ -676,6 +722,60 @@ class TestDriverAws(tests.DBTestCase):
) )
instance_id = reservation['Instances'][0]['InstanceId'] instance_id = reservation['Instances'][0]['InstanceId']
bucket = self.s3.Bucket('nodepool')
bucket.put_object(Body=b'hi',
Key='testimage',
Tagging=urllib.parse.urlencode(s3_tags))
obj = self.s3.Object('nodepool', 'testimage')
# This effectively asserts the object exists
self.s3_client.get_object_tagging(
Bucket=obj.bucket_name, Key=obj.key)
instance = self.ec2.Instance(instance_id)
self.assertEqual(instance.state['Name'], 'running')
volume_id = list(instance.volumes.all())[0].id
volume = self.ec2.Volume(volume_id)
self.assertEqual(volume.state, 'in-use')
# Now that the leaked resources exist, start the provider and
# wait for it to clean them.
configfile = self.setup_config('aws/diskimage.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
for _ in iterate_timeout(30, Exception, 'instance deletion'):
instance = self.ec2.Instance(instance_id)
if instance.state['Name'] == 'terminated':
break
for _ in iterate_timeout(30, Exception, 'volume deletion'):
volume = self.ec2.Volume(volume_id)
try:
if volume.state == 'deleted':
break
except botocore.exceptions.ClientError:
# Probably not found
break
for _ in iterate_timeout(30, Exception, 'object deletion'):
obj = self.s3.Object('nodepool', 'testimage')
try:
self.s3_client.get_object_tagging(
Bucket=obj.bucket_name, Key=obj.key)
except self.s3_client.exceptions.NoSuchKey:
break
def test_aws_resource_cleanup_import_snapshot(self):
# This tests the import_snapshot path
# Start by setting up leaked resources
image_tags = [
{'Key': 'nodepool_build_id', 'Value': '0000000042'},
{'Key': 'nodepool_upload_id', 'Value': '0000000042'},
{'Key': 'nodepool_provider_name', 'Value': 'ec2-us-west-2'}
]
task = self.fake_aws.import_snapshot( task = self.fake_aws.import_snapshot(
DiskContainer={ DiskContainer={
'Format': 'ova', 'Format': 'ova',
@ -717,28 +817,6 @@ class TestDriverAws(tests.DBTestCase):
# applied, so we test the automatic retagging methods in the # applied, so we test the automatic retagging methods in the
# adapter. # adapter.
s3_tags = {
'nodepool_build_id': '0000000042',
'nodepool_upload_id': '0000000042',
'nodepool_provider_name': 'ec2-us-west-2',
}
bucket = self.s3.Bucket('nodepool')
bucket.put_object(Body=b'hi',
Key='testimage',
Tagging=urllib.parse.urlencode(s3_tags))
obj = self.s3.Object('nodepool', 'testimage')
# This effectively asserts the object exists
self.s3_client.get_object_tagging(
Bucket=obj.bucket_name, Key=obj.key)
instance = self.ec2.Instance(instance_id)
self.assertEqual(instance.state['Name'], 'running')
volume_id = list(instance.volumes.all())[0].id
volume = self.ec2.Volume(volume_id)
self.assertEqual(volume.state, 'in-use')
image = self.ec2.Image(image_id) image = self.ec2.Image(image_id)
self.assertEqual(image.state, 'available') self.assertEqual(image.state, 'available')
@ -752,20 +830,6 @@ class TestDriverAws(tests.DBTestCase):
pool = self.useNodepool(configfile, watermark_sleep=1) pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start() pool.start()
for _ in iterate_timeout(30, Exception, 'instance deletion'):
instance = self.ec2.Instance(instance_id)
if instance.state['Name'] == 'terminated':
break
for _ in iterate_timeout(30, Exception, 'volume deletion'):
volume = self.ec2.Volume(volume_id)
try:
if volume.state == 'deleted':
break
except botocore.exceptions.ClientError:
# Probably not found
break
for _ in iterate_timeout(30, Exception, 'ami deletion'): for _ in iterate_timeout(30, Exception, 'ami deletion'):
image = self.ec2.Image(image_id) image = self.ec2.Image(image_id)
try: try:
@ -789,10 +853,66 @@ class TestDriverAws(tests.DBTestCase):
# Probably not found # Probably not found
break break
for _ in iterate_timeout(30, Exception, 'object deletion'): def test_aws_resource_cleanup_import_image(self):
obj = self.s3.Object('nodepool', 'testimage') # This tests the import_image path
# Start by setting up leaked resources
image_tags = [
{'Key': 'nodepool_build_id', 'Value': '0000000042'},
{'Key': 'nodepool_upload_id', 'Value': '0000000042'},
{'Key': 'nodepool_provider_name', 'Value': 'ec2-us-west-2'}
]
# The image import path:
task = self.fake_aws.import_image(
DiskContainers=[{
'Format': 'ova',
'UserBucket': {
'S3Bucket': 'nodepool',
'S3Key': 'testfile',
}
}],
TagSpecifications=[{
'ResourceType': 'import-image-task',
'Tags': image_tags,
}])
image_id, snapshot_id = self.fake_aws.finish_import_image(task)
# Note that the resulting image and snapshot do not have tags
# applied, so we test the automatic retagging methods in the
# adapter.
image = self.ec2.Image(image_id)
self.assertEqual(image.state, 'available')
snap = self.ec2.Snapshot(snapshot_id)
self.assertEqual(snap.state, 'completed')
# Now that the leaked resources exist, start the provider and
# wait for it to clean them.
configfile = self.setup_config('aws/diskimage.yaml')
pool = self.useNodepool(configfile, watermark_sleep=1)
pool.start()
for _ in iterate_timeout(30, Exception, 'ami deletion'):
image = self.ec2.Image(image_id)
try: try:
self.s3_client.get_object_tagging( # If this has a value the image was not deleted
Bucket=obj.bucket_name, Key=obj.key) if image.state == 'available':
except self.s3_client.exceptions.NoSuchKey: # Definitely not deleted yet
continue
except AttributeError:
# Per AWS API, a recently deleted image is empty and
# looking at the state raises an AttributeFailure; see
# https://github.com/boto/boto3/issues/2531. The image
# was deleted, so we continue on here
break
for _ in iterate_timeout(30, Exception, 'snapshot deletion'):
snap = self.ec2.Snapshot(snapshot_id)
try:
if snap.state == 'deleted':
break
except botocore.exceptions.ClientError:
# Probably not found
break break

View File

@ -0,0 +1,9 @@
---
features:
- |
The AWS driver now supports importing images using either the
"image" or "snapshot" import methods. The "snapshot" method is
the current behavior and remains the default and is the fastest
and most efficient in most circumstances. The "image" method is
available for images which require certain AWS licensing metadata
that can only be added via that method.