Merge "Let clients request heartbeats during SLO PUTs"
This commit is contained in:
commit
2596b3ca9d
@ -51,7 +51,7 @@ To create a static large object, divide your content into pieces and
|
||||
create (upload) a segment object to contain each piece.
|
||||
|
||||
Create a manifest object. Include the ``multipart-manifest=put``
|
||||
query string at the end of the manifest object name to indicate that
|
||||
query parameter at the end of the manifest object name to indicate that
|
||||
this is a manifest object.
|
||||
|
||||
The body of the **PUT** request on the manifest object comprises a json
|
||||
@ -102,7 +102,7 @@ contrast to dynamic large objects.
|
||||
}
|
||||
]
|
||||
|
||||
|
|
||||
|
|
||||
|
||||
The ``Content-Length`` request header must contain the length of the
|
||||
json content—not the length of the segment objects. However, after the
|
||||
@ -113,9 +113,22 @@ of the concatenated ``ETag`` values of the object segments. You can also
|
||||
set the ``Content-Type`` request header and custom object metadata.
|
||||
|
||||
When the **PUT** operation sees the ``multipart-manifest=put`` query
|
||||
string, it reads the request body and verifies that each segment
|
||||
parameter, it reads the request body and verifies that each segment
|
||||
object exists and that the sizes and ETags match. If there is a
|
||||
mismatch, the **PUT**\ operation fails.
|
||||
mismatch, the **PUT** operation fails.
|
||||
|
||||
This verification process can take a long time to complete, particularly
|
||||
as the number of segments increases. You may include a ``heartbeat=on``
|
||||
query parameter to have the server:
|
||||
|
||||
1. send a ``202 Accepted`` response before it begins validating segments,
|
||||
2. periodically send whitespace characters to keep the connection alive, and
|
||||
3. send a final response code in the body.
|
||||
|
||||
.. note::
|
||||
The server may still immediately respond with ``400 Bad Request``
|
||||
if it can determine that the request is invalid before making
|
||||
backend requests.
|
||||
|
||||
If everything matches, the manifest object is created. The
|
||||
``X-Static-Large-Object`` metadata is set to ``true`` indicating that
|
||||
@ -124,18 +137,18 @@ this is a static object manifest.
|
||||
Normally when you perform a **GET** operation on the manifest object,
|
||||
the response body contains the concatenated content of the segment
|
||||
objects. To download the manifest list, use the
|
||||
``multipart-manifest=get`` query string. The resulting list is not
|
||||
``multipart-manifest=get`` query parameter. The resulting list is not
|
||||
formatted the same as the manifest you originally used in the **PUT**
|
||||
operation.
|
||||
|
||||
If you use the **DELETE** operation on a manifest object, the manifest
|
||||
object is deleted. The segment objects are not affected. However, if you
|
||||
add the ``multipart-manifest=delete`` query string, the segment
|
||||
add the ``multipart-manifest=delete`` query parameter, the segment
|
||||
objects are deleted and if all are successfully deleted, the manifest
|
||||
object is also deleted.
|
||||
|
||||
To change the manifest, use a **PUT** operation with the
|
||||
``multipart-manifest=put`` query string. This request creates a
|
||||
``multipart-manifest=put`` query parameter. This request creates a
|
||||
manifest object. You can also update the object metadata in the usual
|
||||
way.
|
||||
|
||||
@ -326,7 +339,7 @@ a manifest object but a normal object with content same as what you would
|
||||
get on a **GET** request to the original manifest object.
|
||||
|
||||
To copy the manifest object, you include the ``multipart-manifest=get``
|
||||
query string in the **COPY** request. The new object contains the same
|
||||
query parameter in the **COPY** request. The new object contains the same
|
||||
manifest as the original. The segment objects are not copied. Instead,
|
||||
both the original and new manifest objects share the same set of segment
|
||||
objects.
|
||||
|
@ -766,6 +766,11 @@ use = egg:swift#slo
|
||||
# Default is to use the concurrency value from above; all of the same caveats
|
||||
# apply regarding recommended ranges.
|
||||
# delete_concurrency = 2
|
||||
#
|
||||
# In order to keep a connection active during a potentially long PUT request,
|
||||
# clients may request that Swift send whitespace ahead of the final response
|
||||
# body. This whitespace will be yielded at most every yield_frequency seconds.
|
||||
# yield_frequency = 10
|
||||
|
||||
# Note: Put after auth and staticweb in the pipeline.
|
||||
# If you don't put it in the pipeline, it will be inserted for you.
|
||||
|
@ -13,7 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
r"""
|
||||
Middleware that will provide Static Large Object (SLO) support.
|
||||
|
||||
This feature is very similar to Dynamic Large Object (DLO) support in that
|
||||
@ -72,6 +72,33 @@ found, size/etag mismatch, below minimum size, invalid range) then the user
|
||||
will receive a 4xx error response. If everything does match, the user will
|
||||
receive a 2xx response and the SLO object is ready for downloading.
|
||||
|
||||
Note that large manifests may take a long time to verify; historically,
|
||||
clients would need to use a long read timeout for the connection to give
|
||||
Swift enough time to send a final ``201 Created`` or ``400 Bad Request``
|
||||
response. Now, clients should use the query parameters::
|
||||
|
||||
?multipart-manifest=put&heartbeat=on
|
||||
|
||||
to request that Swift send an immediate ``202 Accepted`` response and periodic
|
||||
whitespace to keep the connection alive. A final response code will appear in
|
||||
the body. The format of the response body defaults to text/plain but can be
|
||||
either json or xml depending on the ``Accept`` header. An example body is as
|
||||
follows::
|
||||
|
||||
Response Status: 201 Created
|
||||
Response Body:
|
||||
Etag: "8f481cede6d2ddc07cb36aa084d9a64d"
|
||||
Last Modified: Wed, 25 Oct 2017 17:08:55 GMT
|
||||
Errors:
|
||||
|
||||
Or, as a json response::
|
||||
|
||||
{"Response Status": "201 Created",
|
||||
"Response Body": "",
|
||||
"Etag": "\"8f481cede6d2ddc07cb36aa084d9a64d\"",
|
||||
"Last Modified": "Wed, 25 Oct 2017 17:08:55 GMT",
|
||||
"Errors": []}
|
||||
|
||||
Behind the scenes, on success, a JSON manifest generated from the user input is
|
||||
sent to object servers with an extra ``X-Static-Large-Object: True`` header
|
||||
and a modified ``Content-Type``. The items in this manifest will include the
|
||||
@ -251,12 +278,14 @@ import json
|
||||
import mimetypes
|
||||
import re
|
||||
import six
|
||||
import time
|
||||
from hashlib import md5
|
||||
from swift.common.exceptions import ListingIterError, SegmentError
|
||||
from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
|
||||
HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, HTTPLengthRequired, \
|
||||
HTTPOk, HTTPPreconditionFailed, HTTPException, HTTPNotFound, \
|
||||
HTTPUnauthorized, HTTPConflict, HTTPUnprocessableEntity, Response, Range
|
||||
HTTPUnauthorized, HTTPConflict, HTTPUnprocessableEntity, Response, Range, \
|
||||
RESPONSE_REASONS
|
||||
from swift.common.utils import get_logger, config_true_value, \
|
||||
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
|
||||
register_swift_info, RateLimitedIterator, quote, close_if_possible, \
|
||||
@ -273,6 +302,7 @@ from swift.common.middleware.bulk import get_response_body, \
|
||||
DEFAULT_RATE_LIMIT_UNDER_SIZE = 1024 * 1024 # 1 MiB
|
||||
DEFAULT_MAX_MANIFEST_SEGMENTS = 1000
|
||||
DEFAULT_MAX_MANIFEST_SIZE = 1024 * 1024 * 2 # 2 MiB
|
||||
DEFAULT_YIELD_FREQUENCY = 10
|
||||
|
||||
|
||||
REQUIRED_SLO_KEYS = set(['path'])
|
||||
@ -862,16 +892,26 @@ class StaticLargeObject(object):
|
||||
|
||||
:param app: The next WSGI filter or app in the paste.deploy chain.
|
||||
:param conf: The configuration dict for the middleware.
|
||||
:param max_manifest_segments: The maximum number of segments allowed in
|
||||
newly-created static large objects.
|
||||
:param max_manifest_size: The maximum size (in bytes) of newly-created
|
||||
static-large-object manifests.
|
||||
:param yield_frequency: If the client included ``heartbeat=on`` in the
|
||||
query parameters when creating a new static large
|
||||
object, the period of time to wait between sending
|
||||
whitespace to keep the connection alive.
|
||||
"""
|
||||
|
||||
def __init__(self, app, conf,
|
||||
max_manifest_segments=DEFAULT_MAX_MANIFEST_SEGMENTS,
|
||||
max_manifest_size=DEFAULT_MAX_MANIFEST_SIZE):
|
||||
max_manifest_size=DEFAULT_MAX_MANIFEST_SIZE,
|
||||
yield_frequency=DEFAULT_YIELD_FREQUENCY):
|
||||
self.conf = conf
|
||||
self.app = app
|
||||
self.logger = get_logger(conf, log_route='slo')
|
||||
self.max_manifest_segments = max_manifest_segments
|
||||
self.max_manifest_size = max_manifest_size
|
||||
self.yield_frequency = yield_frequency
|
||||
self.max_get_time = int(self.conf.get('max_get_time', 86400))
|
||||
self.rate_limit_under_size = int(self.conf.get(
|
||||
'rate_limit_under_size', DEFAULT_RATE_LIMIT_UNDER_SIZE))
|
||||
@ -930,7 +970,6 @@ class StaticLargeObject(object):
|
||||
raise HTTPRequestEntityTooLarge(
|
||||
'Number of segments must be <= %d' %
|
||||
self.max_manifest_segments)
|
||||
total_size = 0
|
||||
try:
|
||||
out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS)
|
||||
except ValueError:
|
||||
@ -954,6 +993,7 @@ class StaticLargeObject(object):
|
||||
return obj_name, sub_req.get_response(self)
|
||||
|
||||
def validate_seg_dict(seg_dict, head_seg_resp, allow_empty_segment):
|
||||
obj_name = seg_dict['path']
|
||||
if not head_seg_resp.is_success:
|
||||
problem_segments.append([quote(obj_name),
|
||||
head_seg_resp.status])
|
||||
@ -1011,61 +1051,115 @@ class StaticLargeObject(object):
|
||||
seg_data['sub_slo'] = True
|
||||
return segment_length, seg_data
|
||||
|
||||
heartbeat = config_true_value(req.params.get('heartbeat'))
|
||||
separator = ''
|
||||
if heartbeat:
|
||||
# Apparently some ways of deploying require that this to happens
|
||||
# *before* the return? Not sure why.
|
||||
req.environ['eventlet.minimum_write_chunk_size'] = 0
|
||||
start_response('202 Accepted', [ # NB: not 201 !
|
||||
('Content-Type', out_content_type),
|
||||
])
|
||||
separator = '\r\n\r\n'
|
||||
data_for_storage = [None] * len(parsed_data)
|
||||
with StreamingPile(self.concurrency) as pile:
|
||||
for obj_name, resp in pile.asyncstarmap(do_head, (
|
||||
(path, ) for path in path2indices)):
|
||||
for i in path2indices[obj_name]:
|
||||
segment_length, seg_data = validate_seg_dict(
|
||||
parsed_data[i], resp,
|
||||
allow_empty_segment=(i == len(parsed_data) - 1))
|
||||
data_for_storage[i] = seg_data
|
||||
total_size += segment_length
|
||||
|
||||
if problem_segments:
|
||||
resp_body = get_response_body(
|
||||
out_content_type, {}, problem_segments, 'upload')
|
||||
raise HTTPBadRequest(resp_body, content_type=out_content_type)
|
||||
def resp_iter():
|
||||
total_size = 0
|
||||
# wsgi won't propagate start_response calls until some data has
|
||||
# been yielded so make sure first heartbeat is sent immediately
|
||||
if heartbeat:
|
||||
yield ' '
|
||||
last_yield_time = time.time()
|
||||
with StreamingPile(self.concurrency) as pile:
|
||||
for obj_name, resp in pile.asyncstarmap(do_head, (
|
||||
(path, ) for path in path2indices)):
|
||||
now = time.time()
|
||||
if heartbeat and (now - last_yield_time >
|
||||
self.yield_frequency):
|
||||
# Make sure we've called start_response before
|
||||
# sending data
|
||||
yield ' '
|
||||
last_yield_time = now
|
||||
for i in path2indices[obj_name]:
|
||||
segment_length, seg_data = validate_seg_dict(
|
||||
parsed_data[i], resp,
|
||||
allow_empty_segment=(i == len(parsed_data) - 1))
|
||||
data_for_storage[i] = seg_data
|
||||
total_size += segment_length
|
||||
|
||||
slo_etag = md5()
|
||||
for seg_data in data_for_storage:
|
||||
if seg_data.get('range'):
|
||||
slo_etag.update('%s:%s;' % (seg_data['hash'],
|
||||
seg_data['range']))
|
||||
if problem_segments:
|
||||
err = HTTPBadRequest(content_type=out_content_type)
|
||||
resp_dict = {}
|
||||
if heartbeat:
|
||||
resp_dict['Response Status'] = err.status
|
||||
resp_dict['Response Body'] = err.body or '\n'.join(
|
||||
RESPONSE_REASONS.get(err.status_int, ['']))
|
||||
else:
|
||||
start_response(err.status,
|
||||
[(h, v) for h, v in err.headers.items()
|
||||
if h.lower() != 'content-length'])
|
||||
yield separator + get_response_body(
|
||||
out_content_type, resp_dict, problem_segments, 'upload')
|
||||
return
|
||||
|
||||
slo_etag = md5()
|
||||
for seg_data in data_for_storage:
|
||||
if seg_data.get('range'):
|
||||
slo_etag.update('%s:%s;' % (seg_data['hash'],
|
||||
seg_data['range']))
|
||||
else:
|
||||
slo_etag.update(seg_data['hash'])
|
||||
|
||||
slo_etag = slo_etag.hexdigest()
|
||||
client_etag = req.headers.get('Etag')
|
||||
if client_etag and client_etag.strip('"') != slo_etag:
|
||||
err = HTTPUnprocessableEntity(request=req)
|
||||
if heartbeat:
|
||||
yield separator + get_response_body(out_content_type, {
|
||||
'Response Status': err.status,
|
||||
'Response Body': err.body or '\n'.join(
|
||||
RESPONSE_REASONS.get(err.status_int, [''])),
|
||||
}, problem_segments, 'upload')
|
||||
else:
|
||||
for chunk in err(req.environ, start_response):
|
||||
yield chunk
|
||||
return
|
||||
|
||||
json_data = json.dumps(data_for_storage)
|
||||
if six.PY3:
|
||||
json_data = json_data.encode('utf-8')
|
||||
req.body = json_data
|
||||
req.headers.update({
|
||||
SYSMETA_SLO_ETAG: slo_etag,
|
||||
SYSMETA_SLO_SIZE: total_size,
|
||||
'X-Static-Large-Object': 'True',
|
||||
'Etag': md5(json_data).hexdigest(),
|
||||
})
|
||||
|
||||
env = req.environ
|
||||
if not env.get('CONTENT_TYPE'):
|
||||
guessed_type, _junk = mimetypes.guess_type(req.path_info)
|
||||
env['CONTENT_TYPE'] = (guessed_type or
|
||||
'application/octet-stream')
|
||||
env['swift.content_type_overridden'] = True
|
||||
env['CONTENT_TYPE'] += ";swift_bytes=%d" % total_size
|
||||
|
||||
resp = req.get_response(self.app)
|
||||
resp_dict = {'Response Status': resp.status}
|
||||
if resp.is_success:
|
||||
resp.etag = slo_etag
|
||||
resp_dict['Etag'] = resp.headers['Etag']
|
||||
resp_dict['Last Modified'] = resp.headers['Last-Modified']
|
||||
|
||||
if heartbeat:
|
||||
resp_dict['Response Body'] = resp.body
|
||||
yield separator + get_response_body(
|
||||
out_content_type, resp_dict, [], 'upload')
|
||||
else:
|
||||
slo_etag.update(seg_data['hash'])
|
||||
for chunk in resp(req.environ, start_response):
|
||||
yield chunk
|
||||
|
||||
slo_etag = slo_etag.hexdigest()
|
||||
client_etag = req.headers.get('Etag')
|
||||
if client_etag and client_etag.strip('"') != slo_etag:
|
||||
raise HTTPUnprocessableEntity(request=req)
|
||||
|
||||
json_data = json.dumps(data_for_storage)
|
||||
if six.PY3:
|
||||
json_data = json_data.encode('utf-8')
|
||||
req.body = json_data
|
||||
req.headers.update({
|
||||
SYSMETA_SLO_ETAG: slo_etag,
|
||||
SYSMETA_SLO_SIZE: total_size,
|
||||
'X-Static-Large-Object': 'True',
|
||||
'Etag': md5(json_data).hexdigest(),
|
||||
})
|
||||
|
||||
env = req.environ
|
||||
if not env.get('CONTENT_TYPE'):
|
||||
guessed_type, _junk = mimetypes.guess_type(req.path_info)
|
||||
env['CONTENT_TYPE'] = guessed_type or 'application/octet-stream'
|
||||
env['swift.content_type_overridden'] = True
|
||||
env['CONTENT_TYPE'] += ";swift_bytes=%d" % total_size
|
||||
|
||||
def start_response_wrapper(status, headers, exc_info=None):
|
||||
for i, (header, _value) in enumerate(headers):
|
||||
if header.lower() == 'etag':
|
||||
headers[i] = ('Etag', '"%s"' % slo_etag)
|
||||
break
|
||||
return start_response(status, headers, exc_info)
|
||||
|
||||
return self.app(env, start_response_wrapper)
|
||||
return resp_iter()
|
||||
|
||||
def get_segments_to_delete_iter(self, req):
|
||||
"""
|
||||
@ -1214,10 +1308,13 @@ def filter_factory(global_conf, **local_conf):
|
||||
DEFAULT_MAX_MANIFEST_SEGMENTS))
|
||||
max_manifest_size = int(conf.get('max_manifest_size',
|
||||
DEFAULT_MAX_MANIFEST_SIZE))
|
||||
yield_frequency = int(conf.get('yield_frequency',
|
||||
DEFAULT_YIELD_FREQUENCY))
|
||||
|
||||
register_swift_info('slo',
|
||||
max_manifest_segments=max_manifest_segments,
|
||||
max_manifest_size=max_manifest_size,
|
||||
yield_frequency=yield_frequency,
|
||||
# this used to be configurable; report it as 1 for
|
||||
# clients that might still care
|
||||
min_segment_size=1)
|
||||
@ -1226,5 +1323,6 @@ def filter_factory(global_conf, **local_conf):
|
||||
return StaticLargeObject(
|
||||
app, conf,
|
||||
max_manifest_segments=max_manifest_segments,
|
||||
max_manifest_size=max_manifest_size)
|
||||
max_manifest_size=max_manifest_size,
|
||||
yield_frequency=yield_frequency)
|
||||
return slo_filter
|
||||
|
@ -347,6 +347,8 @@ class Connection(object):
|
||||
self.connection.send('0\r\n\r\n')
|
||||
|
||||
self.response = self.connection.getresponse()
|
||||
# Hope it isn't big!
|
||||
self.response.body = self.response.read()
|
||||
self.connection.close()
|
||||
return self.response.status
|
||||
|
||||
|
@ -792,6 +792,101 @@ class TestSlo(Base):
|
||||
except ValueError:
|
||||
self.fail("COPY didn't copy the manifest (invalid json on GET)")
|
||||
|
||||
def test_slo_put_heartbeating(self):
|
||||
if 'yield_frequency' not in cluster_info['slo']:
|
||||
# old swift?
|
||||
raise SkipTest('Swift does not seem to support heartbeating')
|
||||
|
||||
def do_put(headers=None, include_error=False):
|
||||
file_item = self.env.container.file("manifest-heartbeat")
|
||||
seg_info = self.env.seg_info
|
||||
manifest_data = [seg_info['seg_a'], seg_info['seg_b'],
|
||||
seg_info['seg_c'], seg_info['seg_d'],
|
||||
seg_info['seg_e']]
|
||||
if include_error:
|
||||
manifest_data.append({'path': 'non-existent/segment'})
|
||||
resp = file_item.write(
|
||||
json.dumps(manifest_data),
|
||||
parms={'multipart-manifest': 'put', 'heartbeat': 'on'},
|
||||
hdrs=headers, return_resp=True)
|
||||
self.assertEqual(resp.status, 202)
|
||||
self.assertTrue(resp.chunked)
|
||||
body_lines = resp.body.split('\n', 2)
|
||||
self.assertFalse(body_lines[0].strip()) # all whitespace
|
||||
self.assertEqual('\r', body_lines[1])
|
||||
return body_lines[2]
|
||||
|
||||
body_lines = do_put().split('\n')
|
||||
self.assertIn('Response Status: 201 Created', body_lines)
|
||||
self.assertIn('Etag', [line.split(':', 1)[0] for line in body_lines])
|
||||
self.assertIn('Last Modified', [line.split(':', 1)[0]
|
||||
for line in body_lines])
|
||||
|
||||
body_lines = do_put({'Accept': 'text/plain'}).split('\n')
|
||||
self.assertIn('Response Status: 201 Created', body_lines)
|
||||
self.assertIn('Etag', [line.split(':', 1)[0] for line in body_lines])
|
||||
self.assertIn('Last Modified', [line.split(':', 1)[0]
|
||||
for line in body_lines])
|
||||
|
||||
body = do_put({'Accept': 'application/json'})
|
||||
try:
|
||||
resp = json.loads(body)
|
||||
except ValueError:
|
||||
self.fail('Expected JSON, got %r' % body)
|
||||
self.assertIn('Etag', resp)
|
||||
del resp['Etag']
|
||||
self.assertIn('Last Modified', resp)
|
||||
del resp['Last Modified']
|
||||
self.assertEqual(resp, {
|
||||
'Response Status': '201 Created',
|
||||
'Response Body': '',
|
||||
'Errors': [],
|
||||
})
|
||||
|
||||
body_lines = do_put(include_error=True).split('\n')
|
||||
self.assertIn('Response Status: 400 Bad Request', body_lines)
|
||||
self.assertIn('Response Body: Bad Request', body_lines)
|
||||
self.assertNotIn('Etag', [line.split(':', 1)[0]
|
||||
for line in body_lines])
|
||||
self.assertNotIn('Last Modified', [line.split(':', 1)[0]
|
||||
for line in body_lines])
|
||||
self.assertEqual(body_lines[-3:], [
|
||||
'Errors:',
|
||||
'non-existent/segment, 404 Not Found',
|
||||
'',
|
||||
])
|
||||
|
||||
body = do_put({'Accept': 'application/json'}, include_error=True)
|
||||
try:
|
||||
resp = json.loads(body)
|
||||
except ValueError:
|
||||
self.fail('Expected JSON, got %r' % body)
|
||||
self.assertNotIn('Etag', resp)
|
||||
self.assertNotIn('Last Modified', resp)
|
||||
self.assertEqual(resp, {
|
||||
'Response Status': '400 Bad Request',
|
||||
'Response Body': 'Bad Request\nThe server could not comply with '
|
||||
'the request since it is either malformed or '
|
||||
'otherwise incorrect.',
|
||||
'Errors': [
|
||||
['non-existent/segment', '404 Not Found'],
|
||||
],
|
||||
})
|
||||
|
||||
body = do_put({'Accept': 'application/json', 'ETag': 'bad etag'})
|
||||
try:
|
||||
resp = json.loads(body)
|
||||
except ValueError:
|
||||
self.fail('Expected JSON, got %r' % body)
|
||||
self.assertNotIn('Etag', resp)
|
||||
self.assertNotIn('Last Modified', resp)
|
||||
self.assertEqual(resp, {
|
||||
'Response Status': '422 Unprocessable Entity',
|
||||
'Response Body': 'Unprocessable Entity\nUnable to process the '
|
||||
'contained instructions',
|
||||
'Errors': [],
|
||||
})
|
||||
|
||||
def _make_manifest(self):
|
||||
file_item = self.env.container.file("manifest-post")
|
||||
seg_info = self.env.seg_info
|
||||
|
@ -312,6 +312,9 @@ class TestSloPutManifest(SloTestCase):
|
||||
self.app.register(
|
||||
'PUT', '/', swob.HTTPOk, {}, 'passed')
|
||||
|
||||
self.app.register(
|
||||
'HEAD', '/v1/AUTH_test/cont/missing-object',
|
||||
swob.HTTPNotFound, {}, None)
|
||||
self.app.register(
|
||||
'HEAD', '/v1/AUTH_test/cont/object',
|
||||
swob.HTTPOk,
|
||||
@ -343,7 +346,8 @@ class TestSloPutManifest(SloTestCase):
|
||||
{'Content-Length': '1', 'Etag': 'a'},
|
||||
None)
|
||||
self.app.register(
|
||||
'PUT', '/v1/AUTH_test/c/man', swob.HTTPCreated, {}, None)
|
||||
'PUT', '/v1/AUTH_test/c/man', swob.HTTPCreated,
|
||||
{'Last-Modified': 'Fri, 01 Feb 2012 20:38:36 GMT'}, None)
|
||||
self.app.register(
|
||||
'DELETE', '/v1/AUTH_test/c/man', swob.HTTPNoContent, {}, None)
|
||||
|
||||
@ -432,6 +436,219 @@ class TestSloPutManifest(SloTestCase):
|
||||
'Content-Type %r does not end with swift_bytes=100' %
|
||||
req.headers['Content-Type'])
|
||||
|
||||
@patch('swift.common.middleware.slo.time')
|
||||
def test_handle_multipart_put_fast_heartbeat(self, mock_time):
|
||||
mock_time.time.side_effect = [
|
||||
0, # start time
|
||||
1, # first segment's fast
|
||||
2, # second segment's also fast!
|
||||
]
|
||||
test_json_data = json.dumps([{'path': u'/cont/object\u2661',
|
||||
'etag': 'etagoftheobjectsegment',
|
||||
'size_bytes': 100},
|
||||
{'path': '/cont/object',
|
||||
'etag': 'etagoftheobjectsegment',
|
||||
'size_bytes': 100}])
|
||||
req = Request.blank(
|
||||
'/v1/AUTH_test/c/man?multipart-manifest=put&heartbeat=on',
|
||||
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
|
||||
body=test_json_data)
|
||||
|
||||
status, headers, body = self.call_slo(req)
|
||||
self.assertEqual('202 Accepted', status)
|
||||
headers_found = [h.lower() for h, v in headers]
|
||||
self.assertNotIn('etag', headers_found)
|
||||
body = ''.join(body)
|
||||
gen_etag = '"' + md5hex('etagoftheobjectsegment' * 2) + '"'
|
||||
self.assertTrue(body.startswith(' \r\n\r\n'),
|
||||
'Expected body to start with single space and two '
|
||||
'blank lines; got %r' % body)
|
||||
self.assertIn('\nResponse Status: 201 Created\n', body)
|
||||
self.assertIn('\nResponse Body: \n', body)
|
||||
self.assertIn('\nEtag: %s\n' % gen_etag, body)
|
||||
self.assertIn('\nLast Modified: Fri, 01 Feb 2012 20:38:36 GMT\n', body)
|
||||
|
||||
@patch('swift.common.middleware.slo.time')
|
||||
def test_handle_multipart_long_running_put_success(self, mock_time):
|
||||
mock_time.time.side_effect = [
|
||||
0, # start time
|
||||
1, # first segment's fast
|
||||
20, # second segment's slow
|
||||
]
|
||||
test_json_data = json.dumps([{'path': u'/cont/object\u2661',
|
||||
'etag': 'etagoftheobjectsegment',
|
||||
'size_bytes': 100},
|
||||
{'path': '/cont/object',
|
||||
'etag': 'etagoftheobjectsegment',
|
||||
'size_bytes': 100}])
|
||||
req = Request.blank(
|
||||
'/v1/AUTH_test/c/man?multipart-manifest=put&heartbeat=on',
|
||||
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
|
||||
body=test_json_data)
|
||||
|
||||
status, headers, body = self.call_slo(req)
|
||||
self.assertEqual('202 Accepted', status)
|
||||
headers_found = [h.lower() for h, v in headers]
|
||||
self.assertNotIn('etag', headers_found)
|
||||
body = ''.join(body)
|
||||
gen_etag = '"' + md5hex('etagoftheobjectsegment' * 2) + '"'
|
||||
self.assertTrue(body.startswith(' \r\n\r\n'),
|
||||
'Expected body to start with two spaces and two '
|
||||
'blank lines; got %r' % body)
|
||||
self.assertIn('\nResponse Status: 201 Created\n', body)
|
||||
self.assertIn('\nResponse Body: \n', body)
|
||||
self.assertIn('\nEtag: %s\n' % gen_etag, body)
|
||||
self.assertIn('\nLast Modified: Fri, 01 Feb 2012 20:38:36 GMT\n', body)
|
||||
|
||||
@patch('swift.common.middleware.slo.time')
|
||||
def test_handle_multipart_long_running_put_success_json(self, mock_time):
|
||||
mock_time.time.side_effect = [
|
||||
0, # start time
|
||||
11, # first segment's slow
|
||||
22, # second segment's also slow
|
||||
]
|
||||
test_json_data = json.dumps([{'path': u'/cont/object\u2661',
|
||||
'etag': 'etagoftheobjectsegment',
|
||||
'size_bytes': 100},
|
||||
{'path': '/cont/object',
|
||||
'etag': 'etagoftheobjectsegment',
|
||||
'size_bytes': 100}])
|
||||
req = Request.blank(
|
||||
'/v1/AUTH_test/c/man?multipart-manifest=put&heartbeat=on',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Accept': 'application/json'},
|
||||
body=test_json_data)
|
||||
|
||||
status, headers, body = self.call_slo(req)
|
||||
self.assertEqual('202 Accepted', status)
|
||||
headers_found = [h.lower() for h, v in headers]
|
||||
self.assertNotIn('etag', headers_found)
|
||||
body = ''.join(body)
|
||||
gen_etag = '"' + md5hex('etagoftheobjectsegment' * 2) + '"'
|
||||
self.assertTrue(body.startswith(' \r\n\r\n'),
|
||||
'Expected body to start with three spaces and two '
|
||||
'blank lines; got %r' % body)
|
||||
body = json.loads(body)
|
||||
self.assertEqual(body['Response Status'], '201 Created')
|
||||
self.assertEqual(body['Response Body'], '')
|
||||
self.assertEqual(body['Etag'], gen_etag)
|
||||
self.assertEqual(body['Last Modified'],
|
||||
'Fri, 01 Feb 2012 20:38:36 GMT')
|
||||
|
||||
@patch('swift.common.middleware.slo.time')
|
||||
def test_handle_multipart_long_running_put_failure(self, mock_time):
|
||||
mock_time.time.side_effect = [
|
||||
0, # start time
|
||||
1, # first segment's fast
|
||||
20, # second segment's slow
|
||||
]
|
||||
test_json_data = json.dumps([{'path': u'/cont/missing-object',
|
||||
'etag': 'etagoftheobjectsegment',
|
||||
'size_bytes': 100},
|
||||
{'path': '/cont/object',
|
||||
'etag': 'etagoftheobjectsegment',
|
||||
'size_bytes': 99}])
|
||||
req = Request.blank(
|
||||
'/v1/AUTH_test/c/man?multipart-manifest=put&heartbeat=on',
|
||||
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
|
||||
body=test_json_data)
|
||||
|
||||
status, headers, body = self.call_slo(req)
|
||||
self.assertEqual('202 Accepted', status)
|
||||
headers_found = [h.lower() for h, v in headers]
|
||||
self.assertNotIn('etag', headers_found)
|
||||
body = ''.join(body).split('\n')
|
||||
self.assertEqual([' \r', '\r'], body[:2],
|
||||
'Expected body to start with two spaces and two '
|
||||
'blank lines; got %r' % '\n'.join(body))
|
||||
self.assertIn('Response Status: 400 Bad Request', body[2:5])
|
||||
self.assertIn('Response Body: Bad Request', body)
|
||||
self.assertIn('The server could not comply with the request since it '
|
||||
'is either malformed or otherwise incorrect.', body)
|
||||
self.assertFalse(any(line.startswith('Etag: ') for line in body))
|
||||
self.assertFalse(any(line.startswith('Last Modified: ')
|
||||
for line in body))
|
||||
self.assertEqual(body[-4], 'Errors:')
|
||||
self.assertEqual(sorted(body[-3:-1]), [
|
||||
'/cont/missing-object, 404 Not Found',
|
||||
'/cont/object, Size Mismatch',
|
||||
])
|
||||
self.assertEqual(body[-1], '')
|
||||
|
||||
@patch('swift.common.middleware.slo.time')
|
||||
def test_handle_multipart_long_running_put_failure_json(self, mock_time):
|
||||
mock_time.time.side_effect = [
|
||||
0, # start time
|
||||
11, # first segment's slow
|
||||
22, # second segment's also slow
|
||||
]
|
||||
test_json_data = json.dumps([{'path': u'/cont/object\u2661',
|
||||
'etag': 'etagoftheobjectsegment',
|
||||
'size_bytes': 99},
|
||||
{'path': '/cont/object',
|
||||
'etag': 'some other etag',
|
||||
'size_bytes': 100}])
|
||||
req = Request.blank(
|
||||
'/v1/AUTH_test/c/man?multipart-manifest=put&heartbeat=on',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Accept': 'application/json'},
|
||||
body=test_json_data)
|
||||
|
||||
status, headers, body = self.call_slo(req)
|
||||
self.assertEqual('202 Accepted', status)
|
||||
headers_found = [h.lower() for h, v in headers]
|
||||
self.assertNotIn('etag', headers_found)
|
||||
body = ''.join(body)
|
||||
self.assertTrue(body.startswith(' \r\n\r\n'),
|
||||
'Expected body to start with three spaces and two '
|
||||
'blank lines; got %r' % body)
|
||||
body = json.loads(body)
|
||||
self.assertEqual(body['Response Status'], '400 Bad Request')
|
||||
self.assertEqual(body['Response Body'], 'Bad Request\nThe server '
|
||||
'could not comply with the request since it is '
|
||||
'either malformed or otherwise incorrect.')
|
||||
self.assertNotIn('Etag', body)
|
||||
self.assertNotIn('Last Modified', body)
|
||||
self.assertEqual(sorted(body['Errors']), [
|
||||
['/cont/object', 'Etag Mismatch'],
|
||||
[quote(u'/cont/object\u2661'.encode('utf8')), 'Size Mismatch'],
|
||||
])
|
||||
|
||||
@patch('swift.common.middleware.slo.time')
|
||||
def test_handle_multipart_long_running_put_bad_etag_json(self, mock_time):
|
||||
mock_time.time.side_effect = [
|
||||
0, # start time
|
||||
11, # first segment's slow
|
||||
22, # second segment's also slow
|
||||
]
|
||||
test_json_data = json.dumps([{'path': u'/cont/object\u2661',
|
||||
'etag': 'etagoftheobjectsegment',
|
||||
'size_bytes': 100},
|
||||
{'path': '/cont/object',
|
||||
'etag': 'etagoftheobjectsegment',
|
||||
'size_bytes': 100}])
|
||||
req = Request.blank(
|
||||
'/v1/AUTH_test/c/man?multipart-manifest=put&heartbeat=on',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Accept': 'application/json', 'ETag': 'bad etag'},
|
||||
body=test_json_data)
|
||||
|
||||
status, headers, body = self.call_slo(req)
|
||||
self.assertEqual('202 Accepted', status)
|
||||
headers_found = [h.lower() for h, v in headers]
|
||||
self.assertNotIn('etag', headers_found)
|
||||
body = ''.join(body)
|
||||
self.assertTrue(body.startswith(' \r\n\r\n'),
|
||||
'Expected body to start with three spaces and two '
|
||||
'blank lines; got %r' % body)
|
||||
body = json.loads(body)
|
||||
self.assertEqual(body['Response Status'], '422 Unprocessable Entity')
|
||||
self.assertEqual('Unprocessable Entity\nUnable to process the '
|
||||
'contained instructions', body['Response Body'])
|
||||
self.assertNotIn('Etag', body)
|
||||
self.assertNotIn('Last Modified', body)
|
||||
self.assertEqual(body['Errors'], [])
|
||||
|
||||
def test_manifest_put_no_etag_success(self):
|
||||
req = Request.blank(
|
||||
'/v1/AUTH_test/c/man?multipart-manifest=put',
|
||||
@ -464,10 +681,10 @@ class TestSloPutManifest(SloTestCase):
|
||||
self.assertEqual(resp.status_int, 422)
|
||||
|
||||
def test_handle_multipart_put_disallow_empty_first_segment(self):
|
||||
test_json_data = json.dumps([{'path': '/cont/object',
|
||||
test_json_data = json.dumps([{'path': '/cont/small_object',
|
||||
'etag': 'etagoftheobjectsegment',
|
||||
'size_bytes': 0},
|
||||
{'path': '/cont/small_object',
|
||||
{'path': '/cont/object',
|
||||
'etag': 'etagoftheobjectsegment',
|
||||
'size_bytes': 100}])
|
||||
req = Request.blank('/v1/a/c/o?multipart-manifest=put',
|
||||
@ -3109,6 +3326,35 @@ class TestSwiftInfo(unittest.TestCase):
|
||||
self.assertEqual(swift_info['slo'].get('min_segment_size'), 1)
|
||||
self.assertEqual(swift_info['slo'].get('max_manifest_size'),
|
||||
mware.max_manifest_size)
|
||||
self.assertEqual(1000, mware.max_manifest_segments)
|
||||
self.assertEqual(2097152, mware.max_manifest_size)
|
||||
self.assertEqual(1048576, mware.rate_limit_under_size)
|
||||
self.assertEqual(10, mware.rate_limit_after_segment)
|
||||
self.assertEqual(1, mware.rate_limit_segments_per_sec)
|
||||
self.assertEqual(10, mware.yield_frequency)
|
||||
self.assertEqual(2, mware.concurrency)
|
||||
self.assertEqual(2, mware.bulk_deleter.delete_concurrency)
|
||||
|
||||
def test_registered_non_defaults(self):
|
||||
conf = dict(
|
||||
max_manifest_segments=500, max_manifest_size=1048576,
|
||||
rate_limit_under_size=2097152, rate_limit_after_segment=20,
|
||||
rate_limit_segments_per_sec=2, yield_frequency=5, concurrency=1,
|
||||
delete_concurrency=3)
|
||||
mware = slo.filter_factory(conf)('have to pass in an app')
|
||||
swift_info = utils.get_swift_info()
|
||||
self.assertTrue('slo' in swift_info)
|
||||
self.assertEqual(swift_info['slo'].get('max_manifest_segments'), 500)
|
||||
self.assertEqual(swift_info['slo'].get('min_segment_size'), 1)
|
||||
self.assertEqual(swift_info['slo'].get('max_manifest_size'), 1048576)
|
||||
self.assertEqual(500, mware.max_manifest_segments)
|
||||
self.assertEqual(1048576, mware.max_manifest_size)
|
||||
self.assertEqual(2097152, mware.rate_limit_under_size)
|
||||
self.assertEqual(20, mware.rate_limit_after_segment)
|
||||
self.assertEqual(2, mware.rate_limit_segments_per_sec)
|
||||
self.assertEqual(5, mware.yield_frequency)
|
||||
self.assertEqual(1, mware.concurrency)
|
||||
self.assertEqual(3, mware.bulk_deleter.delete_concurrency)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
Reference in New Issue
Block a user