Merge "Add EBS direct image upload to AWS"
This commit is contained in:
commit
3678ddefdd
@@ -5,10 +5,10 @@
 AWS Driver
 ----------
 
-If using the AWS driver to upload diskimages, see
-`VM Import/Export service role`_ for information on configuring
-the required permissions in AWS. You must also create an S3 Bucket
-for use by Nodepool.
+If using the AWS driver to upload diskimages, see `VM Import/Export
+service role`_ for information on configuring the required permissions
+in AWS. You must also create an S3 Bucket for use by Nodepool if
+uploading images (except when using the ebs-direct upload method).
 
 Selecting the ``aws`` driver adds the following options to the
 :attr:`providers` section of the configuration.
@@ -419,8 +419,9 @@ Selecting the ``aws`` driver adds the following options to the
 
          The root `EBS volume type`_ for the image.
          Only used with the
-         :value:`providers.[aws].diskimages.import-method.snapshot`
-         import method.
+         :value:`providers.[aws].diskimages.import-method.snapshot` or
+         :value:`providers.[aws].diskimages.import-method.ebs-direct`
+         import methods.
 
       .. attr:: volume-size
          :type: int
@@ -428,8 +429,9 @@ Selecting the ``aws`` driver adds the following options to the
          The size of the root EBS volume, in GiB, for the image. If
          omitted, the volume size reported for the imported snapshot
          will be used. Only used with the
-         :value:`providers.[aws].diskimages.import-method.snapshot`
-         import method.
+         :value:`providers.[aws].diskimages.import-method.snapshot` or
+         :value:`providers.[aws].diskimages.import-method.ebs-direct`
+         import methods.
 
       .. attr:: imds-support
          :type: str
@@ -437,8 +439,9 @@ Selecting the ``aws`` driver adds the following options to the
          To enforce usage of IMDSv2 by default on instances created
          from the image, set this value to `v2.0`. If omitted, IMDSv2
          is optional by default. This is only supported using the
-         :value:`providers.[aws].diskimages.import-method.snapshot`
-         import method.
+         :value:`providers.[aws].diskimages.import-method.snapshot` or
+         :value:`providers.[aws].diskimages.import-method.ebs-direct`
+         import methods.
 
       .. attr:: import-method
          :default: snapshot
@@ -455,6 +458,12 @@ Selecting the ``aws`` driver adds the following options to the
             operating systems which require special licensing or other
             metadata in AWS.
 
+         .. value:: ebs-direct
+
+            This is similar to the `snapshot` method, but uses the
+            `EBS direct API`_ instead of S3. This may be faster and
+            more efficient, but it may incur additional costs.
+
          .. value:: image
 
             This method uploads the image file to AWS and performs an
@@ -865,3 +874,4 @@ Selecting the ``aws`` driver adds the following options to the
 .. _`VM Import/Export service role`: https://docs.aws.amazon.com/vm-import/latest/userguide/vmie_prereqs.html#vmimport-role
 .. _`instance quotas`: https://us-west-1.console.aws.amazon.com/servicequotas/home/services/ec2/quotas
 .. _`AWS RegisterImage API documentation`: https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_RegisterImage.html
+.. _`EBS direct API`: https://docs.aws.amazon.com/ebs/latest/userguide/ebs-accessing-snapshot.html
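For orientation, here is a minimal sketch of the raw EBS direct API sequence that the new ebs-direct import method automates, using plain boto3. The image path and sizes are illustrative assumptions; the driver's EBSSnapshotUploader (in the adapter changes below) adds tagging, an idempotency token, rate limiting, retries, and concurrent block uploads.

import base64
import hashlib
import math

import boto3

GIB = 1024 ** 3
BLOCK = 512 * 1024  # EBS direct snapshots use fixed 512 KiB blocks

ebs = boto3.client('ebs')

with open('image.raw', 'rb') as f:  # hypothetical local diskimage
    data = f.read()  # read fully for brevity; the driver streams segments

# Snapshot volumes are sized in whole GiB
snap = ebs.start_snapshot(VolumeSize=math.ceil(len(data) / GIB))
snapshot_id = snap['SnapshotId']

count = 0
for index in range(math.ceil(len(data) / BLOCK)):
    block = data[index * BLOCK:(index + 1) * BLOCK].ljust(BLOCK, b'\0')
    checksum = base64.b64encode(hashlib.sha256(block).digest()).decode()
    ebs.put_snapshot_block(
        SnapshotId=snapshot_id, BlockIndex=index, BlockData=block,
        DataLength=len(block), Checksum=checksum,
        ChecksumAlgorithm='SHA256')
    count += 1

# Real code polls until Status reaches 'completed'
ebs.complete_snapshot(SnapshotId=snapshot_id, ChangedBlocksCount=count)
# The snapshot can then be registered as an AMI via
# ec2_client.register_image(), as the adapter change below does.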
@@ -13,24 +13,27 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-from concurrent.futures import ThreadPoolExecutor
+import base64
 import cachetools.func
+from concurrent.futures import ThreadPoolExecutor
 import copy
 import functools
 import hashlib
 import json
 import logging
 import math
+import queue
 import re
 import threading
-import queue
 import time
 import urllib.parse
+from uuid import uuid4
 
 from nodepool.driver.utils import (
     QuotaInformation,
     LazyExecutorTTLCache,
     RateLimiter,
+    ImageUploader,
 )
 from nodepool.driver import statemachine
 from nodepool import exceptions
@@ -215,6 +218,8 @@ CACHE_TTL = 10
 SERVICE_QUOTA_CACHE_TTL = 300
 ON_DEMAND = 0
 SPOT = 1
+KIB = 1024
+GIB = 1024 ** 3
 
 
 class AwsInstance(statemachine.Instance):
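A quick worked example (not from the commit) of what these constants imply for the ebs-direct path: volume sizes are rounded up to whole GiB, and an image is carved into 512 KiB segments.

import math

KIB = 1024
GIB = 1024 ** 3
segment_size = 512 * KIB

image_size = 3_500_000_000                   # hypothetical ~3.26 GiB raw image
print(math.ceil(image_size / GIB))           # 4: VolumeSize for start_snapshot
print(math.ceil(image_size / segment_size))  # 6676 put_snapshot_block calls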
@@ -419,6 +424,100 @@ class AwsCreateStateMachine(statemachine.StateMachine):
             self.host, self.quota)
 
 
+class EBSSnapshotUploader(ImageUploader):
+    segment_size = 512 * KIB
+
+    def __init__(self, *args, **kw):
+        super().__init__(*args, **kw)
+        self.segment_count = 0
+
+    def shouldRetryException(self, exception):
+        # Strictly speaking, ValidationException is only retryable
+        # if we get a particular message, but that's impractical
+        # to reproduce for testing.
+        # https://docs.aws.amazon.com/ebs/latest/userguide/error-retries.html
+        ex = self.adapter.ebs_client.exceptions
+        if isinstance(exception, (
+                ex.RequestThrottledException,
+                ex.InternalServerException,
+                ex.ValidationException,
+        )):
+            return True
+        return False
+
+    def _rateLimited(self, func):
+        def rateLimitedFunc(*args, **kw):
+            with self.adapter.rate_limiter:
+                return func(*args, **kw)
+        return rateLimitedFunc
+
+    def uploadSegment(self, segment):
+        # There is a default limit of 1000 put requests/second.
+        # Actual value is available as a service quota. We don't
+        # expect to hit this. If we do, and we need to rate-limit, we
+        # will need to coordinate with other builders.
+        # https://docs.aws.amazon.com/ebs/latest/userguide/ebs-resource-quotas.html
+        data = segment.data
+        if len(data) < self.segment_size:
+            # Pad the last block with zeros if it is smaller, since
+            # the block size in AWS is constant.
+            data = data.ljust(self.segment_size, b'\0')
+        checksum = hashlib.sha256(data)
+        checksum_base64 = base64.b64encode(checksum.digest()).decode('utf-8')
+
+        response = self.retry(
+            self.adapter.ebs_client.put_snapshot_block,
+            SnapshotId=self.snapshot_id,
+            BlockIndex=segment.index,
+            BlockData=data,
+            DataLength=len(data),
+            Checksum=checksum_base64,
+            ChecksumAlgorithm='SHA256',
+        )
+        if response['Checksum'] != checksum_base64:
+            raise Exception("Checksums do not match; received "
+                            f"{response['Checksum']} expected "
+                            f"{checksum_base64}")
+        self.segment_count += 1
+
+    def startUpload(self):
+        # This is used by AWS to ensure idempotency across retries
+        token = uuid4().hex
+        # Volume size is in GiB
+        size = math.ceil(self.size / GIB)
+        response = self.retry(
+            self._rateLimited(self.adapter.ebs_client.start_snapshot),
+            VolumeSize=size,
+            ClientToken=token,
+            Tags=tag_dict_to_list(self.metadata),
+        )
+        self.snapshot_id = response['SnapshotId']
+
+    def finishUpload(self):
+        while True:
+            response = self.retry(
+                self._rateLimited(self.adapter.ebs_client.complete_snapshot),
+                SnapshotId=self.snapshot_id,
+                ChangedBlocksCount=self.segment_count,
+            )
+            if response['Status'] == 'error':
+                raise Exception("Snapshot in error state")
+            if response['Status'] == 'completed':
+                break
+            self.checkTimeout()
+        return self.size, self.snapshot_id
+
+    def abortUpload(self):
+        try:
+            self.finishUpload()
+        except Exception:
+            pass
+        with self.adapter.rate_limiter:
+            snapshot_id = getattr(self, 'snapshot_id', None)
+            if snapshot_id:
+                self.adapter.ec2_client.delete_snapshot(
+                    SnapshotId=self.snapshot_id)
+
+
 class AwsAdapter(statemachine.Adapter):
     IMAGE_UPLOAD_SLEEP = 30
     LAUNCH_TEMPLATE_PREFIX = 'nodepool-launch-template'
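Note how startUpload and finishUpload compose the helpers: retry(self._rateLimited(func), ...) places the rate limiter inside the retry loop, so every attempt, including a retry after throttling, re-enters the limiter. A toy demonstration of that composition; the limiter and the flaky call here are stand-ins, not Nodepool code.

import time

class FakeRateLimiter:
    # Stand-in for nodepool's RateLimiter context manager
    def __enter__(self):
        return self
    def __exit__(self, *exc):
        time.sleep(0.1)  # pretend to pace API calls

rate_limiter = FakeRateLimiter()

def rate_limited(func):
    def inner(*args, **kw):
        with rate_limiter:
            return func(*args, **kw)
    return inner

attempts = []
def flaky_api_call():
    attempts.append(1)
    if len(attempts) < 2:
        raise RuntimeError("RequestThrottled")
    return "ok"

def retry(func, error_retries=3):
    # Mirrors ImageUploader.retry: linear backoff of 2 * attempt seconds
    for x in range(error_retries):
        try:
            return func()
        except RuntimeError:
            if x + 1 >= error_retries:
                raise
            time.sleep(2 * x)

print(retry(rate_limited(flaky_api_call)))  # "ok"; the limiter ran twice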
@@ -493,6 +592,7 @@ class AwsAdapter(statemachine.Adapter):
         self.s3 = self.aws.resource('s3')
         self.s3_client = self.aws.client('s3')
         self.aws_quotas = self.aws.client("service-quotas")
+        self.ebs_client = self.aws.client('ebs')
 
         workers = 10
         self.log.info("Create executor with max workers=%s", workers)
@@ -772,34 +872,93 @@ class AwsAdapter(statemachine.Adapter):
                     image_format, metadata, md5, sha256):
         self.log.debug(f"Uploading image {image_name}")
 
-        # Upload image to S3
-        bucket_name = self.provider.object_storage['bucket-name']
-        bucket = self.s3.Bucket(bucket_name)
-        object_filename = f'{image_name}.{image_format}'
-        extra_args = {'Tagging': urllib.parse.urlencode(metadata)}
-
         # There is no IMDS support option for the import_image call
         if (provider_image.import_method == 'image' and
             provider_image.imds_support == 'v2.0'):
             raise Exception("IMDSv2 requires 'snapshot' import method")
 
-        with open(filename, "rb") as fobj:
-            with self.rate_limiter:
-                bucket.upload_fileobj(fobj, object_filename,
-                                      ExtraArgs=extra_args)
+        if provider_image.import_method != 'ebs-direct':
+            # Upload image to S3
+            bucket_name = self.provider.object_storage['bucket-name']
+            bucket = self.s3.Bucket(bucket_name)
+            object_filename = f'{image_name}.{image_format}'
+            extra_args = {'Tagging': urllib.parse.urlencode(metadata)}
+
+            with open(filename, "rb") as fobj:
+                with self.rate_limiter:
+                    bucket.upload_fileobj(fobj, object_filename,
+                                          ExtraArgs=extra_args)
 
         if provider_image.import_method == 'image':
             image_id = self._uploadImageImage(
                 provider_image, image_name, filename,
                 image_format, metadata, md5, sha256,
                 bucket_name, object_filename)
-        else:
+        elif provider_image.import_method == 'snapshot':
             image_id = self._uploadImageSnapshot(
                 provider_image, image_name, filename,
                 image_format, metadata, md5, sha256,
                 bucket_name, object_filename)
+        elif provider_image.import_method == 'ebs-direct':
+            image_id = self._uploadImageSnapshotEBS(
+                provider_image, image_name, filename,
+                image_format, metadata)
+        else:
+            raise Exception("Unknown image import method")
         return image_id
 
+    def _registerImage(self, provider_image, image_name, metadata,
+                       volume_size, snapshot_id):
+        # Register the snapshot as an AMI
+        with self.rate_limiter:
+            bdm = {
+                'DeviceName': '/dev/sda1',
+                'Ebs': {
+                    'DeleteOnTermination': True,
+                    'SnapshotId': snapshot_id,
+                    'VolumeSize': volume_size,
+                    'VolumeType': provider_image.volume_type,
+                },
+            }
+            if provider_image.iops:
+                bdm['Ebs']['Iops'] = provider_image.iops
+            if provider_image.throughput:
+                bdm['Ebs']['Throughput'] = provider_image.throughput
+
+            args = dict(
+                Architecture=provider_image.architecture,
+                BlockDeviceMappings=[bdm],
+                RootDeviceName='/dev/sda1',
+                VirtualizationType='hvm',
+                EnaSupport=provider_image.ena_support,
+                Name=image_name,
+                TagSpecifications=[
+                    {
+                        'ResourceType': 'image',
+                        'Tags': tag_dict_to_list(metadata),
+                    },
+                ]
+            )
+            if provider_image.imds_support == 'v2.0':
+                args['ImdsSupport'] = 'v2.0'
+            return self.ec2_client.register_image(**args)
+
+    def _uploadImageSnapshotEBS(self, provider_image, image_name, filename,
+                                image_format, metadata):
+        # Import snapshot
+        uploader = EBSSnapshotUploader(self, self.log, filename, metadata)
+        self.log.debug(f"Importing {image_name} as EBS snapshot")
+        volume_size, snapshot_id = uploader.upload(
+            self.provider.image_import_timeout)
+
+        register_response = self._registerImage(
+            provider_image, image_name, metadata, volume_size, snapshot_id,
+        )
+
+        self.log.debug(f"Upload of {image_name} complete as "
+                       f"{register_response['ImageId']}")
+        return register_response['ImageId']
+
     def _uploadImageSnapshot(self, provider_image, image_name, filename,
                              image_format, metadata, md5, sha256,
                              bucket_name, object_filename):
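_registerImage relies on tag_dict_to_list, a helper that already exists in this module; a sketch of the conversion it is assumed to perform, consistent with the Key/Value pairs the updated tests below assert on:

def tag_dict_to_list(tagdict):
    # Nodepool metadata dict -> the list-of-dicts shape AWS tags use
    return [{'Key': k, 'Value': v} for k, v in tagdict.items()]

metadata = {'diskimage_metadata': 'diskimage',
            'provider_metadata': 'provider'}
print(tag_dict_to_list(metadata))
# [{'Key': 'diskimage_metadata', 'Value': 'diskimage'},
#  {'Key': 'provider_metadata', 'Value': 'provider'}]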
@@ -872,43 +1031,10 @@ class AwsAdapter(statemachine.Adapter):
             self.log.exception("Error tagging snapshot:")
 
         volume_size = provider_image.volume_size or snap['VolumeSize']
-        # Register the snapshot as an AMI
-        with self.rate_limiter:
-            bdm = {
-                'DeviceName': '/dev/sda1',
-                'Ebs': {
-                    'DeleteOnTermination': True,
-                    'SnapshotId': task[
-                        'SnapshotTaskDetail']['SnapshotId'],
-                    'VolumeSize': volume_size,
-                    'VolumeType': provider_image.volume_type,
-                },
-            }
-            if provider_image.iops:
-                bdm['Ebs']['Iops'] = provider_image.iops
-            if provider_image.throughput:
-                bdm['Ebs']['Throughput'] = provider_image.throughput
-
-            args = dict(
-                Architecture=provider_image.architecture,
-                BlockDeviceMappings=[bdm],
-                RootDeviceName='/dev/sda1',
-                VirtualizationType='hvm',
-                EnaSupport=provider_image.ena_support,
-                Name=image_name,
-            )
-            if provider_image.imds_support == 'v2.0':
-                args['ImdsSupport'] = 'v2.0'
-            register_response = self.ec2_client.register_image(**args)
-
-        # Tag the AMI
-        try:
-            with self.rate_limiter:
-                self.ec2_client.create_tags(
-                    Resources=[register_response['ImageId']],
-                    Tags=task['Tags'])
-        except Exception:
-            self.log.exception("Error tagging AMI:")
-
+        snapshot_id = task['SnapshotTaskDetail']['SnapshotId']
+        register_response = self._registerImage(
+            provider_image, image_name, metadata, volume_size, snapshot_id,
+        )
 
         self.log.debug(f"Upload of {image_name} complete as "
                        f"{register_response['ImageId']}")
@@ -107,8 +107,9 @@ class AwsProviderDiskImage(ConfigValue):
         self.import_method = image.get('import-method', 'snapshot')
         self.imds_support = image.get('imds-support', None)
         if (self.imds_support == 'v2.0' and
-            self.import_method != 'snapshot'):
-            raise Exception("IMDSv2 requires 'snapshot' import method")
+            self.import_method == 'image'):
+            raise Exception("IMDSv2 requires 'snapshot' or 'ebs-direct' "
+                            "import method")
         self.iops = image.get('iops', None)
         self.throughput = image.get('throughput', None)
@@ -131,7 +132,7 @@ class AwsProviderDiskImage(ConfigValue):
             'ena-support': bool,
             'volume-size': int,
             'volume-type': str,
-            'import-method': v.Any('snapshot', 'image'),
+            'import-method': v.Any('snapshot', 'ebs-direct', 'image'),
             'imds-support': v.Any('v2.0', None),
             'iops': int,
             'throughput': int,
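For readers unfamiliar with voluptuous (the v module aliased throughout this config schema), a standalone sketch of how the amended rule behaves:

import voluptuous as v

schema = v.Schema({'import-method': v.Any('snapshot', 'ebs-direct', 'image')})

schema({'import-method': 'ebs-direct'})  # passes
try:
    schema({'import-method': 'carrier-pigeon'})
except v.Invalid as e:
    print(e)  # e.g. "not a valid value for dictionary value @ data['import-method']"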
@@ -16,9 +16,11 @@
 # limitations under the License.
 
 import abc
+import concurrent.futures
 import copy
 import logging
 import math
+import os
 import threading
 import time
 from collections import defaultdict
@@ -548,3 +550,114 @@ class LazyExecutorTTLCache:
             self.last_time = time.monotonic()
             return self.last_value
         return decorator
+
+
+class Segment:
+    def __init__(self, index, offset, data):
+        self.index = index
+        self.offset = offset
+        self.data = data
+
+
+class ImageUploader:
+    """
+    A helper class for drivers that upload large images in chunks.
+    """
+
+    # These values probably don't need to be changed
+    error_retries = 3
+    concurrency = 10
+
+    # Subclasses must implement these
+    segment_size = None
+
+    def __init__(self, adapter, log, path, metadata):
+        if self.segment_size is None:
+            raise Exception("Subclass must set block size")
+        self.adapter = adapter
+        self.log = log
+        self.path = path
+        self.size = os.path.getsize(path)
+        self.metadata = metadata
+        self.timeout = None
+
+    def shouldRetryException(self, exception):
+        return True
+
+    def uploadSegment(self, segment):
+        pass
+
+    def startUpload(self):
+        pass
+
+    def finishUpload(self):
+        pass
+
+    def abortUpload(self):
+        pass
+
+    # Main API
+    def upload(self, timeout=None):
+        if timeout:
+            self.timeout = time.monotonic() + timeout
+        self.startUpload()
+        try:
+            with concurrent.futures.ThreadPoolExecutor(
+                    max_workers=self.concurrency) as executor:
+                with open(self.path, 'rb') as image_file:
+                    self._uploadInner(executor, image_file)
+            return self.finishUpload()
+        except Exception:
+            self.log.exception("Error uploading image:")
+            self.abortUpload()
+
+    # Subclasses can use this helper method for wrapping retryable calls
+    def retry(self, func, *args, **kw):
+        for x in range(self.error_retries):
+            try:
+                return func(*args, **kw)
+            except Exception as e:
+                if not self.shouldRetryException(e):
+                    raise
+                if x + 1 >= self.error_retries:
+                    raise
+                time.sleep(2 * x)
+
+    def getTimeout(self):
+        if self.timeout is None:
+            return None
+        return self.timeout - time.monotonic()
+
+    def checkTimeout(self):
+        if self.timeout is None:
+            return
+        if self.getTimeout() < 0:
+            raise Exception("Timed out uploading image")
+
+    # Internal methods
+    def _uploadInner(self, executor, image_file):
+        futures = set()
+        for index, offset in enumerate(
+                range(0, self.size, self.segment_size)):
+            segment = Segment(index, offset,
+                              image_file.read(self.segment_size))
+            future = executor.submit(self.uploadSegment, segment)
+            futures.add(future)
+            # Keep the pool of workers supplied with data, without
+            # reading the entire file into memory.
+            if len(futures) >= (self.concurrency * 2):
+                (done, futures) = concurrent.futures.wait(
+                    futures,
+                    timeout=self.getTimeout(),
+                    return_when=concurrent.futures.FIRST_COMPLETED)
+                for future in done:
+                    future.result()
+                # Only check the timeout after waiting (not every pass
+                # through the loop)
+                self.checkTimeout()
+        # We're done reading the file; wait for all uploads to finish
+        (done, futures) = concurrent.futures.wait(
+            futures,
+            timeout=self.getTimeout())
+        for future in done:
+            future.result()
+        self.checkTimeout()
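The subclass contract: set segment_size, override the hook methods, and call upload(). A minimal toy subclass that only counts segments; no cloud calls, the temp file and metadata are placeholders, and it assumes ImageUploader from this module is importable.

import logging
import tempfile
import threading

class CountingUploader(ImageUploader):
    segment_size = 512 * 1024  # 512 KiB, matching EBSSnapshotUploader

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        self.count = 0
        self.lock = threading.Lock()

    def uploadSegment(self, segment):
        # A real driver sends segment.data to its cloud API here
        with self.lock:
            self.count += 1

    def finishUpload(self):
        return self.size, self.count

with tempfile.NamedTemporaryFile() as f:
    f.write(b'x' * (3 * 512 * 1024 + 100))  # three full segments + a partial
    f.flush()
    uploader = CountingUploader(None, logging.getLogger('demo'),
                                f.name, {'purpose': 'demo'})
    print(uploader.upload(timeout=60))  # (1572964, 4)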
nodepool/tests/fixtures/aws/diskimage-imdsv2-ebs-snapshot.yaml (new file, 70 lines)
@@ -0,0 +1,70 @@
+elements-dir: .
+images-dir: '{images_dir}'
+build-log-dir: '{build_log_dir}'
+build-log-retention: 1
+
+zookeeper-servers:
+  - host: {zookeeper_host}
+    port: {zookeeper_port}
+    chroot: {zookeeper_chroot}
+
+zookeeper-tls:
+  ca: {zookeeper_ca}
+  cert: {zookeeper_cert}
+  key: {zookeeper_key}
+
+tenant-resource-limits:
+  - tenant-name: tenant-1
+    max-cores: 1024
+
+labels:
+  - name: diskimage
+
+providers:
+  - name: ec2-us-west-2
+    driver: aws
+    rate: 2
+    region-name: us-west-2
+    object-storage:
+      bucket-name: nodepool
+    image-import-timeout: 60
+    diskimages:
+      - name: fake-image
+        tags:
+          provider_metadata: provider
+        import-method: ebs-direct
+        volume-type: gp3
+        iops: 1000
+        throughput: 100
+        imds-support: v2.0
+    pools:
+      - name: main
+        max-servers: 1
+        subnet-id: {subnet_id}
+        security-group-id: {security_group_id}
+        node-attributes:
+          key1: value1
+          key2: value2
+        labels:
+          - name: diskimage
+            diskimage: fake-image
+            instance-type: t3.medium
+            key-name: zuul
+            iops: 2000
+            throughput: 200
+
+diskimages:
+  - name: fake-image
+    elements:
+      - fedora-minimal
+      - vm
+    release: 21
+    dib-cmd: nodepool/tests/fake-image-create
+    env-vars:
+      TMPDIR: /opt/dib_tmp
+      DIB_IMAGE_CACHE: /opt/dib_cache
+      DIB_CLOUD_IMAGES: http://download.fedoraproject.org/pub/fedora/linux/releases/test/21-Beta/Cloud/Images/x86_64/
+      BASE_IMAGE_FILE: Fedora-Cloud-Base-20141029-21_Beta.x86_64.qcow2
+    metadata:
+      diskimage_metadata: diskimage
+    username: another_user
@@ -849,10 +849,12 @@ class TestDriverAws(tests.DBTestCase):
         ec2_image = self.ec2.Image(image.external_id)
         self.assertEqual(ec2_image.state, 'available')
         self.assertFalse('ImdsSupport' in self.register_image_calls[0])
-        self.assertTrue({'Key': 'diskimage_metadata', 'Value': 'diskimage'}
-                        in ec2_image.tags)
-        self.assertTrue({'Key': 'provider_metadata', 'Value': 'provider'}
-                        in ec2_image.tags)
+        # As of 2024-07-09, moto does not set tags, but AWS itself does.
+        tags = self.register_image_calls[0]['TagSpecifications'][0]['Tags']
+        self.assertIn(
+            {'Key': 'diskimage_metadata', 'Value': 'diskimage'}, tags)
+        self.assertIn(
+            {'Key': 'provider_metadata', 'Value': 'provider'}, tags)
 
         pool = self.useNodepool(configfile, watermark_sleep=1)
         self.startPool(pool)
@@ -939,10 +941,12 @@ class TestDriverAws(tests.DBTestCase):
         self.assertEqual(
             self.register_image_calls[0]['ImdsSupport'], 'v2.0')
 
-        self.assertTrue({'Key': 'diskimage_metadata', 'Value': 'diskimage'}
-                        in ec2_image.tags)
-        self.assertTrue({'Key': 'provider_metadata', 'Value': 'provider'}
-                        in ec2_image.tags)
+        # As of 2024-07-09, moto does not set tags, but AWS itself does.
+        tags = self.register_image_calls[0]['TagSpecifications'][0]['Tags']
+        self.assertIn(
+            {'Key': 'diskimage_metadata', 'Value': 'diskimage'}, tags)
+        self.assertIn(
+            {'Key': 'provider_metadata', 'Value': 'provider'}, tags)
 
         pool = self.useNodepool(configfile, watermark_sleep=1)
         self.startPool(pool)
@@ -979,6 +983,56 @@ class TestDriverAws(tests.DBTestCase):
         with testtools.ExpectedException(Exception, "IMDSv2 requires"):
             self.useBuilder(configfile)
 
+    def test_aws_diskimage_ebs_snapshot_imdsv2(self):
+        self.fake_aws.fail_import_count = 1
+        configfile = self.setup_config(
+            'aws/diskimage-imdsv2-ebs-snapshot.yaml')
+
+        self.useBuilder(configfile)
+
+        image = self.waitForImage('ec2-us-west-2', 'fake-image')
+        self.assertEqual(image.username, 'another_user')
+
+        ec2_image = self.ec2.Image(image.external_id)
+        self.assertEqual(ec2_image.state, 'available')
+        self.assertEqual(
+            self.register_image_calls[0]['ImdsSupport'], 'v2.0')
+
+        # As of 2024-07-09, moto does not set tags, but AWS itself does.
+        tags = self.register_image_calls[0]['TagSpecifications'][0]['Tags']
+        self.assertIn(
+            {'Key': 'diskimage_metadata', 'Value': 'diskimage'}, tags)
+        self.assertIn(
+            {'Key': 'provider_metadata', 'Value': 'provider'}, tags)
+
+        pool = self.useNodepool(configfile, watermark_sleep=1)
+        self.startPool(pool)
+
+        req = zk.NodeRequest()
+        req.state = zk.REQUESTED
+        req.node_types.append('diskimage')
+
+        self.zk.storeNodeRequest(req)
+        req = self.waitForNodeRequest(req)
+
+        self.assertEqual(req.state, zk.FULFILLED)
+        self.assertNotEqual(req.nodes, [])
+        node = self.zk.getNode(req.nodes[0])
+        self.assertEqual(node.allocated_to, req.id)
+        self.assertEqual(node.state, zk.READY)
+        self.assertIsNotNone(node.launcher)
+        self.assertEqual(node.connection_type, 'ssh')
+        self.assertEqual(node.shell_type, None)
+        self.assertEqual(node.username, 'another_user')
+        self.assertEqual(node.attributes,
+                         {'key1': 'value1', 'key2': 'value2'})
+        self.assertEqual(
+            self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
+            ['Iops'], 2000)
+        self.assertEqual(
+            self.run_instances_calls[0]['BlockDeviceMappings'][0]['Ebs']
+            ['Throughput'], 200)
+
     def test_aws_diskimage_removal(self):
         configfile = self.setup_config('aws/diskimage.yaml')
         self.useBuilder(configfile)
releasenotes/notes/ebs-direct-59c1fb9f2b1465fb.yaml (new file, 6 lines)
@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    The AWS driver now supports optionally uploading diskimages using
+    the EBS direct APIs. This may be faster and more efficient since
+    it bypasses S3, but it may incur additional costs.
@@ -19,7 +19,7 @@ kazoo==2.9.0
 Paste
 WebOb>=1.8.1
 openshift>=0.13.1,<0.14.0
-boto3>=1.20.0
+boto3>=1.34.141
 google-api-python-client
 # botocore 1.23.0 (via boto3 1.20.0) requires urllib 1.26.0 or newer:
 # https://github.com/boto/botocore/issues/2562