AWS: Use snapshot instead of image import

AWS recommends using the import_image method for importing disk
images into the system as AMIs, but there is a significant caveat:
EC2 will attempt to boot an instance from the image and snapshot
the instance.  That process requires that the image match certain
characteristics of already existing images supported by AWS, so
only certain operating systems can be successfully imported.

If anything goes wrong with the import process, the errors are opaque
since the temporary instance used for the import is inaccessible to
the user.

An alternative is to use the import_snapshot method, which produces
a snapshot object in EC2; a new AMI can then be "registered" and
pointed at that snapshot.  It's an extra step for the Nodepool
builder, but it's simple and takes less time overall than waiting
for EC2 to boot up the temporary instance.
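
In outline, the new flow is (a minimal boto3 sketch, not the
builder's actual code; the bucket, key, and image names are
illustrative):

    import time
    import boto3

    ec2 = boto3.client('ec2', region_name='us-west-2')

    # Start a snapshot import from an image file already
    # uploaded to S3.
    task = ec2.import_snapshot(
        DiskContainer={
            'Format': 'RAW',
            'UserBucket': {'S3Bucket': 'my-bucket',
                           'S3Key': 'my-image.raw'},
        })
    task_id = task['ImportTaskId']

    # Poll the import task until the snapshot is ready.
    while True:
        result = ec2.describe_import_snapshot_tasks(
            ImportTaskIds=[task_id])
        detail = (result['ImportSnapshotTasks'][0]
                  ['SnapshotTaskDetail'])
        if detail['Status'].lower() == 'completed':
            break
        time.sleep(10)

    # Register a new AMI pointed at the imported snapshot.
    ec2.register_image(
        Name='my-image',
        Architecture='x86_64',
        RootDeviceName='/dev/sda1',
        VirtualizationType='hvm',
        BlockDeviceMappings=[{
            'DeviceName': '/dev/sda1',
            'Ebs': {'SnapshotId': detail['SnapshotId'],
                    'VolumeSize': 20,
                    'VolumeType': 'gp2',
                    'DeleteOnTermination': True},
        }])

Registering the AMI ourselves is also what lets us choose the root
volume type and size (see the new volume-type and volume-size image
attributes below).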

This is a much closer approximation to the import scheme used in
other Nodepool drivers.

I have successfully tested this method with a cirros image, which
is notable because it is not an operating system supported by the
previous method.

The caveats and instructions relating to setting up the
Import/Export service roles still apply, so the documentation
related to them remains.
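
For reference, creating that role looks roughly like this (a sketch
of the standard vmimport trust policy from AWS's VM Import/Export
documentation; the S3/EC2 permissions policy that must also be
attached to the role is omitted):

    import json
    import boto3

    iam = boto3.client('iam')
    trust_policy = {
        'Version': '2012-10-17',
        'Statement': [{
            'Effect': 'Allow',
            'Principal': {'Service': 'vmie.amazonaws.com'},
            'Action': 'sts:AssumeRole',
            'Condition': {
                'StringEquals': {'sts:Externalid': 'vmimport'}},
        }],
    }
    # The role must be named "vmimport" for EC2 imports to use it.
    iam.create_role(
        RoleName='vmimport',
        AssumeRolePolicyDocument=json.dumps(trust_policy))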

The method for reconciling missing tags for aborted image uploads
is updated to accommodate the new process.  Notably, while the
import_image method left a breadcrumb in the snapshot description,
it does not appear that we are able to set the description when we
import the snapshot.  Instead we need to examine the snapshot
import task list to find the intended tags for a given snapshot or
image.
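
In outline, the reconciliation looks something like this (a boto3
sketch; `untagged_snapshots` is a hypothetical stand-in for the
adapter's real bookkeeping, and the snapshots are boto3 resource
objects):

    import boto3

    ec2_client = boto3.client('ec2')

    # Map each imported snapshot id to the tags recorded on the
    # import-snapshot task that produced it.
    task_map = {}
    paginator = ec2_client.get_paginator(
        'describe_import_snapshot_tasks')
    for page in paginator.paginate():
        for task in page['ImportSnapshotTasks']:
            snap_id = task['SnapshotTaskDetail'].get('SnapshotId')
            if snap_id:
                task_map[snap_id] = task.get('Tags', [])

    # Untagged snapshots can then be retagged from the matching
    # task.
    for snap in untagged_snapshots:  # hypothetical bookkeeping
        tags = task_map.get(snap.id)
        if tags:
            snap.create_tags(Tags=tags)

The same task map serves for AMIs, by way of the snapshot id in
their block device mapping.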

Change-Id: I9478c0050777bf35e1201395bd34b9d01b8d5795
Author: James E. Blair  2022-08-01 16:25:12 -07:00
parent 123a32f922
commit 89cda5a1ba
5 changed files with 252 additions and 136 deletions


@@ -383,6 +383,19 @@ Selecting the ``aws`` driver adds the following options to the
long-standing issue with ``ansible_shell_type`` in combination
with ``become``.
.. attr:: volume-type
:type: str
:default: gp2
The root `EBS volume type`_ for the image.
.. attr:: volume-size
:type: int
The size of the root EBS volume, in GiB, for the image. If
omitted, the volume size reported for the imported snapshot
will be used.
.. attr:: pools
:type: list


@@ -28,6 +28,7 @@ from nodepool.driver.utils import QuotaInformation, RateLimiter
from nodepool.driver import statemachine
import boto3
import botocore.exceptions
def tag_dict_to_list(tagdict):
@@ -277,33 +278,48 @@ class AwsAdapter(statemachine.Adapter):
return AwsDeleteStateMachine(self, external_id, log)
def listResources(self):
self._tagAmis()
self._tagSnapshots()
self._tagAmis()
for instance in self._listInstances():
if instance.state["Name"].lower() == "terminated":
try:
if instance.state["Name"].lower() == "terminated":
continue
except botocore.exceptions.ClientError:
continue
yield AwsResource(tag_list_to_dict(instance.tags),
'instance', instance.id)
for volume in self._listVolumes():
if volume.state.lower() == "deleted":
try:
if volume.state.lower() == "deleted":
continue
except botocore.exceptions.ClientError:
continue
yield AwsResource(tag_list_to_dict(volume.tags),
'volume', volume.id)
for ami in self._listAmis():
if ami.state.lower() == "deleted":
try:
if ami.state.lower() == "deleted":
continue
except (botocore.exceptions.ClientError, AttributeError):
continue
yield AwsResource(tag_list_to_dict(ami.tags),
'ami', ami.id)
for snap in self._listSnapshots():
if snap.state.lower() == "deleted":
try:
if snap.state.lower() == "deleted":
continue
except botocore.exceptions.ClientError:
continue
yield AwsResource(tag_list_to_dict(snap.tags),
'snapshot', snap.id)
if self.provider.object_storage:
for obj in self._listObjects():
with self.non_mutating_rate_limiter:
tags = self.s3_client.get_object_tagging(
Bucket=obj.bucket_name, Key=obj.key)
try:
tags = self.s3_client.get_object_tagging(
Bucket=obj.bucket_name, Key=obj.key)
except botocore.exceptions.ClientError:
continue
yield AwsResource(tag_list_to_dict(tags['TagSet']),
'object', obj.key)
@@ -368,36 +384,35 @@ class AwsAdapter(statemachine.Adapter):
bucket.upload_fileobj(fobj, object_filename,
ExtraArgs=extra_args)
# Import image as AMI
# Import snapshot
self.log.debug(f"Importing {image_name}")
import_image_task = self._import_image(
Architecture=provider_image.architecture,
DiskContainers=[
{
with self.rate_limiter:
import_snapshot_task = self._import_snapshot(
DiskContainer={
'Format': image_format,
'UserBucket': {
'S3Bucket': bucket_name,
'S3Key': object_filename,
}
},
},
],
TagSpecifications=[
{
'ResourceType': 'import-image-task',
'Tags': tag_dict_to_list(metadata),
},
]
)
task_id = import_image_task['ImportTaskId']
TagSpecifications=[
{
'ResourceType': 'import-snapshot-task',
'Tags': tag_dict_to_list(metadata),
},
]
)
task_id = import_snapshot_task['ImportTaskId']
paginator = self._get_paginator('describe_import_image_tasks')
paginator = self._get_paginator('describe_import_snapshot_tasks')
done = False
while not done:
time.sleep(self.IMAGE_UPLOAD_SLEEP)
with self.non_mutating_rate_limiter:
for page in paginator.paginate(ImportTaskIds=[task_id]):
for task in page['ImportImageTasks']:
if task['Status'].lower() in ('completed', 'deleted'):
for task in page['ImportSnapshotTasks']:
if task['SnapshotTaskDetail']['Status'].lower() in (
'completed', 'deleted'):
done = True
break
@@ -405,31 +420,54 @@ class AwsAdapter(statemachine.Adapter):
with self.rate_limiter:
self.s3.Object(bucket_name, object_filename).delete()
if task['Status'].lower() != 'completed':
if task['SnapshotTaskDetail']['Status'].lower() != 'completed':
raise Exception(f"Error uploading image: {task}")
# Tag the AMI
try:
with self.non_mutating_rate_limiter:
ami = self.ec2.Image(task['ImageId'])
with self.rate_limiter:
ami.create_tags(Tags=task['Tags'])
except Exception:
self.log.exception("Error tagging AMI:")
# Tag the snapshot
try:
with self.non_mutating_rate_limiter:
snap = self.ec2.Snapshot(
task['SnapshotDetails'][0]['SnapshotId'])
task['SnapshotTaskDetail']['SnapshotId'])
with self.rate_limiter:
snap.create_tags(Tags=task['Tags'])
except Exception:
self.log.exception("Error tagging snapshot:")
self.log.debug(f"Upload of {image_name} complete as {task['ImageId']}")
volume_size = provider_image.volume_size or snap.volume_size
# Register the snapshot as an AMI
with self.rate_limiter:
register_response = self.ec2_client.register_image(
Architecture=provider_image.architecture,
BlockDeviceMappings=[
{
'DeviceName': '/dev/sda1',
'Ebs': {
'DeleteOnTermination': True,
'SnapshotId': task[
'SnapshotTaskDetail']['SnapshotId'],
'VolumeSize': volume_size,
'VolumeType': provider_image.volume_type,
},
},
],
RootDeviceName='/dev/sda1',
VirtualizationType='hvm',
Name=image_name,
)
# Tag the AMI
try:
with self.non_mutating_rate_limiter:
ami = self.ec2.Image(register_response['ImageId'])
with self.rate_limiter:
ami.create_tags(Tags=task['Tags'])
except Exception:
self.log.exception("Error tagging AMI:")
self.log.debug(f"Upload of {image_name} complete as "
f"{register_response['ImageId']}")
# Last task returned from paginator above
return task['ImageId']
return register_response['ImageId']
def deleteImage(self, external_id):
snaps = set()
@@ -449,58 +487,110 @@ class AwsAdapter(statemachine.Adapter):
def _tagAmis(self):
# There is no way to tag imported AMIs, so this routine
# "eventually" tags them. We look for any AMIs without tags
# which correspond to import tasks, and we copy the tags from
# those import tasks to the AMI.
# and we copy the tags from the associated snapshot import
# task.
to_examine = []
for ami in self._listAmis():
if (ami.name.startswith('import-ami-') and
not ami.tags and
ami.id not in self.not_our_images):
# This image was imported but has no tags, which means
# it's either not a nodepool image, or it's a new one
# which doesn't have tags yet. Copy over any tags
# from the import task; otherwise, mark it as an image
# we can ignore in future runs.
task = self._getImportImageTask(ami.name)
tags = tag_list_to_dict(task.get('Tags'))
if (tags.get('nodepool_provider_name') == self.provider.name):
# Copy over tags
self.log.debug(
f"Copying tags from import task {ami.name} to AMI")
with self.rate_limiter:
ami.create_tags(Tags=task['Tags'])
else:
self.not_our_images.add(ami.id)
if ami.id in self.not_our_images:
continue
try:
if ami.tags:
continue
except (botocore.exceptions.ClientError, AttributeError):
continue
# This has no tags, which means it's either not a nodepool
# image, or it's a new one which doesn't have tags yet.
# Copy over any tags from the snapshot import task,
# otherwise, mark it as an image we can ignore in future
# runs.
if len(ami.block_device_mappings) < 1:
self.not_our_images.add(ami.id)
continue
bdm = ami.block_device_mappings[0]
ebs = bdm.get('Ebs')
if not ebs:
self.not_our_images.add(ami.id)
continue
snapshot_id = ebs.get('SnapshotId')
if not snapshot_id:
self.not_our_images.add(ami.id)
continue
to_examine.append((ami, snapshot_id))
if not to_examine:
return
# We have images to examine; get a list of import tasks so
# we can copy the tags from the import task that resulted in
# this image.
task_map = {}
for task in self._listImportSnapshotTasks():
detail = task['SnapshotTaskDetail']
task_snapshot_id = detail.get('SnapshotId')
if not task_snapshot_id:
continue
task_map[task_snapshot_id] = task['Tags']
for ami, snapshot_id in to_examine:
tags = task_map.get(snapshot_id)
if not tags:
self.not_our_images.add(ami.id)
continue
metadata = tag_list_to_dict(tags)
if (metadata.get('nodepool_provider_name') == self.provider.name):
# Copy over tags
self.log.debug(
f"Copying tags from import task to image {ami.id}")
with self.rate_limiter:
ami.create_tags(Tags=tags)
else:
self.not_our_images.add(ami.id)
def _tagSnapshots(self):
# See comments for _tagAmis
to_examine = []
for snap in self._listSnapshots():
if ('import-ami-' in snap.description and
not snap.tags and
snap.id not in self.not_our_snapshots):
try:
if (snap.id not in self.not_our_snapshots and
not snap.tags):
to_examine.append(snap)
except botocore.exceptions.ClientError:
# We may have cached a snapshot that doesn't exist
continue
if not to_examine:
return
match = re.match(r'.*?(import-ami-\w*)', snap.description)
if not match:
self.not_our_snapshots.add(snap.id)
continue
task_id = match.group(1)
task = self._getImportImageTask(task_id)
tags = tag_list_to_dict(task.get('Tags'))
if (tags.get('nodepool_provider_name') == self.provider.name):
# Copy over tags
self.log.debug(
f"Copying tags from import task {task_id} to snapshot")
with self.rate_limiter:
snap.create_tags(Tags=task['Tags'])
else:
self.not_our_snapshots.add(snap.id)
# We have snapshots to examine; get a list of import tasks so
# we can copy the tags from the import task that resulted in
# this snapshot.
task_map = {}
for task in self._listImportSnapshotTasks():
detail = task['SnapshotTaskDetail']
task_snapshot_id = detail.get('SnapshotId')
if not task_snapshot_id:
continue
task_map[task_snapshot_id] = task['Tags']
def _getImportImageTask(self, task_id):
paginator = self._get_paginator('describe_import_image_tasks')
for snap in to_examine:
tags = task_map.get(snap.id)
if not tags:
self.not_our_snapshots.add(snap.id)
continue
metadata = tag_list_to_dict(tags)
if (metadata.get('nodepool_provider_name') == self.provider.name):
# Copy over tags
self.log.debug(
f"Copying tags from import task to snapshot {snap.id}")
with self.rate_limiter:
snap.create_tags(Tags=tags)
else:
self.not_our_snapshots.add(snap.id)
def _listImportSnapshotTasks(self):
paginator = self._get_paginator('describe_import_snapshot_tasks')
with self.non_mutating_rate_limiter:
for page in paginator.paginate(ImportTaskIds=[task_id]):
for task in page['ImportImageTasks']:
# Return the first and only task
return task
for page in paginator.paginate():
for task in page['ImportSnapshotTasks']:
yield task
instance_key_re = re.compile(r'([a-z\-]+)\d.*')
@@ -809,8 +899,8 @@ class AwsAdapter(statemachine.Adapter):
# These methods allow the tests to patch our use of boto to
# compensate for missing methods in the boto mocks.
def _import_image(self, *args, **kw):
return self.ec2_client.import_image(*args, **kw)
def _import_snapshot(self, *args, **kw):
return self.ec2_client.import_snapshot(*args, **kw)
def _get_paginator(self, *args, **kw):
return self.ec2_client.get_paginator(*args, **kw)


@@ -101,6 +101,8 @@ class AwsProviderDiskImage(ConfigValue):
default_port_mapping.get(self.connection_type, 22))
self.meta = {}
self.architecture = image.get('architecture', 'x86_64')
self.volume_size = image.get('volume-size', None)
self.volume_type = image.get('volume-type', 'gp2')
@property
def external_name(self):
@@ -117,6 +119,8 @@ class AwsProviderDiskImage(ConfigValue):
'connection-port': int,
'python-path': str,
'shell-type': str,
'volume-size': int,
'volume-type': str,
}


@@ -22,34 +22,28 @@ import boto3
def make_stage_1(task_id, user_bucket, tags):
return {
'Architecture': 'x86_64',
'ImportTaskId': f'import-ami-{task_id}',
'ImportTaskId': f'import-snap-{task_id}',
'Progress': '19',
'SnapshotDetails': [{'DiskImageSize': 355024384.0,
'Format': 'VMDK',
'Status': 'active',
'UserBucket': user_bucket}],
'SnapshotTaskDetail': {'DiskImageSize': 355024384.0,
'Format': 'VMDK',
'Status': 'active',
'UserBucket': user_bucket},
'Status': 'active',
'StatusMessage': 'converting',
'Tags': tags,
}
def make_stage_2(task_id, image_id, snap_id, task):
def make_stage_2(task_id, snap_id, task):
# Make a unique snapshot id that's different than the task id.
return {
'Architecture': 'x86_64',
'BootMode': 'legacy_bios',
'ImageId': image_id,
'ImportTaskId': f'import-ami-{task_id}',
'LicenseType': 'BYOL',
'Platform': 'Linux',
'SnapshotDetails': [{'DeviceName': '/dev/sda1',
'DiskImageSize': 355024384.0,
'Format': 'VMDK',
'SnapshotId': snap_id,
'Status': 'completed',
'UserBucket':
task['SnapshotDetails'][0]['UserBucket']}],
'ImportTaskId': f'import-snap-{task_id}',
'SnapshotTaskDetail': {'DiskImageSize': 355024384.0,
'Format': 'VMDK',
'SnapshotId': snap_id,
'Status': 'completed',
'UserBucket':
task['SnapshotTaskDetail']['UserBucket']},
'Status': 'completed',
'Tags': task['Tags'],
}
@@ -67,12 +61,12 @@ class ImportTaskPaginator:
tasks = [t for t in tasks
if t['ImportTaskId'] in kw['ImportTaskIds']]
# A page of tasks
ret = [{'ImportImageTasks': tasks}]
ret = [{'ImportSnapshotTasks': tasks}]
# Move the task along
for task in tasks:
if task['Status'] != 'completed':
self.fake.finish_import_image(task)
self.fake.finish_import_snapshot(task)
return ret
@@ -84,46 +78,39 @@ class FakeAws:
self.ec2 = boto3.resource('ec2', region_name='us-west-2')
self.ec2_client = boto3.client('ec2', region_name='us-west-2')
def import_image(self, *args, **kw):
def import_snapshot(self, *args, **kw):
task_id = uuid.uuid4().hex
task = make_stage_1(
task_id,
kw['DiskContainers'][0]['UserBucket'],
kw['DiskContainer']['UserBucket'],
kw['TagSpecifications'][0]['Tags'])
self.tasks[task_id] = task
return task
def finish_import_image(self, task):
def finish_import_snapshot(self, task):
task_id = task['ImportTaskId'].split('-')[-1]
# Make an AMI to simulate the import finishing
reservation = self.ec2_client.run_instances(
ImageId="ami-12c6146b", MinCount=1, MaxCount=1)
instance = reservation["Instances"][0]
instance_id = instance["InstanceId"]
response = self.ec2_client.create_image(
InstanceId=instance_id,
Name=f'import-ami-{task_id}',
)
image_id = response["ImageId"]
self.ec2_client.describe_images(ImageIds=[image_id])["Images"][0]
# Make a Volume to simulate the import finishing
volume = self.ec2_client.create_volume(
Size=80,
AvailabilityZone='us-west-2')
snap_id = self.ec2_client.create_snapshot(
VolumeId=volume['VolumeId'],
Description=f'imported volume import-ami-{task_id}',
)["SnapshotId"]
t2 = make_stage_2(task_id, image_id, snap_id, task)
t2 = make_stage_2(task_id, snap_id, task)
self.tasks[task_id] = t2
return (image_id, snap_id)
return snap_id
def change_snapshot_id(self, task, snapshot_id):
# Given a task, update its snapshot id; the moto
# register_image mock doesn't honor the snapshot_id we pass
# in.
task_id = task['ImportTaskId'].split('-')[-1]
self.tasks[task_id]['SnapshotTaskDetail']['SnapshotId'] = snapshot_id
def get_paginator(self, name):
if name == 'describe_import_image_tasks':
if name == 'describe_import_snapshot_tasks':
return ImportTaskPaginator(self)
raise NotImplementedError()


@@ -103,6 +103,9 @@ class TestDriverAws(tests.DBTestCase):
Description='Zuul Nodes')
self.security_group_id = self.security_group['GroupId']
self.patch(nodepool.driver.statemachine, 'nodescan', fake_nodescan)
self.patch(AwsAdapter, '_import_snapshot',
self.fake_aws.import_snapshot)
self.patch(AwsAdapter, '_get_paginator', self.fake_aws.get_paginator)
def tearDown(self):
self.mock_ec2.stop()
@@ -526,8 +529,6 @@ class TestDriverAws(tests.DBTestCase):
self.assertTrue(response['EbsOptimized']['Value'])
def test_aws_diskimage(self):
self.patch(AwsAdapter, '_import_image', self.fake_aws.import_image)
self.patch(AwsAdapter, '_get_paginator', self.fake_aws.get_paginator)
configfile = self.setup_config('aws/diskimage.yaml')
self.useBuilder(configfile)
@@ -558,8 +559,6 @@ class TestDriverAws(tests.DBTestCase):
{'key1': 'value1', 'key2': 'value2'})
def test_aws_diskimage_removal(self):
self.patch(AwsAdapter, '_import_image', self.fake_aws.import_image)
self.patch(AwsAdapter, '_get_paginator', self.fake_aws.get_paginator)
configfile = self.setup_config('aws/diskimage.yaml')
self.useBuilder(configfile)
self.waitForImage('ec2-us-west-2', 'fake-image')
@@ -601,19 +600,42 @@ class TestDriverAws(tests.DBTestCase):
)
instance_id = reservation['Instances'][0]['InstanceId']
task = self.fake_aws.import_image(
DiskContainers=[{
task = self.fake_aws.import_snapshot(
DiskContainer={
'Format': 'ova',
'UserBucket': {
'S3Bucket': 'nodepool',
'S3Key': 'testfile',
}
}],
},
TagSpecifications=[{
'ResourceType': 'import-image-task',
'ResourceType': 'import-snapshot-task',
'Tags': image_tags,
}])
image_id, snapshot_id = self.fake_aws.finish_import_image(task)
snapshot_id = self.fake_aws.finish_import_snapshot(task)
register_response = self.ec2_client.register_image(
Architecture='amd64',
BlockDeviceMappings=[
{
'DeviceName': '/dev/sda1',
'Ebs': {
'DeleteOnTermination': True,
'SnapshotId': snapshot_id,
'VolumeSize': 20,
'VolumeType': 'gp2',
},
},
],
RootDeviceName='/dev/sda1',
VirtualizationType='hvm',
Name='testimage',
)
image_id = register_response['ImageId']
ami = self.ec2.Image(image_id)
new_snapshot_id = ami.block_device_mappings[0]['Ebs']['SnapshotId']
self.fake_aws.change_snapshot_id(task, new_snapshot_id)
# Note that the resulting image and snapshot do not have tags
# applied, so we test the automatic retagging methods in the
@@ -684,7 +706,7 @@ class TestDriverAws(tests.DBTestCase):
break
for _ in iterate_timeout(30, Exception, 'snapshot deletion'):
snap = self.ec2.Snapshot(snapshot_id)
snap = self.ec2.Snapshot(new_snapshot_id)
try:
if snap.state == 'deleted':
break