Replace SwiftService with direct REST uploads

SwiftService uploads large objects using a thread pool. (The pool
defaults to 5 and we're not currently configuring it larger or smaller)
Instead of using that, spin up upload threads on our own so that we can
get rid of the swiftclient depend.

A few notes:
- We're using the new async feature of the Adapter wrapper, which rate
  limits at the _start_ of a REST call. This is sane as far as we can
  tell, but also might not be what someone is expecting.
- We'll skip the thread pool uploader for objects that are smaller than
  the default max segment size.
- In splitting the file into segments, we'd like to avoid reading all of
  the segments into RAM when we don't need to - so there is a file-like
  wrapper class which can be passed to requests. This implements a
  read-view of a portion of the file. In a pathological case, this could
  be slower due to disk seeking on the read side. However, let's go back
  and deal with buffering when we have a problem - I imagine that the
  REST upload will be the bottleneck long before the overhead of
  interleaved disk seeks will be.

Change-Id: Id9258980d2e0782e4e3c0ac26c7f11dc4db80354
This commit is contained in:
Monty Taylor 2016-12-29 07:37:06 -10:00
parent efe7b8d469
commit b7ea6c7150
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
12 changed files with 1067 additions and 84 deletions

View File

@ -0,0 +1,5 @@
---
upgrade:
- Removed swiftclient as a dependency. All swift operations
are now performed with direct REST calls using keystoneauth
Adapter.

View File

@ -17,7 +17,6 @@ python-cinderclient>=1.3.1
python-neutronclient>=2.3.10
python-troveclient>=1.2.0
python-ironicclient>=0.10.0
python-swiftclient>=2.5.0
python-heatclient>=1.0.0
python-designateclient>=2.1.0
python-magnumclient>=2.1.0

View File

@ -91,7 +91,15 @@ class ShadeAdapter(adapter.Adapter):
def _munch_response(self, response, result_key=None):
exc.raise_from_response(response)
# Glance image downloads just return the data in the body
if response.headers.get('Content-Type') == 'application/octet-stream':
if response.headers.get('Content-Type') in (
'text/plain',
'application/octet-stream'):
return response
elif response.headers.get('X-Static-Large-Object'):
# Workaround what seems to be a bug in swift where SLO objects
# return Content-Type application/json but contain
# application/octet-stream
# Bug filed: https://bugs.launchpad.net/swift/+bug/1658295
return response
else:
if not response.content:
@ -100,12 +108,12 @@ class ShadeAdapter(adapter.Adapter):
try:
result_json = response.json()
except Exception:
self.shade_logger.debug(
raise exc.OpenStackCloudHTTPError(
"Problems decoding json from response."
" Reponse: {code} {reason}".format(
code=response.status_code,
reason=response.reason))
raise
reason=response.reason),
response=response)
request_id = response.headers.get('x-openstack-request-id')
@ -154,4 +162,7 @@ class ShadeAdapter(adapter.Adapter):
return request_method(**self.args)
response = self.manager.submit_task(RequestTask(**kwargs))
return self._munch_response(response)
if run_async:
return response
else:
return self._munch_response(response)

View File

@ -493,11 +493,6 @@ class FloatingIPPoolList(task_manager.Task):
return client.nova_client.floating_ip_pools.list()
class ObjectCreate(task_manager.Task):
def main(self, client):
return client.swift_service.upload(**self.args)
class SubnetCreate(task_manager.Task):
def main(self, client):
return client.neutron_client.create_subnet(**self.args)

View File

@ -616,3 +616,40 @@ def generate_patches_from_kwargs(operation, **kwargs):
'path': '/%s' % k}
patches.append(patch)
return patches
class FileSegment(object):
"""File-like object to pass to requests."""
def __init__(self, filename, offset, length):
self.filename = filename
self.offset = offset
self.length = length
self.pos = 0
self._file = open(filename, 'rb')
self.seek(0)
def tell(self):
return self._file.tell() - self.offset
def seek(self, offset, whence=0):
if whence == 0:
self._file.seek(self.offset + offset, whence)
elif whence == 1:
self._file.seek(offset, whence)
elif whence == 2:
self._file.seek(self.offset + self.length - offset, 0)
def read(self, size=-1):
remaining = self.length - self.pos
if remaining <= 0:
return b''
to_read = remaining if size < 0 else min(size, remaining)
chunk = self._file.read(to_read)
self.pos += len(chunk)
return chunk
def reset(self):
self._file.seek(self.offset, 0)

View File

@ -10,6 +10,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import functools
import hashlib
import ipaddress
@ -40,7 +41,6 @@ import magnumclient.client
import neutronclient.neutron.client
import novaclient.client
import novaclient.exceptions as nova_exceptions
import swiftclient.service
import troveclient.client
import designateclient.client
@ -297,12 +297,6 @@ class OpenStackCloud(_normalize.Normalizer):
self._keystone_client = None
self._neutron_client = None
self._nova_client = None
self._swift_service = None
# Lock used to reset swift client. Since swift client does not
# support keystone sessions, we we have to make a new client
# in order to get new auth prior to operations, otherwise
# long-running sessions will fail.
self._swift_service_lock = threading.Lock()
self._trove_client = None
self._designate_client = None
self._magnum_client = None
@ -1077,19 +1071,25 @@ class OpenStackCloud(_normalize.Normalizer):
@property
def swift_service(self):
with self._swift_service_lock:
if self._swift_service is None:
with _utils.shade_exceptions("Error constructing "
"swift client"):
endpoint = self.get_session_endpoint(
service_key='object-store')
options = dict(os_auth_token=self.auth_token,
os_storage_url=endpoint,
os_region_name=self.region_name)
options.update(self._get_swift_kwargs())
self._swift_service = swiftclient.service.SwiftService(
options=options)
return self._swift_service
warnings.warn(
'Using shade to get a swift object is deprecated. If you'
' need a raw swiftclient.service.SwiftService object, please use'
' make_legacy_client in os-client-config instead')
try:
import swiftclient.service
except ImportError:
self.log.error(
'swiftclient is no longer a dependency of shade. You need to'
' install python-swiftclient directly.')
with _utils.shade_exceptions("Error constructing "
"swift client"):
endpoint = self.get_session_endpoint(
service_key='object-store')
options = dict(os_auth_token=self.auth_token,
os_storage_url=endpoint,
os_region_name=self.region_name)
options.update(self._get_swift_kwargs())
return swiftclient.service.SwiftService(options=options)
@property
def cinder_client(self):
@ -3432,10 +3432,6 @@ class OpenStackCloud(_normalize.Normalizer):
parameters = image_kwargs.pop('parameters', {})
image_kwargs.update(parameters)
# get new client sessions
with self._swift_service_lock:
self._swift_service = None
self.create_object(
container, name, filename,
md5=md5, sha256=sha256)
@ -5758,34 +5754,148 @@ class OpenStackCloud(_normalize.Normalizer):
if not filename:
filename = name
# segment_size gets used as a step value in a range call, so needs
# to be an int
if segment_size:
segment_size = int(segment_size)
segment_size = self.get_object_segment_size(segment_size)
file_size = os.path.getsize(filename)
if not (md5 or sha256):
(md5, sha256) = self._get_file_hashes(filename)
headers[OBJECT_MD5_KEY] = md5 or ''
headers[OBJECT_SHA256_KEY] = sha256 or ''
header_list = sorted([':'.join([k, v]) for (k, v) in headers.items()])
for (k, v) in metadata.items():
header_list.append(':'.join(['x-object-meta-' + k, v]))
headers['x-object-meta-' + k] = v
# On some clouds this is not necessary. On others it is. I'm confused.
self.create_container(container)
if self.is_object_stale(container, name, filename, md5, sha256):
endpoint = '{container}/{name}'.format(
container=container, name=name)
self.log.debug(
"swift uploading %(filename)s to %(container)s/%(name)s",
{'filename': filename, 'container': container, 'name': name})
upload = swiftclient.service.SwiftUploadObject(
source=filename, object_name=name)
for r in self.manager.submit_task(_tasks.ObjectCreate(
container=container, objects=[upload],
options=dict(
header=header_list,
segment_size=segment_size,
use_slo=use_slo))):
if not r['success']:
raise OpenStackCloudException(
'Failed at action ({action}) [{error}]:'.format(**r))
"swift uploading %(filename)s to %(endpoint)s",
{'filename': filename, 'endpoint': endpoint})
if file_size <= segment_size:
self._upload_object(endpoint, filename, headers)
else:
self._upload_large_object(
endpoint, filename, headers,
file_size, segment_size, use_slo)
def _upload_object(self, endpoint, filename, headers):
return self._object_store_client.put(
endpoint, headers=headers, data=open(filename, 'r'))
def _get_file_segments(self, endpoint, filename, file_size, segment_size):
# Use an ordered dict here so that testing can replicate things
segments = collections.OrderedDict()
for (index, offset) in enumerate(range(0, file_size, segment_size)):
remaining = file_size - (index * segment_size)
segment = _utils.FileSegment(
filename, offset,
segment_size if segment_size < remaining else remaining)
name = '{endpoint}/{index:0>6}'.format(
endpoint=endpoint, index=index)
segments[name] = segment
return segments
def _object_name_from_url(self, url):
'''Get container_name/object_name from the full URL called.
Remove the Swift endpoint from the front of the URL, and remove
the leaving / that will leave behind.'''
endpoint = self._object_store_client.get_endpoint()
object_name = url.replace(endpoint, '')
if object_name.startswith('/'):
object_name = object_name[1:]
return object_name
def _add_etag_to_manifest(self, segment_results, manifest):
for result in segment_results:
if 'Etag' not in result.headers:
continue
name = self._object_name_from_url(result.url)
for entry in manifest:
if entry['path'] == '/{name}'.format(name=name):
entry['etag'] = result.headers['Etag']
def _upload_large_object(
self, endpoint, filename, headers, file_size, segment_size, use_slo):
# If the object is big, we need to break it up into segments that
# are no larger than segment_size, upload each of them individually
# and then upload a manifest object. The segments can be uploaded in
# parallel, so we'll use the async feature of the TaskManager.
segment_futures = []
segment_results = []
retry_results = []
retry_futures = []
manifest = []
# Get an OrderedDict with keys being the swift location for the
# segment, the value a FileSegment file-like object that is a
# slice of the data for the segment.
segments = self._get_file_segments(
endpoint, filename, file_size, segment_size)
# Schedule the segments for upload
for name, segment in segments.items():
# Async call to put - schedules execution and returns a future
segment_future = self._object_store_client.put(
name, headers=headers, data=segment, run_async=True)
segment_futures.append(segment_future)
# TODO(mordred) Collect etags from results to add to this manifest
# dict. Then sort the list of dicts by path.
manifest.append(dict(
path='/{name}'.format(name=name),
size_bytes=segment.length))
# Try once and collect failed results to retry
segment_results, retry_results = task_manager.wait_for_futures(
segment_futures, raise_on_error=False)
self._add_etag_to_manifest(segment_results, manifest)
for result in retry_results:
# Grab the FileSegment for the failed upload so we can retry
name = self._object_name_from_url(result.url)
segment = segments[name]
segment.seek(0)
# Async call to put - schedules execution and returns a future
segment_future = self._object_store_client.put(
name, headers=headers, data=segment, run_async=True)
# TODO(mordred) Collect etags from results to add to this manifest
# dict. Then sort the list of dicts by path.
retry_futures.append(segment_future)
# If any segments fail the second time, just throw the error
segment_results, retry_results = task_manager.wait_for_futures(
retry_futures, raise_on_error=True)
self._add_etag_to_manifest(segment_results, manifest)
if use_slo:
return self._finish_large_object_slo(endpoint, headers, manifest)
else:
return self._finish_large_object_dlo(endpoint, headers)
def _finish_large_object_slo(self, endpoint, headers, manifest):
# TODO(mordred) send an etag of the manifest, which is the md5sum
# of the concatenation of the etags of the results
headers = headers.copy()
return self._object_store_client.put(
endpoint,
params={'multipart-manifest': 'put'},
headers=headers, json=manifest)
def _finish_large_object_dlo(self, endpoint, headers):
headers = headers.copy()
headers['X-Object-Manifest'] = endpoint
return self._object_store_client.put(endpoint, headers=headers)
def update_object(self, container, name, metadata=None, **headers):
"""Update the metadata of an object
@ -5837,10 +5947,25 @@ class OpenStackCloud(_normalize.Normalizer):
:raises: OpenStackCloudException on operation error.
"""
# TODO(mordred) DELETE for swift returns status in text/plain format
# like so:
# Number Deleted: 15
# Number Not Found: 0
# Response Body:
# Response Status: 200 OK
# Errors:
# We should ultimately do something with that
try:
meta = self.get_object_metadata(container, name)
if not meta:
return False
params = {}
if meta.get('X-Static-Large-Object', None) == 'True':
params['multipart-manifest'] = 'delete'
self._object_store_client.delete(
'{container}/{object}'.format(
container=container, object=name))
container=container, object=name),
params=params)
return True
except OpenStackCloudHTTPError:
return False

View File

@ -26,6 +26,7 @@ import simplejson
import six
from shade import _log
from shade import exc
from shade import meta
@ -287,3 +288,33 @@ class TaskManager(object):
task_class = generate_task_class(method, name, result_filter_cb)
return self._executor.submit_task(task_class(**kwargs))
def wait_for_futures(futures, raise_on_error=True, log=None):
'''Collect results or failures from a list of running future tasks.'''
results = []
retries = []
# Check on each result as its thread finishes
for completed in concurrent.futures.as_completed(futures):
try:
result = completed.result()
# We have to do this here because munch_response doesn't
# get called on async job results
exc.raise_from_response(result)
results.append(result)
except (keystoneauth1.exceptions.RetriableConnectionFailure,
exc.OpenStackCloudException) as e:
if log:
log.debug(
"Exception processing async task: {e}".format(
e=str(e)),
exc_info=True)
# If we get an exception, put the result into a list so we
# can try again
if raise_on_error:
raise
else:
retries.append(result)
return results, retries

View File

@ -17,10 +17,13 @@ test_object
Functional tests for `shade` object methods.
"""
import random
import string
import tempfile
from testtools import content
from shade import exc
from shade.tests.functional import base
@ -40,24 +43,29 @@ class TestObject(base.BaseFunctionalTestCase):
self.assertEqual(container_name,
self.demo_cloud.list_containers()[0]['name'])
sizes = (
(64 * 1024, 1), # 64K, one segment
(50 * 1024 ** 2, 5) # 50MB, 5 segments
(64 * 1024, 1), # 64K, one segment
(64 * 1024, 5) # 64MB, 5 segments
)
for size, nseg in sizes:
segment_size = round(size / nseg)
with tempfile.NamedTemporaryFile() as sparse_file:
sparse_file.seek(size)
sparse_file.write("\0")
sparse_file.flush()
segment_size = int(round(size / nseg))
with tempfile.NamedTemporaryFile() as fake_file:
fake_content = ''.join(random.SystemRandom().choice(
string.ascii_uppercase + string.digits)
for _ in range(size)).encode('latin-1')
fake_file.write(fake_content)
fake_file.flush()
name = 'test-%d' % size
self.addCleanup(
self.demo_cloud.delete_object, container_name, name)
self.demo_cloud.create_object(
container_name, name,
sparse_file.name,
fake_file.name,
segment_size=segment_size,
metadata={'foo': 'bar'})
self.assertFalse(self.demo_cloud.is_object_stale(
container_name, name,
sparse_file.name
fake_file.name
)
)
self.assertEqual(
@ -70,12 +78,21 @@ class TestObject(base.BaseFunctionalTestCase):
'testv', self.demo_cloud.get_object_metadata(
container_name, name)['x-object-meta-testk']
)
self.assertIsNotNone(
self.demo_cloud.get_object(container_name, name))
try:
self.assertIsNotNone(
self.demo_cloud.get_object(container_name, name))
except exc.OpenStackCloudException as e:
self.addDetail(
'failed_response',
content.text_content(str(e.response.headers)))
self.addDetail(
'failed_response',
content.text_content(e.response.text))
self.assertEqual(
name,
self.demo_cloud.list_objects(container_name)[0]['name'])
self.demo_cloud.delete_object(container_name, name)
self.assertTrue(
self.demo_cloud.delete_object(container_name, name))
self.assertEqual([], self.demo_cloud.list_objects(container_name))
self.assertEqual(container_name,
self.demo_cloud.list_containers()[0]['name'])

View File

@ -171,10 +171,12 @@ class RequestsMockTestCase(BaseTestCase):
dict(method='GET', url='https://image.example.com/'),
]
def assert_calls(self):
def assert_calls(self, stop_after=None):
self.assertEqual(len(self.calls), len(self.adapter.request_history))
for (x, (call, history)) in enumerate(
zip(self.calls, self.adapter.request_history)):
if stop_after and x > stop_after:
break
self.assertEqual(
call['method'], history.method,
'Method mismatch on call {index}'.format(index=x))

View File

@ -10,6 +10,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import random
import string
import tempfile
import testtools
from shade import _utils
@ -230,3 +234,26 @@ class TestUtils(base.TestCase):
"Invalid range value: <>100"
):
_utils.range_filter(RANGE_DATA, "key1", "<>100")
def test_file_segment(self):
file_size = 4200
content = ''.join(random.SystemRandom().choice(
string.ascii_uppercase + string.digits)
for _ in range(file_size)).encode('latin-1')
self.imagefile = tempfile.NamedTemporaryFile(delete=False)
self.imagefile.write(content)
self.imagefile.close()
segments = self.cloud._get_file_segments(
endpoint='test_container/test_image',
filename=self.imagefile.name,
file_size=file_size,
segment_size=1000)
self.assertEqual(len(segments), 5)
segment_content = b''
for (index, (name, segment)) in enumerate(segments.items()):
self.assertEqual(
'test_container/test_image/{index:0>6}'.format(index=index),
name)
segment_content += segment.read()
self.assertEqual(content, segment_content)

View File

@ -215,9 +215,7 @@ class TestImage(BaseTestImage):
self.assert_calls()
self.assertEqual(self.adapter.request_history[5].text.read(), b'\x00')
@mock.patch.object(shade.OpenStackCloud, 'swift_service')
def test_create_image_task(self,
swift_service_mock):
def test_create_image_task(self):
self.cloud.image_api_use_tasks = True
image_name = 'name-99'
container_name = 'image_upload_v2_test_container'
@ -262,6 +260,12 @@ class TestImage(BaseTestImage):
container=container_name, object=image_name),
status_code=404)
self.adapter.put(
'{endpoint}/{container}/{object}'.format(
endpoint=endpoint,
container=container_name, object=image_name),
status_code=201)
task_id = str(uuid.uuid4())
args = dict(
id=task_id,
@ -304,18 +308,6 @@ class TestImage(BaseTestImage):
image_name, self.imagefile.name, wait=True, timeout=1,
is_public=False, container=container_name)
args = {
'header': [
'x-object-meta-x-shade-md5:{md5}'.format(md5=NO_MD5),
'x-object-meta-x-shade-sha256:{sha}'.format(sha=NO_SHA256),
],
'segment_size': 1000,
'use_slo': True}
swift_service_mock.upload.assert_called_with(
container='image_upload_v2_test_container',
objects=mock.ANY,
options=args)
self.calls += [
dict(method='GET', url='https://image.example.com/v2/images'),
dict(method='GET', url='https://object-store.example.com/info'),
@ -339,6 +331,15 @@ class TestImage(BaseTestImage):
url='{endpoint}/{container}/{object}'.format(
endpoint=endpoint,
container=container_name, object=image_name)),
dict(
method='PUT',
url='{endpoint}/{container}/{object}'.format(
endpoint=endpoint,
container=container_name, object=image_name),
headers={
'x-object-meta-x-shade-md5': NO_MD5,
'x-object-meta-x-shade-sha256': NO_SHA256,
}),
dict(method='GET', url='https://image.example.com/v2/images'),
dict(
method='POST',

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import tempfile
import testtools
import shade
@ -20,10 +22,10 @@ from shade import exc
from shade.tests.unit import base
class TestObject(base.RequestsMockTestCase):
class BaseTestObject(base.RequestsMockTestCase):
def setUp(self):
super(TestObject, self).setUp()
super(BaseTestObject, self).setUp()
self.container = self.getUniqueString()
self.object = self.getUniqueString()
@ -33,6 +35,9 @@ class TestObject(base.RequestsMockTestCase):
self.object_endpoint = '{endpoint}/{object}'.format(
endpoint=self.container_endpoint, object=self.object)
class TestObject(BaseTestObject):
def test_create_container(self):
"""Test creating a (private) container"""
self.adapter.head(
@ -331,22 +336,31 @@ class TestObject(base.RequestsMockTestCase):
self.cloud.list_objects, self.container)
def test_delete_object(self):
self.adapter.head(
self.object_endpoint, headers={'X-Object-Meta': 'foo'})
self.adapter.delete(self.object_endpoint, status_code=204)
self.assertTrue(self.cloud.delete_object(self.container, self.object))
self.calls += [
dict(method='DELETE', url=self.object_endpoint),
dict(
method='HEAD',
url=self.object_endpoint),
dict(
method='DELETE',
url=self.object_endpoint),
]
self.assert_calls()
def test_delete_object_not_found(self):
self.adapter.delete(self.object_endpoint, status_code=404)
self.adapter.head(self.object_endpoint, status_code=404)
self.assertFalse(self.cloud.delete_object(self.container, self.object))
self.calls += [
dict(method='DELETE', url=self.object_endpoint),
dict(
method='HEAD',
url=self.object_endpoint),
]
self.assert_calls()
@ -439,3 +453,722 @@ class TestObject(base.RequestsMockTestCase):
reason='Precondition failed')
self.assertEqual(shade.openstackcloud.DEFAULT_OBJECT_SEGMENT_SIZE,
self.cloud.get_object_segment_size(None))
class TestObjectUploads(BaseTestObject):
def setUp(self):
super(TestObjectUploads, self).setUp()
self.content = self.getUniqueString().encode('latin-1')
self.object_file = tempfile.NamedTemporaryFile(delete=False)
self.object_file.write(self.content)
self.object_file.close()
(self.md5, self.sha256) = self.cloud._get_file_hashes(
self.object_file.name)
self.endpoint = self.cloud._object_store_client.get_endpoint()
def test_create_object(self):
self.adapter.get(
'https://object-store.example.com/info',
json=dict(
swift={'max_file_size': 1000},
slo={'min_segment_size': 500}))
self.adapter.put(
'{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container,),
status_code=201,
headers={
'Date': 'Fri, 16 Dec 2016 18:21:20 GMT',
'Content-Length': '0',
'Content-Type': 'text/html; charset=UTF-8',
})
self.adapter.head(
'{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container),
[
dict(status_code=404),
dict(headers={
'Content-Length': '0',
'X-Container-Object-Count': '0',
'Accept-Ranges': 'bytes',
'X-Storage-Policy': 'Policy-0',
'Date': 'Fri, 16 Dec 2016 18:29:05 GMT',
'X-Timestamp': '1481912480.41664',
'X-Trans-Id': 'tx60ec128d9dbf44b9add68-0058543271dfw1',
'X-Container-Bytes-Used': '0',
'Content-Type': 'text/plain; charset=utf-8'}),
])
self.adapter.head(
'{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=404)
self.adapter.put(
'{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=201)
self.cloud.create_object(
container=self.container, name=self.object,
filename=self.object_file.name)
self.calls += [
dict(method='GET', url='https://object-store.example.com/info'),
dict(
method='HEAD',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='PUT',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='HEAD',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='HEAD',
url='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object)),
dict(
method='PUT',
url='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
headers={
'x-object-meta-x-shade-md5': self.md5,
'x-object-meta-x-shade-sha256': self.sha256,
}),
]
self.assert_calls()
def test_create_dynamic_large_object(self):
max_file_size = 2
min_file_size = 1
self.adapter.get(
'https://object-store.example.com/info',
json=dict(
swift={'max_file_size': max_file_size},
slo={'min_segment_size': min_file_size}))
self.adapter.put(
'{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container,),
status_code=201,
headers={
'Date': 'Fri, 16 Dec 2016 18:21:20 GMT',
'Content-Length': '0',
'Content-Type': 'text/html; charset=UTF-8',
})
self.adapter.head(
'{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container),
[
dict(status_code=404),
dict(headers={
'Content-Length': '0',
'X-Container-Object-Count': '0',
'Accept-Ranges': 'bytes',
'X-Storage-Policy': 'Policy-0',
'Date': 'Fri, 16 Dec 2016 18:29:05 GMT',
'X-Timestamp': '1481912480.41664',
'X-Trans-Id': 'tx60ec128d9dbf44b9add68-0058543271dfw1',
'X-Container-Bytes-Used': '0',
'Content-Type': 'text/plain; charset=utf-8'}),
])
self.adapter.head(
'{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=404)
self.adapter.put(
'{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=201)
self.calls += [
dict(method='GET', url='https://object-store.example.com/info'),
dict(
method='HEAD',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='PUT',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='HEAD',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='HEAD',
url='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object)),
]
for index, offset in enumerate(
range(0, len(self.content), max_file_size)):
self.adapter.put(
'{endpoint}/{container}/{object}/{index:0>6}'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object,
index=index),
status_code=201)
self.calls += [
dict(
method='PUT',
url='{endpoint}/{container}/{object}/{index:0>6}'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object,
index=index))]
self.calls += [
dict(
method='PUT',
url='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
headers={
'x-object-manifest': '{container}/{object}'.format(
container=self.container, object=self.object),
'x-object-meta-x-shade-md5': self.md5,
'x-object-meta-x-shade-sha256': self.sha256,
}),
]
self.cloud.create_object(
container=self.container, name=self.object,
filename=self.object_file.name, use_slo=False)
# After call 6, order become indeterminate because of thread pool
self.assert_calls(stop_after=6)
for key, value in self.calls[-1]['headers'].items():
self.assertEqual(
value, self.adapter.request_history[-1].headers[key],
'header mismatch in manifest call')
def test_create_static_large_object(self):
max_file_size = 25
min_file_size = 1
self.adapter.get(
'https://object-store.example.com/info',
json=dict(
swift={'max_file_size': max_file_size},
slo={'min_segment_size': min_file_size}))
self.adapter.put(
'{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container,),
status_code=201,
headers={
'Date': 'Fri, 16 Dec 2016 18:21:20 GMT',
'Content-Length': '0',
'Content-Type': 'text/html; charset=UTF-8',
})
self.adapter.head(
'{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container),
[
dict(status_code=404),
dict(headers={
'Content-Length': '0',
'X-Container-Object-Count': '0',
'Accept-Ranges': 'bytes',
'X-Storage-Policy': 'Policy-0',
'Date': 'Fri, 16 Dec 2016 18:29:05 GMT',
'X-Timestamp': '1481912480.41664',
'X-Trans-Id': 'tx60ec128d9dbf44b9add68-0058543271dfw1',
'X-Container-Bytes-Used': '0',
'Content-Type': 'text/plain; charset=utf-8'}),
])
self.adapter.head(
'{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=404)
self.adapter.put(
'{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=201)
self.calls += [
dict(method='GET', url='https://object-store.example.com/info'),
dict(
method='HEAD',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='PUT',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='HEAD',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='HEAD',
url='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object)),
]
for index, offset in enumerate(
range(0, len(self.content), max_file_size)):
self.adapter.put(
'{endpoint}/{container}/{object}/{index:0>6}'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object,
index=index),
status_code=201,
headers=dict(Etag='etag{index}'.format(index=index)))
self.calls += [
dict(
method='PUT',
url='{endpoint}/{container}/{object}/{index:0>6}'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object,
index=index))]
self.calls += [
dict(
method='PUT',
url='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
params={
'multipart-manifest', 'put'
},
headers={
'x-object-meta-x-shade-md5': self.md5,
'x-object-meta-x-shade-sha256': self.sha256,
}),
]
self.cloud.create_object(
container=self.container, name=self.object,
filename=self.object_file.name, use_slo=True)
# After call 6, order become indeterminate because of thread pool
self.assert_calls(stop_after=6)
for key, value in self.calls[-1]['headers'].items():
self.assertEqual(
value, self.adapter.request_history[-1].headers[key],
'header mismatch in manifest call')
base_object = '/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object)
self.assertEqual([
{
'path': "{base_object}/000000".format(
base_object=base_object),
'size_bytes': 25,
'etag': 'etag0',
},
{
'path': "{base_object}/000001".format(
base_object=base_object),
'size_bytes': 25,
'etag': 'etag1',
},
{
'path': "{base_object}/000002".format(
base_object=base_object),
'size_bytes': 25,
'etag': 'etag2',
},
{
'path': "{base_object}/000003".format(
base_object=base_object),
'size_bytes': 5,
'etag': 'etag3',
},
], self.adapter.request_history[-1].json())
def test_object_segment_retry_failure(self):
max_file_size = 25
min_file_size = 1
self.adapter.get(
'https://object-store.example.com/info',
json=dict(
swift={'max_file_size': max_file_size},
slo={'min_segment_size': min_file_size}))
self.adapter.put(
'{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container,),
status_code=201,
headers={
'Date': 'Fri, 16 Dec 2016 18:21:20 GMT',
'Content-Length': '0',
'Content-Type': 'text/html; charset=UTF-8',
})
self.adapter.head(
'{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container),
[
dict(status_code=404),
dict(headers={
'Content-Length': '0',
'X-Container-Object-Count': '0',
'Accept-Ranges': 'bytes',
'X-Storage-Policy': 'Policy-0',
'Date': 'Fri, 16 Dec 2016 18:29:05 GMT',
'X-Timestamp': '1481912480.41664',
'X-Trans-Id': 'tx60ec128d9dbf44b9add68-0058543271dfw1',
'X-Container-Bytes-Used': '0',
'Content-Type': 'text/plain; charset=utf-8'}),
])
self.adapter.head(
'{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=404)
self.adapter.put(
'{endpoint}/{container}/{object}/000000'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object),
status_code=201)
self.adapter.put(
'{endpoint}/{container}/{object}/000001'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object),
status_code=201)
self.adapter.put(
'{endpoint}/{container}/{object}/000002'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object),
status_code=201)
self.adapter.put(
'{endpoint}/{container}/{object}/000003'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object),
status_code=501)
self.adapter.put(
'{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=201)
self.calls += [
dict(method='GET', url='https://object-store.example.com/info'),
dict(
method='HEAD',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='PUT',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='HEAD',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='HEAD',
url='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object)),
dict(
method='PUT',
url='{endpoint}/{container}/{object}/000000'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object)),
dict(
method='PUT',
url='{endpoint}/{container}/{object}/000001'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object)),
dict(
method='PUT',
url='{endpoint}/{container}/{object}/000002'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object)),
dict(
method='PUT',
url='{endpoint}/{container}/{object}/000003'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object)),
dict(
method='PUT',
url='{endpoint}/{container}/{object}/000003'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object)),
]
self.assertRaises(
exc.OpenStackCloudException,
self.cloud.create_object,
container=self.container, name=self.object,
filename=self.object_file.name, use_slo=True)
# After call 6, order become indeterminate because of thread pool
self.assert_calls(stop_after=6)
def test_object_segment_retries(self):
max_file_size = 25
min_file_size = 1
self.adapter.get(
'https://object-store.example.com/info',
json=dict(
swift={'max_file_size': max_file_size},
slo={'min_segment_size': min_file_size}))
self.adapter.put(
'{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container,),
status_code=201,
headers={
'Date': 'Fri, 16 Dec 2016 18:21:20 GMT',
'Content-Length': '0',
'Content-Type': 'text/html; charset=UTF-8',
})
self.adapter.head(
'{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container),
[
dict(status_code=404),
dict(headers={
'Content-Length': '0',
'X-Container-Object-Count': '0',
'Accept-Ranges': 'bytes',
'X-Storage-Policy': 'Policy-0',
'Date': 'Fri, 16 Dec 2016 18:29:05 GMT',
'X-Timestamp': '1481912480.41664',
'X-Trans-Id': 'tx60ec128d9dbf44b9add68-0058543271dfw1',
'X-Container-Bytes-Used': '0',
'Content-Type': 'text/plain; charset=utf-8'}),
])
self.adapter.head(
'{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=404)
self.adapter.put(
'{endpoint}/{container}/{object}/000000'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object),
headers={'etag': 'etag0'},
status_code=201)
self.adapter.put(
'{endpoint}/{container}/{object}/000001'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object),
headers={'etag': 'etag1'},
status_code=201)
self.adapter.put(
'{endpoint}/{container}/{object}/000002'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object),
headers={'etag': 'etag2'},
status_code=201)
self.adapter.put(
'{endpoint}/{container}/{object}/000003'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object), [
dict(status_code=501),
dict(status_code=201, headers={'etag': 'etag3'}),
])
self.adapter.put(
'{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
status_code=201)
self.calls += [
dict(method='GET', url='https://object-store.example.com/info'),
dict(
method='HEAD',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='PUT',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='HEAD',
url='{endpoint}/{container}'.format(
endpoint=self.endpoint,
container=self.container)),
dict(
method='HEAD',
url='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object)),
dict(
method='PUT',
url='{endpoint}/{container}/{object}/000000'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object)),
dict(
method='PUT',
url='{endpoint}/{container}/{object}/000001'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object)),
dict(
method='PUT',
url='{endpoint}/{container}/{object}/000002'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object)),
dict(
method='PUT',
url='{endpoint}/{container}/{object}/000003'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object)),
dict(
method='PUT',
url='{endpoint}/{container}/{object}/000003'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object)),
dict(
method='PUT',
url='{endpoint}/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container, object=self.object),
params={
'multipart-manifest', 'put'
},
headers={
'x-object-meta-x-shade-md5': self.md5,
'x-object-meta-x-shade-sha256': self.sha256,
}),
]
self.cloud.create_object(
container=self.container, name=self.object,
filename=self.object_file.name, use_slo=True)
# After call 6, order become indeterminate because of thread pool
self.assert_calls(stop_after=6)
for key, value in self.calls[-1]['headers'].items():
self.assertEqual(
value, self.adapter.request_history[-1].headers[key],
'header mismatch in manifest call')
base_object = '/{container}/{object}'.format(
endpoint=self.endpoint,
container=self.container,
object=self.object)
self.assertEqual([
{
'path': "{base_object}/000000".format(
base_object=base_object),
'size_bytes': 25,
'etag': 'etag0',
},
{
'path': "{base_object}/000001".format(
base_object=base_object),
'size_bytes': 25,
'etag': 'etag1',
},
{
'path': "{base_object}/000002".format(
base_object=base_object),
'size_bytes': 25,
'etag': 'etag2',
},
{
'path': "{base_object}/000003".format(
base_object=base_object),
'size_bytes': 1,
'etag': 'etag3',
},
], self.adapter.request_history[-1].json())