diff --git a/doc/source/aws.rst b/doc/source/aws.rst
index 991d21322..28ba11e6b 100644
--- a/doc/source/aws.rst
+++ b/doc/source/aws.rst
@@ -383,6 +383,19 @@ Selecting the ``aws`` driver adds the following options to the
       long-standing issue with ``ansible_shell_type`` in combination
       with ``become``.
 
+   .. attr:: volume-type
+      :type: str
+      :default: gp2
+
+      The root `EBS volume type`_ for the image.
+
+   .. attr:: volume-size
+      :type: int
+
+      The size of the root EBS volume, in GiB, for the image. If
+      omitted, the volume size reported for the imported snapshot
+      will be used.
+
 .. attr:: pools
    :type: list
 
diff --git a/nodepool/driver/aws/adapter.py b/nodepool/driver/aws/adapter.py
index 8e1ec84cf..a4ce36e83 100644
--- a/nodepool/driver/aws/adapter.py
+++ b/nodepool/driver/aws/adapter.py
@@ -28,6 +28,7 @@ from nodepool.driver.utils import QuotaInformation, RateLimiter
 from nodepool.driver import statemachine
 
 import boto3
+import botocore.exceptions
 
 
 def tag_dict_to_list(tagdict):
@@ -277,33 +278,48 @@ class AwsAdapter(statemachine.Adapter):
         return AwsDeleteStateMachine(self, external_id, log)
 
     def listResources(self):
-        self._tagAmis()
         self._tagSnapshots()
+        self._tagAmis()
         for instance in self._listInstances():
-            if instance.state["Name"].lower() == "terminated":
+            try:
+                if instance.state["Name"].lower() == "terminated":
+                    continue
+            except botocore.exceptions.ClientError:
                 continue
             yield AwsResource(tag_list_to_dict(instance.tags),
                               'instance', instance.id)
         for volume in self._listVolumes():
-            if volume.state.lower() == "deleted":
+            try:
+                if volume.state.lower() == "deleted":
+                    continue
+            except botocore.exceptions.ClientError:
                 continue
             yield AwsResource(tag_list_to_dict(volume.tags),
                               'volume', volume.id)
         for ami in self._listAmis():
-            if ami.state.lower() == "deleted":
+            try:
+                if ami.state.lower() == "deleted":
+                    continue
+            except (botocore.exceptions.ClientError, AttributeError):
                 continue
             yield AwsResource(tag_list_to_dict(ami.tags),
                               'ami', ami.id)
         for snap in self._listSnapshots():
-            if snap.state.lower() == "deleted":
+            try:
+                if snap.state.lower() == "deleted":
+                    continue
+            except botocore.exceptions.ClientError:
                 continue
             yield AwsResource(tag_list_to_dict(snap.tags),
                               'snapshot', snap.id)
         if self.provider.object_storage:
             for obj in self._listObjects():
                 with self.non_mutating_rate_limiter:
-                    tags = self.s3_client.get_object_tagging(
-                        Bucket=obj.bucket_name, Key=obj.key)
+                    try:
+                        tags = self.s3_client.get_object_tagging(
+                            Bucket=obj.bucket_name, Key=obj.key)
+                    except botocore.exceptions.ClientError:
+                        continue
                 yield AwsResource(tag_list_to_dict(tags['TagSet']),
                                   'object', obj.key)
@@ -368,36 +384,35 @@ class AwsAdapter(statemachine.Adapter):
             bucket.upload_fileobj(fobj, object_filename,
                                   ExtraArgs=extra_args)
 
-        # Import image as AMI
+        # Import snapshot
         self.log.debug(f"Importing {image_name}")
-        import_image_task = self._import_image(
-            Architecture=provider_image.architecture,
-            DiskContainers=[
-                {
+        with self.rate_limiter:
+            import_snapshot_task = self._import_snapshot(
+                DiskContainer={
                     'Format': image_format,
                     'UserBucket': {
                         'S3Bucket': bucket_name,
                         'S3Key': object_filename,
-                    }
+                    },
                 },
-            ],
-            TagSpecifications=[
-                {
-                    'ResourceType': 'import-image-task',
-                    'Tags': tag_dict_to_list(metadata),
-                },
-            ]
-        )
-        task_id = import_image_task['ImportTaskId']
+                TagSpecifications=[
+                    {
+                        'ResourceType': 'import-snapshot-task',
+                        'Tags': tag_dict_to_list(metadata),
+                    },
+                ]
+            )
+            task_id = import_snapshot_task['ImportTaskId']
 
-        paginator = self._get_paginator('describe_import_image_tasks')
+        paginator = self._get_paginator('describe_import_snapshot_tasks')
         done = False
         while not done:
             time.sleep(self.IMAGE_UPLOAD_SLEEP)
             with self.non_mutating_rate_limiter:
                 for page in paginator.paginate(ImportTaskIds=[task_id]):
-                    for task in page['ImportImageTasks']:
-                        if task['Status'].lower() in ('completed', 'deleted'):
+                    for task in page['ImportSnapshotTasks']:
+                        if task['SnapshotTaskDetail']['Status'].lower() in (
+                                'completed', 'deleted'):
                             done = True
                             break
 
@@ -405,31 +420,54 @@
         with self.rate_limiter:
             self.s3.Object(bucket_name, object_filename).delete()
 
-        if task['Status'].lower() != 'completed':
+        if task['SnapshotTaskDetail']['Status'].lower() != 'completed':
             raise Exception(f"Error uploading image: {task}")
 
-        # Tag the AMI
-        try:
-            with self.non_mutating_rate_limiter:
-                ami = self.ec2.Image(task['ImageId'])
-            with self.rate_limiter:
-                ami.create_tags(Tags=task['Tags'])
-        except Exception:
-            self.log.exception("Error tagging AMI:")
-
         # Tag the snapshot
         try:
             with self.non_mutating_rate_limiter:
                 snap = self.ec2.Snapshot(
-                    task['SnapshotDetails'][0]['SnapshotId'])
+                    task['SnapshotTaskDetail']['SnapshotId'])
             with self.rate_limiter:
                 snap.create_tags(Tags=task['Tags'])
         except Exception:
             self.log.exception("Error tagging snapshot:")
 
-        self.log.debug(f"Upload of {image_name} complete as {task['ImageId']}")
+        volume_size = provider_image.volume_size or snap.volume_size
+        # Register the snapshot as an AMI
+        with self.rate_limiter:
+            register_response = self.ec2_client.register_image(
+                Architecture=provider_image.architecture,
+                BlockDeviceMappings=[
+                    {
+                        'DeviceName': '/dev/sda1',
+                        'Ebs': {
+                            'DeleteOnTermination': True,
+                            'SnapshotId': task[
+                                'SnapshotTaskDetail']['SnapshotId'],
+                            'VolumeSize': volume_size,
+                            'VolumeType': provider_image.volume_type,
+                        },
+                    },
+                ],
+                RootDeviceName='/dev/sda1',
+                VirtualizationType='hvm',
+                Name=image_name,
+            )
+
+        # Tag the AMI
+        try:
+            with self.non_mutating_rate_limiter:
+                ami = self.ec2.Image(register_response['ImageId'])
+            with self.rate_limiter:
+                ami.create_tags(Tags=task['Tags'])
+        except Exception:
+            self.log.exception("Error tagging AMI:")
+
+        self.log.debug(f"Upload of {image_name} complete as "
+                       f"{register_response['ImageId']}")
         # Last task returned from paginator above
-        return task['ImageId']
+        return register_response['ImageId']
 
     def deleteImage(self, external_id):
         snaps = set()
@@ -449,58 +487,110 @@
     def _tagAmis(self):
         # There is no way to tag imported AMIs, so this routine
         # "eventually" tags them.  We look for any AMIs without tags
-        # which correspond to import tasks, and we copy the tags from
-        # those import tasks to the AMI.
+        # and we copy the tags from the associated snapshot import
+        # task.
+        to_examine = []
         for ami in self._listAmis():
-            if (ami.name.startswith('import-ami-') and
-                not ami.tags and
-                ami.id not in self.not_our_images):
-                # This image was imported but has no tags, which means
-                # it's either not a nodepool image, or it's a new one
-                # which doesn't have tags yet.  Copy over any tags
-                # from the import task; otherwise, mark it as an image
-                # we can ignore in future runs.
-                task = self._getImportImageTask(ami.name)
-                tags = tag_list_to_dict(task.get('Tags'))
-                if (tags.get('nodepool_provider_name') == self.provider.name):
-                    # Copy over tags
-                    self.log.debug(
-                        f"Copying tags from import task {ami.name} to AMI")
-                    with self.rate_limiter:
-                        ami.create_tags(Tags=task['Tags'])
-                else:
-                    self.not_our_images.add(ami.id)
+            if ami.id in self.not_our_images:
+                continue
+            try:
+                if ami.tags:
+                    continue
+            except (botocore.exceptions.ClientError, AttributeError):
+                continue
+            # This has no tags, which means it's either not a nodepool
+            # image, or it's a new one which doesn't have tags yet.
+            # Copy over any tags from the snapshot import task;
+            # otherwise, mark it as an image we can ignore in future
+            # runs.
+            if len(ami.block_device_mappings) < 1:
+                self.not_our_images.add(ami.id)
+                continue
+            bdm = ami.block_device_mappings[0]
+            ebs = bdm.get('Ebs')
+            if not ebs:
+                self.not_our_images.add(ami.id)
+                continue
+            snapshot_id = ebs.get('SnapshotId')
+            if not snapshot_id:
+                self.not_our_images.add(ami.id)
+                continue
+            to_examine.append((ami, snapshot_id))
+        if not to_examine:
+            return
+
+        # We have images to examine; get a list of import tasks so
+        # we can copy the tags from the import task that resulted in
+        # this image.
+        task_map = {}
+        for task in self._listImportSnapshotTasks():
+            detail = task['SnapshotTaskDetail']
+            task_snapshot_id = detail.get('SnapshotId')
+            if not task_snapshot_id:
+                continue
+            task_map[task_snapshot_id] = task['Tags']
+
+        for ami, snapshot_id in to_examine:
+            tags = task_map.get(snapshot_id)
+            if not tags:
+                self.not_our_images.add(ami.id)
+                continue
+            metadata = tag_list_to_dict(tags)
+            if (metadata.get('nodepool_provider_name') == self.provider.name):
+                # Copy over tags
+                self.log.debug(
+                    f"Copying tags from import task to image {ami.id}")
+                with self.rate_limiter:
+                    ami.create_tags(Tags=tags)
+            else:
+                self.not_our_images.add(ami.id)
 
     def _tagSnapshots(self):
         # See comments for _tagAmis
+        to_examine = []
         for snap in self._listSnapshots():
-            if ('import-ami-' in snap.description and
-                not snap.tags and
-                snap.id not in self.not_our_snapshots):
+            try:
+                if (snap.id not in self.not_our_snapshots and
+                    not snap.tags):
+                    to_examine.append(snap)
+            except botocore.exceptions.ClientError:
+                # We may have cached a snapshot that doesn't exist
+                continue
+        if not to_examine:
+            return
 
-                match = re.match(r'.*?(import-ami-\w*)', snap.description)
-                if not match:
-                    self.not_our_snapshots.add(snap.id)
-                    continue
-                task_id = match.group(1)
-                task = self._getImportImageTask(task_id)
-                tags = tag_list_to_dict(task.get('Tags'))
-                if (tags.get('nodepool_provider_name') == self.provider.name):
-                    # Copy over tags
-                    self.log.debug(
-                        f"Copying tags from import task {task_id} to snapshot")
-                    with self.rate_limiter:
-                        snap.create_tags(Tags=task['Tags'])
-                else:
-                    self.not_our_snapshots.add(snap.id)
+        # We have snapshots to examine; get a list of import tasks so
+        # we can copy the tags from the import task that resulted in
+        # this snapshot.
+        task_map = {}
+        for task in self._listImportSnapshotTasks():
+            detail = task['SnapshotTaskDetail']
+            task_snapshot_id = detail.get('SnapshotId')
+            if not task_snapshot_id:
+                continue
+            task_map[task_snapshot_id] = task['Tags']
 
-    def _getImportImageTask(self, task_id):
-        paginator = self._get_paginator('describe_import_image_tasks')
+        for snap in to_examine:
+            tags = task_map.get(snap.id)
+            if not tags:
+                self.not_our_snapshots.add(snap.id)
+                continue
+            metadata = tag_list_to_dict(tags)
+            if (metadata.get('nodepool_provider_name') == self.provider.name):
+                # Copy over tags
+                self.log.debug(
+                    f"Copying tags from import task to snapshot {snap.id}")
+                with self.rate_limiter:
+                    snap.create_tags(Tags=tags)
+            else:
+                self.not_our_snapshots.add(snap.id)
+
+    def _listImportSnapshotTasks(self):
+        paginator = self._get_paginator('describe_import_snapshot_tasks')
         with self.non_mutating_rate_limiter:
-            for page in paginator.paginate(ImportTaskIds=[task_id]):
-                for task in page['ImportImageTasks']:
-                    # Return the first and only task
-                    return task
+            for page in paginator.paginate():
+                for task in page['ImportSnapshotTasks']:
+                    yield task
 
     instance_key_re = re.compile(r'([a-z\-]+)\d.*')
 
@@ -809,8 +899,8 @@
     # These methods allow the tests to patch our use of boto to
     # compensate for missing methods in the boto mocks.
 
-    def _import_image(self, *args, **kw):
-        return self.ec2_client.import_image(*args, **kw)
+    def _import_snapshot(self, *args, **kw):
+        return self.ec2_client.import_snapshot(*args, **kw)
 
     def _get_paginator(self, *args, **kw):
         return self.ec2_client.get_paginator(*args, **kw)
diff --git a/nodepool/driver/aws/config.py b/nodepool/driver/aws/config.py
index d94c5c1e8..fe0e1465e 100644
--- a/nodepool/driver/aws/config.py
+++ b/nodepool/driver/aws/config.py
@@ -101,6 +101,8 @@ class AwsProviderDiskImage(ConfigValue):
                 default_port_mapping.get(self.connection_type, 22))
             self.meta = {}
             self.architecture = image.get('architecture', 'x86_64')
+            self.volume_size = image.get('volume-size', None)
+            self.volume_type = image.get('volume-type', 'gp2')
 
     @property
     def external_name(self):
@@ -117,6 +119,8 @@ class AwsProviderDiskImage(ConfigValue):
             'connection-port': int,
             'python-path': str,
             'shell-type': str,
+            'volume-size': int,
+            'volume-type': str,
         }
 
diff --git a/nodepool/tests/unit/fake_aws.py b/nodepool/tests/unit/fake_aws.py
index 7e85f5d8a..a72648984 100644
--- a/nodepool/tests/unit/fake_aws.py
+++ b/nodepool/tests/unit/fake_aws.py
@@ -22,34 +22,28 @@
 import boto3
 
 
 def make_stage_1(task_id, user_bucket, tags):
     return {
         'Architecture': 'x86_64',
-        'ImportTaskId': f'import-ami-{task_id}',
+        'ImportTaskId': f'import-snap-{task_id}',
         'Progress': '19',
-        'SnapshotDetails': [{'DiskImageSize': 355024384.0,
-                             'Format': 'VMDK',
-                             'Status': 'active',
-                             'UserBucket': user_bucket}],
+        'SnapshotTaskDetail': {'DiskImageSize': 355024384.0,
+                               'Format': 'VMDK',
+                               'Status': 'active',
+                               'UserBucket': user_bucket},
         'Status': 'active',
         'StatusMessage': 'converting',
         'Tags': tags,
     }
 
 
-def make_stage_2(task_id, image_id, snap_id, task):
+def make_stage_2(task_id, snap_id, task):
     # Make a unique snapshot id that's different than the task id.
     return {
-        'Architecture': 'x86_64',
-        'BootMode': 'legacy_bios',
-        'ImageId': image_id,
-        'ImportTaskId': f'import-ami-{task_id}',
-        'LicenseType': 'BYOL',
-        'Platform': 'Linux',
-        'SnapshotDetails': [{'DeviceName': '/dev/sda1',
-                             'DiskImageSize': 355024384.0,
-                             'Format': 'VMDK',
-                             'SnapshotId': snap_id,
-                             'Status': 'completed',
-                             'UserBucket':
-                             task['SnapshotDetails'][0]['UserBucket']}],
+        'ImportTaskId': f'import-snap-{task_id}',
+        'SnapshotTaskDetail': {'DiskImageSize': 355024384.0,
+                               'Format': 'VMDK',
+                               'SnapshotId': snap_id,
+                               'Status': 'completed',
+                               'UserBucket':
+                               task['SnapshotTaskDetail']['UserBucket']},
         'Status': 'completed',
         'Tags': task['Tags'],
     }
@@ -67,12 +61,12 @@ class ImportTaskPaginator:
             tasks = [t for t in tasks
                      if t['ImportTaskId'] in kw['ImportTaskIds']]
 
         # A page of tasks
-        ret = [{'ImportImageTasks': tasks}]
+        ret = [{'ImportSnapshotTasks': tasks}]
 
         # Move the task along
         for task in tasks:
             if task['Status'] != 'completed':
-                self.fake.finish_import_image(task)
+                self.fake.finish_import_snapshot(task)
 
         return ret
 
@@ -84,46 +78,39 @@
 class FakeAws:
     def __init__(self):
         self.tasks = {}
         self.ec2 = boto3.resource('ec2', region_name='us-west-2')
         self.ec2_client = boto3.client('ec2', region_name='us-west-2')
 
-    def import_image(self, *args, **kw):
+    def import_snapshot(self, *args, **kw):
         task_id = uuid.uuid4().hex
         task = make_stage_1(
             task_id,
-            kw['DiskContainers'][0]['UserBucket'],
+            kw['DiskContainer']['UserBucket'],
             kw['TagSpecifications'][0]['Tags'])
         self.tasks[task_id] = task
         return task
 
-    def finish_import_image(self, task):
+    def finish_import_snapshot(self, task):
         task_id = task['ImportTaskId'].split('-')[-1]
 
-        # Make an AMI to simulate the import finishing
-        reservation = self.ec2_client.run_instances(
-            ImageId="ami-12c6146b", MinCount=1, MaxCount=1)
-        instance = reservation["Instances"][0]
-        instance_id = instance["InstanceId"]
-
-        response = self.ec2_client.create_image(
-            InstanceId=instance_id,
-            Name=f'import-ami-{task_id}',
-        )
-
-        image_id = response["ImageId"]
-        self.ec2_client.describe_images(ImageIds=[image_id])["Images"][0]
-
+        # Make a Volume to simulate the import finishing
         volume = self.ec2_client.create_volume(
             Size=80,
             AvailabilityZone='us-west-2')
         snap_id = self.ec2_client.create_snapshot(
             VolumeId=volume['VolumeId'],
-            Description=f'imported volume import-ami-{task_id}',
         )["SnapshotId"]
 
-        t2 = make_stage_2(task_id, image_id, snap_id, task)
+        t2 = make_stage_2(task_id, snap_id, task)
         self.tasks[task_id] = t2
 
-        return (image_id, snap_id)
+        return snap_id
+
+    def change_snapshot_id(self, task, snapshot_id):
+        # Given a task, update its snapshot id; the moto
+        # register_image mock doesn't honor the snapshot_id we pass
+        # in.
+        task_id = task['ImportTaskId'].split('-')[-1]
+        self.tasks[task_id]['SnapshotTaskDetail']['SnapshotId'] = snapshot_id
 
     def get_paginator(self, name):
-        if name == 'describe_import_image_tasks':
+        if name == 'describe_import_snapshot_tasks':
             return ImportTaskPaginator(self)
         raise NotImplementedError()
 
diff --git a/nodepool/tests/unit/test_driver_aws.py b/nodepool/tests/unit/test_driver_aws.py
index aa6da341a..aea273dad 100644
--- a/nodepool/tests/unit/test_driver_aws.py
+++ b/nodepool/tests/unit/test_driver_aws.py
@@ -103,6 +103,9 @@ class TestDriverAws(tests.DBTestCase):
             Description='Zuul Nodes')
         self.security_group_id = self.security_group['GroupId']
         self.patch(nodepool.driver.statemachine, 'nodescan', fake_nodescan)
+        self.patch(AwsAdapter, '_import_snapshot',
+                   self.fake_aws.import_snapshot)
+        self.patch(AwsAdapter, '_get_paginator', self.fake_aws.get_paginator)
 
     def tearDown(self):
         self.mock_ec2.stop()
@@ -526,8 +529,6 @@
         self.assertTrue(response['EbsOptimized']['Value'])
 
     def test_aws_diskimage(self):
-        self.patch(AwsAdapter, '_import_image', self.fake_aws.import_image)
-        self.patch(AwsAdapter, '_get_paginator', self.fake_aws.get_paginator)
         configfile = self.setup_config('aws/diskimage.yaml')
         self.useBuilder(configfile)
 
@@ -558,8 +559,6 @@
             {'key1': 'value1', 'key2': 'value2'})
 
     def test_aws_diskimage_removal(self):
-        self.patch(AwsAdapter, '_import_image', self.fake_aws.import_image)
-        self.patch(AwsAdapter, '_get_paginator', self.fake_aws.get_paginator)
         configfile = self.setup_config('aws/diskimage.yaml')
         self.useBuilder(configfile)
         self.waitForImage('ec2-us-west-2', 'fake-image')
@@ -601,19 +600,42 @@
         )
         instance_id = reservation['Instances'][0]['InstanceId']
 
-        task = self.fake_aws.import_image(
-            DiskContainers=[{
+        task = self.fake_aws.import_snapshot(
+            DiskContainer={
                 'Format': 'ova',
                 'UserBucket': {
                     'S3Bucket': 'nodepool',
                     'S3Key': 'testfile',
                 }
-            }],
+            },
             TagSpecifications=[{
-                'ResourceType': 'import-image-task',
+                'ResourceType': 'import-snapshot-task',
                 'Tags': image_tags,
             }])
-        image_id, snapshot_id = self.fake_aws.finish_import_image(task)
+        snapshot_id = self.fake_aws.finish_import_snapshot(task)
+
+        register_response = self.ec2_client.register_image(
+            Architecture='x86_64',
+            BlockDeviceMappings=[
+                {
+                    'DeviceName': '/dev/sda1',
+                    'Ebs': {
+                        'DeleteOnTermination': True,
+                        'SnapshotId': snapshot_id,
+                        'VolumeSize': 20,
+                        'VolumeType': 'gp2',
+                    },
+                },
+            ],
+            RootDeviceName='/dev/sda1',
+            VirtualizationType='hvm',
+            Name='testimage',
+        )
+        image_id = register_response['ImageId']
+
+        ami = self.ec2.Image(image_id)
+        new_snapshot_id = ami.block_device_mappings[0]['Ebs']['SnapshotId']
+        self.fake_aws.change_snapshot_id(task, new_snapshot_id)
 
         # Note that the resulting image and snapshot do not have tags
         # applied, so we test the automatic retagging methods in the
@@ -684,7 +706,7 @@
                 break
 
         for _ in iterate_timeout(30, Exception, 'snapshot deletion'):
-            snap = self.ec2.Snapshot(new_snapshot_id)
+            snap = self.ec2.Snapshot(new_snapshot_id)
             try:
                 if snap.state == 'deleted':
                     break
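
This patch replaces EC2's one-shot import_image with a two-step flow: import the
disk image from S3 as an EBS snapshot, then register that snapshot as an AMI.
The split is what makes the new volume-size/volume-type options possible, since
they map directly onto the Ebs block device mapping passed to register_image.
The following is a minimal standalone sketch of that flow against plain boto3,
not the patch itself; the region, bucket, key, tag, and image names are
hypothetical placeholders.

    import time

    import boto3

    ec2 = boto3.client('ec2', region_name='us-west-2')

    # Step 1: import the image file from S3 as an EBS snapshot.
    task = ec2.import_snapshot(
        DiskContainer={
            'Format': 'raw',
            'UserBucket': {'S3Bucket': 'example-bucket',
                           'S3Key': 'image.raw'},
        },
        TagSpecifications=[{
            'ResourceType': 'import-snapshot-task',
            'Tags': [{'Key': 'nodepool_provider_name', 'Value': 'example'}],
        }],
    )
    task_id = task['ImportTaskId']

    # Step 2: poll the import task until it reaches a terminal state.
    paginator = ec2.get_paginator('describe_import_snapshot_tasks')
    detail = None
    while True:
        time.sleep(10)
        for page in paginator.paginate(ImportTaskIds=[task_id]):
            detail = page['ImportSnapshotTasks'][0]['SnapshotTaskDetail']
        if detail['Status'].lower() in ('completed', 'deleted'):
            break
    if detail['Status'].lower() != 'completed':
        raise RuntimeError(f'Snapshot import failed: {detail}')

    # Step 3: register the snapshot as an AMI; VolumeSize/VolumeType
    # correspond to the new volume-size/volume-type diskimage options
    # (gp2 is the documented default).
    image = ec2.register_image(
        Name='example-image',
        Architecture='x86_64',
        RootDeviceName='/dev/sda1',
        VirtualizationType='hvm',
        BlockDeviceMappings=[{
            'DeviceName': '/dev/sda1',
            'Ebs': {
                'DeleteOnTermination': True,
                'SnapshotId': detail['SnapshotId'],
                'VolumeSize': 20,
                'VolumeType': 'gp2',
            },
        }],
    )
    print(image['ImageId'])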
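
Because import_snapshot offers no way to tag the resulting snapshot, let alone
an AMI registered from it later, the reworked _tagAmis()/_tagSnapshots()
reconcile tags after the fact: untagged resources are matched back to their
import task via the snapshot ID, and the task's tags are copied over. A rough
sketch of just that matching step with plain boto3 (illustrative only; the
region and the Owners filter are assumptions, and the real methods also cache
non-nodepool resources and rate-limit their calls):

    import boto3

    ec2 = boto3.resource('ec2', region_name='us-west-2')
    client = boto3.client('ec2', region_name='us-west-2')

    # Map snapshot ID -> tags for every known import-snapshot task.
    task_map = {}
    paginator = client.get_paginator('describe_import_snapshot_tasks')
    for page in paginator.paginate():
        for task in page['ImportSnapshotTasks']:
            snapshot_id = task['SnapshotTaskDetail'].get('SnapshotId')
            if snapshot_id:
                task_map[snapshot_id] = task.get('Tags', [])

    # Copy tags onto any untagged AMI whose root device snapshot came
    # from one of those import tasks.
    for ami in ec2.images.filter(Owners=['self']):
        if ami.tags or not ami.block_device_mappings:
            continue
        ebs = ami.block_device_mappings[0].get('Ebs', {})
        tags = task_map.get(ebs.get('SnapshotId'))
        if tags:
            ami.create_tags(Tags=tags)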