From 89cda5a1ba5e7b2c7f33e13ffaf8bb057d94b358 Mon Sep 17 00:00:00 2001
From: "James E. Blair"
Date: Mon, 1 Aug 2022 16:25:12 -0700
Subject: [PATCH] AWS: Use snapshot instead of image import

AWS recommends using the import_image method for importing diskimages
into the system as AMIs, but there is a significant caveat with that:
EC2 will attempt to boot an instance from the image and snapshot the
instance. That process requires that the image match certain
characteristics of already existing images supported by AWS, so only
certain operating systems can be successfully imported. If anything
goes wrong with the import process, the errors are opaque since the
temporary instance used for the import is inaccessible to the user.

An alternative is to use the import_snapshot method, which will
produce a snapshot object in EC2, and then a new AMI can be
"registered" and pointed to that snapshot. It's an extra step for the
Nodepool builder, but it's simple and takes less time overall than
waiting for EC2 to boot up the temporary instance. This is a much
closer approximation to the import scheme used in other Nodepool
drivers.

I have successfully tested this method with a cirros image, which is
notable for not being a supported operating system with the previous
method.

The caveats and instructions relating to setting up the Import/Export
service roles still apply, so the documentation related to them
remains.

The method for reconciling missing tags for aborted image uploads is
updated to accommodate the new process. Notably, while the
import_image method left a breadcrumb in the snapshot description, it
does not appear that we are able to set the description when we
import the snapshot. Instead we need to examine the snapshot import
task list to find the intended tags for a given snapshot or image.

Change-Id: I9478c0050777bf35e1201395bd34b9d01b8d5795
---
 doc/source/aws.rst                     |  13 ++
 nodepool/driver/aws/adapter.py         | 258 +++++++++++++++++--------
 nodepool/driver/aws/config.py          |   4 +
 nodepool/tests/unit/fake_aws.py        |  71 +++----
 nodepool/tests/unit/test_driver_aws.py |  42 +++-
 5 files changed, 252 insertions(+), 136 deletions(-)

diff --git a/doc/source/aws.rst b/doc/source/aws.rst
index 991d21322..28ba11e6b 100644
--- a/doc/source/aws.rst
+++ b/doc/source/aws.rst
@@ -383,6 +383,19 @@ Selecting the ``aws`` driver adds the following options to the
       long-standing issue with ``ansible_shell_type`` in combination
       with ``become``.
 
+   .. attr:: volume-type
+      :type: str
+      :default: gp2
+
+      The root `EBS volume type`_ for the image.
+
+   .. attr:: volume-size
+      :type: int
+
+      The size of the root EBS volume, in GiB, for the image.  If
+      omitted, the volume size reported for the imported snapshot
+      will be used.
+
    .. attr:: pools
       :type: list
diff --git a/nodepool/driver/aws/adapter.py b/nodepool/driver/aws/adapter.py
index 8e1ec84cf..a4ce36e83 100644
--- a/nodepool/driver/aws/adapter.py
+++ b/nodepool/driver/aws/adapter.py
@@ -28,6 +28,7 @@ from nodepool.driver.utils import QuotaInformation, RateLimiter
 from nodepool.driver import statemachine
 
 import boto3
+import botocore.exceptions
 
 
 def tag_dict_to_list(tagdict):
@@ -277,33 +278,48 @@ class AwsAdapter(statemachine.Adapter):
         return AwsDeleteStateMachine(self, external_id, log)
 
     def listResources(self):
-        self._tagAmis()
         self._tagSnapshots()
+        self._tagAmis()
         for instance in self._listInstances():
-            if instance.state["Name"].lower() == "terminated":
+            try:
+                if instance.state["Name"].lower() == "terminated":
+                    continue
+            except botocore.exceptions.ClientError:
                 continue
             yield AwsResource(tag_list_to_dict(instance.tags),
                               'instance', instance.id)
         for volume in self._listVolumes():
-            if volume.state.lower() == "deleted":
+            try:
+                if volume.state.lower() == "deleted":
+                    continue
+            except botocore.exceptions.ClientError:
                 continue
             yield AwsResource(tag_list_to_dict(volume.tags),
                               'volume', volume.id)
         for ami in self._listAmis():
-            if ami.state.lower() == "deleted":
+            try:
+                if ami.state.lower() == "deleted":
+                    continue
+            except (botocore.exceptions.ClientError, AttributeError):
                 continue
             yield AwsResource(tag_list_to_dict(ami.tags),
                               'ami', ami.id)
         for snap in self._listSnapshots():
-            if snap.state.lower() == "deleted":
+            try:
+                if snap.state.lower() == "deleted":
+                    continue
+            except botocore.exceptions.ClientError:
                 continue
             yield AwsResource(tag_list_to_dict(snap.tags),
                               'snapshot', snap.id)
         if self.provider.object_storage:
             for obj in self._listObjects():
                 with self.non_mutating_rate_limiter:
-                    tags = self.s3_client.get_object_tagging(
-                        Bucket=obj.bucket_name, Key=obj.key)
+                    try:
+                        tags = self.s3_client.get_object_tagging(
+                            Bucket=obj.bucket_name, Key=obj.key)
+                    except botocore.exceptions.ClientError:
+                        continue
                 yield AwsResource(tag_list_to_dict(tags['TagSet']),
                                   'object', obj.key)
@@ -368,36 +384,35 @@ class AwsAdapter(statemachine.Adapter):
             bucket.upload_fileobj(fobj, object_filename,
                                   ExtraArgs=extra_args)
 
-        # Import image as AMI
+        # Import snapshot
         self.log.debug(f"Importing {image_name}")
-        import_image_task = self._import_image(
-            Architecture=provider_image.architecture,
-            DiskContainers=[
-                {
+        with self.rate_limiter:
+            import_snapshot_task = self._import_snapshot(
+                DiskContainer={
                     'Format': image_format,
                     'UserBucket': {
                         'S3Bucket': bucket_name,
                         'S3Key': object_filename,
-                    }
+                    },
                 },
-            ],
-            TagSpecifications=[
-                {
-                    'ResourceType': 'import-image-task',
-                    'Tags': tag_dict_to_list(metadata),
-                },
-            ]
-        )
-        task_id = import_image_task['ImportTaskId']
+                TagSpecifications=[
+                    {
+                        'ResourceType': 'import-snapshot-task',
+                        'Tags': tag_dict_to_list(metadata),
+                    },
+                ]
+            )
+        task_id = import_snapshot_task['ImportTaskId']
 
-        paginator = self._get_paginator('describe_import_image_tasks')
+        paginator = self._get_paginator('describe_import_snapshot_tasks')
         done = False
         while not done:
             time.sleep(self.IMAGE_UPLOAD_SLEEP)
             with self.non_mutating_rate_limiter:
                 for page in paginator.paginate(ImportTaskIds=[task_id]):
-                    for task in page['ImportImageTasks']:
-                        if task['Status'].lower() in ('completed', 'deleted'):
+                    for task in page['ImportSnapshotTasks']:
+                        if task['SnapshotTaskDetail']['Status'].lower() in (
+                                'completed', 'deleted'):
                             done = True
                             break
@@ -405,31 +420,54 @@ class AwsAdapter(statemachine.Adapter):
         with self.rate_limiter:
             self.s3.Object(bucket_name,
                            object_filename).delete()
 
-        if task['Status'].lower() != 'completed':
+        if task['SnapshotTaskDetail']['Status'].lower() != 'completed':
             raise Exception(f"Error uploading image: {task}")
 
-        # Tag the AMI
-        try:
-            with self.non_mutating_rate_limiter:
-                ami = self.ec2.Image(task['ImageId'])
-            with self.rate_limiter:
-                ami.create_tags(Tags=task['Tags'])
-        except Exception:
-            self.log.exception("Error tagging AMI:")
-
         # Tag the snapshot
         try:
             with self.non_mutating_rate_limiter:
                 snap = self.ec2.Snapshot(
-                    task['SnapshotDetails'][0]['SnapshotId'])
+                    task['SnapshotTaskDetail']['SnapshotId'])
             with self.rate_limiter:
                 snap.create_tags(Tags=task['Tags'])
         except Exception:
             self.log.exception("Error tagging snapshot:")
 
-        self.log.debug(f"Upload of {image_name} complete as {task['ImageId']}")
+        volume_size = provider_image.volume_size or snap.volume_size
+        # Register the snapshot as an AMI
+        with self.rate_limiter:
+            register_response = self.ec2_client.register_image(
+                Architecture=provider_image.architecture,
+                BlockDeviceMappings=[
+                    {
+                        'DeviceName': '/dev/sda1',
+                        'Ebs': {
+                            'DeleteOnTermination': True,
+                            'SnapshotId': task[
+                                'SnapshotTaskDetail']['SnapshotId'],
+                            'VolumeSize': volume_size,
+                            'VolumeType': provider_image.volume_type,
+                        },
+                    },
+                ],
+                RootDeviceName='/dev/sda1',
+                VirtualizationType='hvm',
+                Name=image_name,
+            )
+
+        # Tag the AMI
+        try:
+            with self.non_mutating_rate_limiter:
+                ami = self.ec2.Image(register_response['ImageId'])
+            with self.rate_limiter:
+                ami.create_tags(Tags=task['Tags'])
+        except Exception:
+            self.log.exception("Error tagging AMI:")
+
+        self.log.debug(f"Upload of {image_name} complete as "
+                       f"{register_response['ImageId']}")
         # Last task returned from paginator above
-        return task['ImageId']
+        return register_response['ImageId']
 
     def deleteImage(self, external_id):
         snaps = set()
@@ -449,58 +487,110 @@ class AwsAdapter(statemachine.Adapter):
     def _tagAmis(self):
         # There is no way to tag imported AMIs, so this routine
         # "eventually" tags them. We look for any AMIs without tags
-        # which correspond to import tasks, and we copy the tags from
-        # those import tasks to the AMI.
+        # and we copy the tags from the associated snapshot import
+        # task.
+        to_examine = []
         for ami in self._listAmis():
-            if (ami.name.startswith('import-ami-') and
-                not ami.tags and
-                ami.id not in self.not_our_images):
-                # This image was imported but has no tags, which means
-                # it's either not a nodepool image, or it's a new one
-                # which doesn't have tags yet.  Copy over any tags
-                # from the import task; otherwise, mark it as an image
-                # we can ignore in future runs.
-                task = self._getImportImageTask(ami.name)
-                tags = tag_list_to_dict(task.get('Tags'))
-                if (tags.get('nodepool_provider_name') == self.provider.name):
-                    # Copy over tags
-                    self.log.debug(
-                        f"Copying tags from import task {ami.name} to AMI")
-                    with self.rate_limiter:
-                        ami.create_tags(Tags=task['Tags'])
-                else:
-                    self.not_our_images.add(ami.id)
+            if ami.id in self.not_our_images:
+                continue
+            try:
+                if ami.tags:
+                    continue
+            except (botocore.exceptions.ClientError, AttributeError):
+                continue
+            # This has no tags, which means it's either not a nodepool
+            # image, or it's a new one which doesn't have tags yet.
+            # Copy over any tags from the snapshot import task,
+            # otherwise, mark it as an image we can ignore in future
+            # runs.
+            if len(ami.block_device_mappings) < 1:
+                self.not_our_images.add(ami.id)
+                continue
+            bdm = ami.block_device_mappings[0]
+            ebs = bdm.get('Ebs')
+            if not ebs:
+                self.not_our_images.add(ami.id)
+                continue
+            snapshot_id = ebs.get('SnapshotId')
+            if not snapshot_id:
+                self.not_our_images.add(ami.id)
+                continue
+            to_examine.append((ami, snapshot_id))
+        if not to_examine:
+            return
+
+        # We have images to examine; get a list of import tasks so
+        # we can copy the tags from the import task that resulted in
+        # this image.
+        task_map = {}
+        for task in self._listImportSnapshotTasks():
+            detail = task['SnapshotTaskDetail']
+            task_snapshot_id = detail.get('SnapshotId')
+            if not task_snapshot_id:
+                continue
+            task_map[task_snapshot_id] = task['Tags']
+
+        for ami, snapshot_id in to_examine:
+            tags = task_map.get(snapshot_id)
+            if not tags:
+                self.not_our_images.add(ami.id)
+                continue
+            metadata = tag_list_to_dict(tags)
+            if (metadata.get('nodepool_provider_name') == self.provider.name):
+                # Copy over tags
+                self.log.debug(
+                    f"Copying tags from import task to image {ami.id}")
+                with self.rate_limiter:
+                    ami.create_tags(Tags=tags)
+            else:
+                self.not_our_images.add(ami.id)
 
     def _tagSnapshots(self):
         # See comments for _tagAmis
+        to_examine = []
         for snap in self._listSnapshots():
-            if ('import-ami-' in snap.description and
-                not snap.tags and
-                snap.id not in self.not_our_snapshots):
+            try:
+                if (snap.id not in self.not_our_snapshots and
+                        not snap.tags):
+                    to_examine.append(snap)
+            except botocore.exceptions.ClientError:
+                # We may have cached a snapshot that doesn't exist
+                continue
+        if not to_examine:
+            return
 
-                match = re.match(r'.*?(import-ami-\w*)', snap.description)
-                if not match:
-                    self.not_our_snapshots.add(snap.id)
-                    continue
-                task_id = match.group(1)
-                task = self._getImportImageTask(task_id)
-                tags = tag_list_to_dict(task.get('Tags'))
-                if (tags.get('nodepool_provider_name') == self.provider.name):
-                    # Copy over tags
-                    self.log.debug(
-                        f"Copying tags from import task {task_id} to snapshot")
-                    with self.rate_limiter:
-                        snap.create_tags(Tags=task['Tags'])
-                else:
-                    self.not_our_snapshots.add(snap.id)
+        # We have snapshots to examine; get a list of import tasks so
+        # we can copy the tags from the import task that resulted in
+        # this snapshot.
+        task_map = {}
+        for task in self._listImportSnapshotTasks():
+            detail = task['SnapshotTaskDetail']
+            task_snapshot_id = detail.get('SnapshotId')
+            if not task_snapshot_id:
+                continue
+            task_map[task_snapshot_id] = task['Tags']
 
-    def _getImportImageTask(self, task_id):
-        paginator = self._get_paginator('describe_import_image_tasks')
+        for snap in to_examine:
+            tags = task_map.get(snap.id)
+            if not tags:
+                self.not_our_snapshots.add(snap.id)
+                continue
+            metadata = tag_list_to_dict(tags)
+            if (metadata.get('nodepool_provider_name') == self.provider.name):
+                # Copy over tags
+                self.log.debug(
+                    f"Copying tags from import task to snapshot {snap.id}")
+                with self.rate_limiter:
+                    snap.create_tags(Tags=tags)
+            else:
+                self.not_our_snapshots.add(snap.id)
+
+    def _listImportSnapshotTasks(self):
+        paginator = self._get_paginator('describe_import_snapshot_tasks')
         with self.non_mutating_rate_limiter:
-            for page in paginator.paginate(ImportTaskIds=[task_id]):
-                for task in page['ImportImageTasks']:
-                    # Return the first and only task
-                    return task
+            for page in paginator.paginate():
+                for task in page['ImportSnapshotTasks']:
+                    yield task
 
     instance_key_re = re.compile(r'([a-z\-]+)\d.*')
@@ -809,8 +899,8 @@ class AwsAdapter(statemachine.Adapter):
 
     # These methods allow the tests to patch our use of boto to
    # compensate for missing methods in the boto mocks.
-    def _import_image(self, *args, **kw):
-        return self.ec2_client.import_image(*args, **kw)
+    def _import_snapshot(self, *args, **kw):
+        return self.ec2_client.import_snapshot(*args, **kw)
 
     def _get_paginator(self, *args, **kw):
         return self.ec2_client.get_paginator(*args, **kw)
diff --git a/nodepool/driver/aws/config.py b/nodepool/driver/aws/config.py
index d94c5c1e8..fe0e1465e 100644
--- a/nodepool/driver/aws/config.py
+++ b/nodepool/driver/aws/config.py
@@ -101,6 +101,8 @@ class AwsProviderDiskImage(ConfigValue):
                 default_port_mapping.get(self.connection_type, 22))
             self.meta = {}
         self.architecture = image.get('architecture', 'x86_64')
+        self.volume_size = image.get('volume-size', None)
+        self.volume_type = image.get('volume-type', 'gp2')
 
     @property
     def external_name(self):
@@ -117,6 +119,8 @@ class AwsProviderDiskImage(ConfigValue):
             'connection-port': int,
             'python-path': str,
             'shell-type': str,
+            'volume-size': int,
+            'volume-type': str,
         }
diff --git a/nodepool/tests/unit/fake_aws.py b/nodepool/tests/unit/fake_aws.py
index 7e85f5d8a..a72648984 100644
--- a/nodepool/tests/unit/fake_aws.py
+++ b/nodepool/tests/unit/fake_aws.py
@@ -22,34 +22,28 @@ import boto3
 
 def make_stage_1(task_id, user_bucket, tags):
     return {
         'Architecture': 'x86_64',
-        'ImportTaskId': f'import-ami-{task_id}',
+        'ImportTaskId': f'import-snap-{task_id}',
         'Progress': '19',
-        'SnapshotDetails': [{'DiskImageSize': 355024384.0,
-                             'Format': 'VMDK',
-                             'Status': 'active',
-                             'UserBucket': user_bucket}],
+        'SnapshotTaskDetail': {'DiskImageSize': 355024384.0,
+                               'Format': 'VMDK',
+                               'Status': 'active',
+                               'UserBucket': user_bucket},
         'Status': 'active',
         'StatusMessage': 'converting',
         'Tags': tags,
     }
 
 
-def make_stage_2(task_id, image_id, snap_id, task):
+def make_stage_2(task_id, snap_id, task):
     # Make a unique snapshot id that's different than the task id.
     return {
-        'Architecture': 'x86_64',
-        'BootMode': 'legacy_bios',
-        'ImageId': image_id,
-        'ImportTaskId': f'import-ami-{task_id}',
-        'LicenseType': 'BYOL',
-        'Platform': 'Linux',
-        'SnapshotDetails': [{'DeviceName': '/dev/sda1',
-                             'DiskImageSize': 355024384.0,
-                             'Format': 'VMDK',
-                             'SnapshotId': snap_id,
-                             'Status': 'completed',
-                             'UserBucket':
-                             task['SnapshotDetails'][0]['UserBucket']}],
+        'ImportTaskId': f'import-snap-{task_id}',
+        'SnapshotTaskDetail': {'DiskImageSize': 355024384.0,
+                               'Format': 'VMDK',
+                               'SnapshotId': snap_id,
+                               'Status': 'completed',
+                               'UserBucket':
+                               task['SnapshotTaskDetail']['UserBucket']},
         'Status': 'completed',
         'Tags': task['Tags'],
     }
@@ -67,12 +61,12 @@ class ImportTaskPaginator:
             tasks = [t for t in tasks
                      if t['ImportTaskId'] in kw['ImportTaskIds']]
 
         # A page of tasks
-        ret = [{'ImportImageTasks': tasks}]
+        ret = [{'ImportSnapshotTasks': tasks}]
 
         # Move the task along
         for task in tasks:
             if task['Status'] != 'completed':
-                self.fake.finish_import_image(task)
+                self.fake.finish_import_snapshot(task)
 
         return ret
@@ -84,46 +78,39 @@ class FakeAws:
         self.ec2 = boto3.resource('ec2', region_name='us-west-2')
         self.ec2_client = boto3.client('ec2', region_name='us-west-2')
 
-    def import_image(self, *args, **kw):
+    def import_snapshot(self, *args, **kw):
         task_id = uuid.uuid4().hex
         task = make_stage_1(
             task_id,
-            kw['DiskContainers'][0]['UserBucket'],
+            kw['DiskContainer']['UserBucket'],
             kw['TagSpecifications'][0]['Tags'])
         self.tasks[task_id] = task
         return task
 
-    def finish_import_image(self, task):
+    def finish_import_snapshot(self, task):
         task_id = task['ImportTaskId'].split('-')[-1]
 
-        # Make an AMI to simulate the import finishing
-        reservation = self.ec2_client.run_instances(
-            ImageId="ami-12c6146b", MinCount=1, MaxCount=1)
-        instance = reservation["Instances"][0]
-        instance_id = instance["InstanceId"]
-
-        response = self.ec2_client.create_image(
-            InstanceId=instance_id,
-            Name=f'import-ami-{task_id}',
-        )
-
-        image_id = response["ImageId"]
-        self.ec2_client.describe_images(ImageIds=[image_id])["Images"][0]
-
+        # Make a Volume to simulate the import finishing
         volume = self.ec2_client.create_volume(
             Size=80,
             AvailabilityZone='us-west-2')
         snap_id = self.ec2_client.create_snapshot(
             VolumeId=volume['VolumeId'],
-            Description=f'imported volume import-ami-{task_id}',
         )["SnapshotId"]
 
-        t2 = make_stage_2(task_id, image_id, snap_id, task)
+        t2 = make_stage_2(task_id, snap_id, task)
         self.tasks[task_id] = t2
 
-        return (image_id, snap_id)
+        return snap_id
+
+    def change_snapshot_id(self, task, snapshot_id):
+        # Given a task, update its snapshot id; the moto
+        # register_image mock doesn't honor the snapshot_id we pass
+        # in.
+        task_id = task['ImportTaskId'].split('-')[-1]
+        self.tasks[task_id]['SnapshotTaskDetail']['SnapshotId'] = snapshot_id
 
     def get_paginator(self, name):
-        if name == 'describe_import_image_tasks':
+        if name == 'describe_import_snapshot_tasks':
             return ImportTaskPaginator(self)
         raise NotImplementedError()
diff --git a/nodepool/tests/unit/test_driver_aws.py b/nodepool/tests/unit/test_driver_aws.py
index aa6da341a..aea273dad 100644
--- a/nodepool/tests/unit/test_driver_aws.py
+++ b/nodepool/tests/unit/test_driver_aws.py
@@ -103,6 +103,9 @@ class TestDriverAws(tests.DBTestCase):
             Description='Zuul Nodes')
         self.security_group_id = self.security_group['GroupId']
         self.patch(nodepool.driver.statemachine, 'nodescan', fake_nodescan)
+        self.patch(AwsAdapter, '_import_snapshot',
+                   self.fake_aws.import_snapshot)
+        self.patch(AwsAdapter, '_get_paginator', self.fake_aws.get_paginator)
 
     def tearDown(self):
         self.mock_ec2.stop()
@@ -526,8 +529,6 @@ class TestDriverAws(tests.DBTestCase):
         self.assertTrue(response['EbsOptimized']['Value'])
 
     def test_aws_diskimage(self):
-        self.patch(AwsAdapter, '_import_image', self.fake_aws.import_image)
-        self.patch(AwsAdapter, '_get_paginator', self.fake_aws.get_paginator)
         configfile = self.setup_config('aws/diskimage.yaml')
         self.useBuilder(configfile)
 
@@ -558,8 +559,6 @@ class TestDriverAws(tests.DBTestCase):
                          {'key1': 'value1', 'key2': 'value2'})
 
     def test_aws_diskimage_removal(self):
-        self.patch(AwsAdapter, '_import_image', self.fake_aws.import_image)
-        self.patch(AwsAdapter, '_get_paginator', self.fake_aws.get_paginator)
         configfile = self.setup_config('aws/diskimage.yaml')
         self.useBuilder(configfile)
         self.waitForImage('ec2-us-west-2', 'fake-image')
@@ -601,19 +600,42 @@ class TestDriverAws(tests.DBTestCase):
         )
         instance_id = reservation['Instances'][0]['InstanceId']
 
-        task = self.fake_aws.import_image(
-            DiskContainers=[{
+        task = self.fake_aws.import_snapshot(
+            DiskContainer={
                 'Format': 'ova',
                 'UserBucket': {
                     'S3Bucket': 'nodepool',
                     'S3Key': 'testfile',
                 }
-            }],
+            },
             TagSpecifications=[{
-                'ResourceType': 'import-image-task',
+                'ResourceType': 'import-snapshot-task',
                 'Tags': image_tags,
             }])
-        image_id, snapshot_id = self.fake_aws.finish_import_image(task)
+        snapshot_id = self.fake_aws.finish_import_snapshot(task)
+
+        register_response = self.ec2_client.register_image(
+            Architecture='amd64',
+            BlockDeviceMappings=[
+                {
+                    'DeviceName': '/dev/sda1',
+                    'Ebs': {
+                        'DeleteOnTermination': True,
+                        'SnapshotId': snapshot_id,
+                        'VolumeSize': 20,
+                        'VolumeType': 'gp2',
+                    },
+                },
+            ],
+            RootDeviceName='/dev/sda1',
+            VirtualizationType='hvm',
+            Name='testimage',
+        )
+        image_id = register_response['ImageId']
+
+        ami = self.ec2.Image(image_id)
+        new_snapshot_id = ami.block_device_mappings[0]['Ebs']['SnapshotId']
+        self.fake_aws.change_snapshot_id(task, new_snapshot_id)
 
         # Note that the resulting image and snapshot do not have tags
         # applied, so we test the automatic retagging methods in the
@@ -684,7 +706,7 @@ class TestDriverAws(tests.DBTestCase):
                 break
 
         for _ in iterate_timeout(30, Exception, 'snapshot deletion'):
-            snap = self.ec2.Snapshot(new_snapshot_id)
+            snap = self.ec2.Snapshot(new_snapshot_id)
             try:
                 if snap.state == 'deleted':
                     break
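
Reviewer note: the two-step flow this change implements, reduced to
plain boto3 calls, looks roughly like the sketch below. This is an
illustrative sketch only, not code from the patch: the bucket, key,
image name, polling interval, and volume parameters are placeholder
assumptions, and the real adapter additionally applies rate limiting,
pagination, and tagging as shown in the diff above.

    import time

    import boto3

    ec2 = boto3.client('ec2', region_name='us-west-2')

    # Step 1: import the uploaded disk image as an EBS snapshot.
    task = ec2.import_snapshot(
        DiskContainer={
            'Format': 'raw',
            'UserBucket': {'S3Bucket': 'example-bucket',
                           'S3Key': 'example-image.raw'},
        })
    task_id = task['ImportTaskId']

    # Poll the import task until the snapshot is ready.
    while True:
        tasks = ec2.describe_import_snapshot_tasks(ImportTaskIds=[task_id])
        detail = tasks['ImportSnapshotTasks'][0]['SnapshotTaskDetail']
        status = detail['Status'].lower()
        if status == 'completed':
            break
        if status in ('deleted', 'deleting'):
            raise RuntimeError(f'Snapshot import failed: {detail}')
        time.sleep(10)

    # Step 2: register an AMI whose root device points at the snapshot.
    image = ec2.register_image(
        Name='example-image',
        Architecture='x86_64',
        RootDeviceName='/dev/sda1',
        VirtualizationType='hvm',
        BlockDeviceMappings=[{
            'DeviceName': '/dev/sda1',
            'Ebs': {
                'DeleteOnTermination': True,
                'SnapshotId': detail['SnapshotId'],
                'VolumeSize': 20,
                'VolumeType': 'gp2',
            },
        }])
    print(image['ImageId'])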
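The tag reconciliation described in the commit message follows the
same shape; here is a condensed sketch under the same assumptions
(the real _tagAmis/_tagSnapshots code additionally rate limits,
caches negative results in not_our_images/not_our_snapshots, and only
copies tags whose nodepool_provider_name matches the provider):

    # Map each imported snapshot back to the tags of the import task
    # that produced it.
    task_map = {}
    paginator = ec2.get_paginator('describe_import_snapshot_tasks')
    for page in paginator.paginate():
        for t in page['ImportSnapshotTasks']:
            snap_id = t['SnapshotTaskDetail'].get('SnapshotId')
            if snap_id and t.get('Tags'):
                task_map[snap_id] = t['Tags']

    # Copy tags to any untagged AMI whose root EBS snapshot appears
    # in the map.
    for image in ec2.describe_images(Owners=['self'])['Images']:
        if image.get('Tags'):
            continue
        bdm = image.get('BlockDeviceMappings') or []
        ebs = bdm[0].get('Ebs', {}) if bdm else {}
        tags = task_map.get(ebs.get('SnapshotId'))
        if tags:
            ec2.create_tags(Resources=[image['ImageId']], Tags=tags)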