Enable proxy to build listings from shards
When a container is sharding or sharded the proxy container controller now builds container listings by concatenating components from shard ranges. Co-Authored-By: Matthew Oliver <matt@oliver.net.au> Co-Authored-By: Tim Burke <tim.burke@gmail.com> Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Co-Authored-By: Samuel Merritt <sam@swiftstack.com> Change-Id: Ia4cfebbe50338a761b8b6e9903b1869cb1f5b47e
This commit is contained in:
parent
723eac907c
commit
e940bc6cb1
@ -28,6 +28,7 @@ from six.moves.urllib.parse import quote
|
||||
|
||||
import os
|
||||
import time
|
||||
import json
|
||||
import functools
|
||||
import inspect
|
||||
import itertools
|
||||
@ -40,11 +41,11 @@ from eventlet import sleep
|
||||
from eventlet.timeout import Timeout
|
||||
import six
|
||||
|
||||
from swift.common.wsgi import make_pre_authed_env
|
||||
from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
|
||||
from swift.common.utils import Timestamp, config_true_value, \
|
||||
public, split_path, list_from_csv, GreenthreadSafeIterator, \
|
||||
GreenAsyncPile, quorum_size, parse_content_type, \
|
||||
document_iters_to_http_response_body
|
||||
document_iters_to_http_response_body, ShardRange
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common import constraints
|
||||
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
|
||||
@ -188,6 +189,7 @@ def headers_to_container_info(headers, status_int=HTTP_OK):
|
||||
},
|
||||
'meta': meta,
|
||||
'sysmeta': sysmeta,
|
||||
'sharding_state': headers.get('x-backend-sharding-state', 'unsharded'),
|
||||
}
|
||||
|
||||
|
||||
@ -375,6 +377,9 @@ def get_container_info(env, app, swift_source=None):
|
||||
else:
|
||||
info[field] = int(info[field])
|
||||
|
||||
if info.get('sharding_state') is None:
|
||||
info['sharding_state'] = 'unsharded'
|
||||
|
||||
return info
|
||||
|
||||
|
||||
@ -1994,3 +1999,91 @@ class Controller(object):
|
||||
else:
|
||||
raise ValueError(
|
||||
"server_type can only be 'account' or 'container'")
|
||||
|
||||
def _get_container_listing(self, req, account, container, headers=None,
|
||||
params=None):
|
||||
"""
|
||||
Fetch container listing from given `account/container`.
|
||||
|
||||
:param req: original Request instance.
|
||||
:param account: account in which `container` is stored.
|
||||
:param container: container from which listing should be fetched.
|
||||
:param headers: headers to be included with the request
|
||||
:param params: query string parameters to be used.
|
||||
:return: a tuple of (deserialized json data structure, swob Response)
|
||||
"""
|
||||
params = params or {}
|
||||
version, _a, _c, _other = req.split_path(3, 4, True)
|
||||
path = '/'.join(['', version, account, container])
|
||||
|
||||
subreq = make_pre_authed_request(
|
||||
req.environ, method='GET', path=quote(path), headers=req.headers,
|
||||
swift_source='SH')
|
||||
if headers:
|
||||
subreq.headers.update(headers)
|
||||
subreq.params = params
|
||||
self.app.logger.debug(
|
||||
'Get listing from %s %s' % (subreq.path_qs, headers))
|
||||
response = self.app.handle_request(subreq)
|
||||
|
||||
if not is_success(response.status_int):
|
||||
self.app.logger.warning(
|
||||
'Failed to get container listing from %s: %s',
|
||||
subreq.path_qs, response.status_int)
|
||||
return None, response
|
||||
|
||||
try:
|
||||
data = json.loads(response.body)
|
||||
if not isinstance(data, list):
|
||||
raise ValueError('not a list')
|
||||
return data, response
|
||||
except ValueError as err:
|
||||
self.app.logger.error(
|
||||
'Problem with listing response from %s: %r',
|
||||
subreq.path_qs, err)
|
||||
return None, response
|
||||
|
||||
def _get_shard_ranges(self, req, account, container, includes=None,
|
||||
states=None):
|
||||
"""
|
||||
Fetch shard ranges from given `account/container`. If `includes` is
|
||||
given then the shard range for that object name is requested, otherwise
|
||||
all shard ranges are requested.
|
||||
|
||||
:param req: original Request instance.
|
||||
:param account: account from which shard ranges should be fetched.
|
||||
:param container: container from which shard ranges should be fetched.
|
||||
:param includes: (optional) restricts the list of fetched shard ranges
|
||||
to those which include the given name.
|
||||
:param states: (optional) the states of shard ranges to be fetched.
|
||||
:return: a list of instances of :class:`swift.common.utils.ShardRange`,
|
||||
or None if there was a problem fetching the shard ranges
|
||||
"""
|
||||
params = req.params.copy()
|
||||
params.pop('limit', None)
|
||||
params['format'] = 'json'
|
||||
if includes:
|
||||
params['includes'] = includes
|
||||
if states:
|
||||
params['states'] = states
|
||||
headers = {'X-Backend-Record-Type': 'shard'}
|
||||
listing, response = self._get_container_listing(
|
||||
req, account, container, headers=headers, params=params)
|
||||
if listing is None:
|
||||
return None
|
||||
|
||||
record_type = response.headers.get('x-backend-record-type')
|
||||
if record_type != 'shard':
|
||||
err = 'unexpected record type %r' % record_type
|
||||
self.app.logger.error("Failed to get shard ranges from %s: %s",
|
||||
req.path_qs, err)
|
||||
return None
|
||||
|
||||
try:
|
||||
return [ShardRange.from_dict(shard_range)
|
||||
for shard_range in listing]
|
||||
except (ValueError, TypeError, KeyError) as err:
|
||||
self.app.logger.error(
|
||||
"Failed to get shard ranges from %s: invalid data: %r",
|
||||
req.path_qs, err)
|
||||
return None
|
||||
|
@ -14,10 +14,12 @@
|
||||
# limitations under the License.
|
||||
|
||||
from swift import gettext_ as _
|
||||
import json
|
||||
|
||||
from six.moves.urllib.parse import unquote
|
||||
from swift.common.utils import public, csv_append, Timestamp
|
||||
from swift.common.constraints import check_metadata
|
||||
from swift.common.utils import public, csv_append, Timestamp, \
|
||||
config_true_value, ShardRange
|
||||
from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT
|
||||
from swift.common.http import HTTP_ACCEPTED, is_success
|
||||
from swift.proxy.controllers.base import Controller, delay_denial, \
|
||||
cors_validation, set_info_cache, clear_info_cache
|
||||
@ -103,10 +105,20 @@ class ContainerController(Controller):
|
||||
node_iter = self.app.iter_nodes(self.app.container_ring, part)
|
||||
params = req.params
|
||||
params['format'] = 'json'
|
||||
record_type = req.headers.get('X-Backend-Record-Type', '').lower()
|
||||
if not record_type:
|
||||
record_type = 'auto'
|
||||
req.headers['X-Backend-Record-Type'] = 'auto'
|
||||
params['states'] = 'listing'
|
||||
req.params = params
|
||||
resp = self.GETorHEAD_base(
|
||||
req, _('Container'), node_iter, part,
|
||||
req.swift_entity_path, concurrency)
|
||||
resp_record_type = resp.headers.get('X-Backend-Record-Type', '')
|
||||
if all((req.method == "GET", record_type == 'auto',
|
||||
resp_record_type.lower() == 'shard')):
|
||||
resp = self._get_from_shards(req, resp)
|
||||
|
||||
# Cache this. We just made a request to a storage node and got
|
||||
# up-to-date information for the container.
|
||||
resp.headers['X-Backend-Recheck-Container-Existence'] = str(
|
||||
@ -126,6 +138,99 @@ class ContainerController(Controller):
|
||||
del resp.headers[key]
|
||||
return resp
|
||||
|
||||
def _get_from_shards(self, req, resp):
|
||||
# construct listing using shards described by the response body
|
||||
shard_ranges = [ShardRange.from_dict(data)
|
||||
for data in json.loads(resp.body)]
|
||||
self.app.logger.debug('GET listing from %s shards for: %s',
|
||||
len(shard_ranges), req.path_qs)
|
||||
if not shard_ranges:
|
||||
# can't find ranges or there was a problem getting the ranges. So
|
||||
# return what we have.
|
||||
return resp
|
||||
|
||||
objects = []
|
||||
req_limit = int(req.params.get('limit', CONTAINER_LISTING_LIMIT))
|
||||
params = req.params.copy()
|
||||
params.pop('states', None)
|
||||
req.headers.pop('X-Backend-Record-Type', None)
|
||||
reverse = config_true_value(params.get('reverse'))
|
||||
marker = params.get('marker')
|
||||
end_marker = params.get('end_marker')
|
||||
|
||||
limit = req_limit
|
||||
for shard_range in shard_ranges:
|
||||
params['limit'] = limit
|
||||
# Always set marker to ensure that object names less than or equal
|
||||
# to those already in the listing are not fetched
|
||||
if objects:
|
||||
last_name = objects[-1].get('name',
|
||||
objects[-1].get('subdir', u''))
|
||||
params['marker'] = last_name.encode('utf-8')
|
||||
elif reverse and marker and marker > shard_range.lower:
|
||||
params['marker'] = marker
|
||||
elif marker and marker <= shard_range.upper:
|
||||
params['marker'] = marker
|
||||
else:
|
||||
params['marker'] = shard_range.upper_str if reverse \
|
||||
else shard_range.lower_str
|
||||
if params['marker'] and reverse:
|
||||
params['marker'] += '\x00'
|
||||
|
||||
# Always set end_marker to ensure that misplaced objects beyond
|
||||
# the expected shard range are not fetched
|
||||
if end_marker and end_marker in shard_range:
|
||||
params['end_marker'] = end_marker
|
||||
else:
|
||||
params['end_marker'] = shard_range.lower_str if reverse \
|
||||
else shard_range.upper_str
|
||||
if params['end_marker'] and not reverse:
|
||||
params['end_marker'] += '\x00'
|
||||
|
||||
if (shard_range.account == self.account_name and
|
||||
shard_range.container == self.container_name):
|
||||
# directed back to same container - force GET of objects
|
||||
headers = {'X-Backend-Record-Type': 'object'}
|
||||
else:
|
||||
headers = None
|
||||
self.app.logger.debug('Getting from %s %s with %s',
|
||||
shard_range, shard_range.name, headers)
|
||||
objs, shard_resp = self._get_container_listing(
|
||||
req, shard_range.account, shard_range.container,
|
||||
headers=headers, params=params)
|
||||
|
||||
if not objs:
|
||||
# tolerate errors or empty shard containers
|
||||
continue
|
||||
|
||||
objects.extend(objs)
|
||||
limit -= len(objs)
|
||||
|
||||
if limit <= 0:
|
||||
break
|
||||
elif (end_marker and reverse and
|
||||
end_marker >= objects[-1]['name'].encode('utf-8')):
|
||||
break
|
||||
elif (end_marker and not reverse and
|
||||
end_marker <= objects[-1]['name'].encode('utf-8')):
|
||||
break
|
||||
|
||||
resp.body = json.dumps(objects)
|
||||
constrained = any(req.params.get(constraint) for constraint in (
|
||||
'marker', 'end_marker', 'path', 'prefix', 'delimiter'))
|
||||
if not constrained and len(objects) < req_limit:
|
||||
self.app.logger.debug('Setting object count to %s' % len(objects))
|
||||
# prefer the actual listing stats over the potentially outdated
|
||||
# root stats. This condition is only likely when a sharded
|
||||
# container is shrinking or in tests; typically a sharded container
|
||||
# will have more than CONTAINER_LISTING_LIMIT objects so any
|
||||
# unconstrained listing will be capped by the limit and total
|
||||
# object stats cannot therefore be inferred from the listing.
|
||||
resp.headers['X-Container-Object-Count'] = len(objects)
|
||||
resp.headers['X-Container-Bytes-Used'] = sum(
|
||||
[o['bytes'] for o in objects])
|
||||
return resp
|
||||
|
||||
@public
|
||||
@delay_denial
|
||||
@cors_validation
|
||||
|
@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import itertools
|
||||
import json
|
||||
from collections import defaultdict
|
||||
import unittest
|
||||
import mock
|
||||
@ -23,11 +24,14 @@ from swift.proxy.controllers.base import headers_to_container_info, \
|
||||
Controller, GetOrHeadHandler, bytes_to_skip
|
||||
from swift.common.swob import Request, HTTPException, RESPONSE_REASONS
|
||||
from swift.common import exceptions
|
||||
from swift.common.utils import split_path
|
||||
from swift.common.utils import split_path, ShardRange, Timestamp
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
from swift.common.http import is_success
|
||||
from swift.common.storage_policy import StoragePolicy, StoragePolicyCollection
|
||||
from test.unit import fake_http_connect, FakeRing, FakeMemcache, PatchPolicies
|
||||
from test.unit import (
|
||||
fake_http_connect, FakeRing, FakeMemcache, PatchPolicies, FakeLogger,
|
||||
make_timestamp_iter,
|
||||
mocked_http_conn)
|
||||
from swift.proxy import server as proxy_server
|
||||
from swift.common.request_helpers import (
|
||||
get_sys_meta_prefix, get_object_transient_sysmeta
|
||||
@ -172,7 +176,8 @@ class TestFuncs(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.app = proxy_server.Application(None, FakeMemcache(),
|
||||
account_ring=FakeRing(),
|
||||
container_ring=FakeRing())
|
||||
container_ring=FakeRing(),
|
||||
logger=FakeLogger())
|
||||
|
||||
def test_get_info_zero_recheck(self):
|
||||
mock_cache = mock.Mock()
|
||||
@ -1030,3 +1035,146 @@ class TestFuncs(unittest.TestCase):
|
||||
# prime numbers
|
||||
self.assertEqual(bytes_to_skip(11, 7), 4)
|
||||
self.assertEqual(bytes_to_skip(97, 7873823), 55)
|
||||
|
||||
def test_get_shard_ranges_for_container_get(self):
|
||||
ts_iter = make_timestamp_iter()
|
||||
shard_ranges = [dict(ShardRange(
|
||||
'.sharded_a/sr%d' % i, next(ts_iter), '%d_lower' % i,
|
||||
'%d_upper' % i, object_count=i, bytes_used=1024 * i,
|
||||
meta_timestamp=next(ts_iter)))
|
||||
for i in range(3)]
|
||||
base = Controller(self.app)
|
||||
req = Request.blank('/v1/a/c', method='GET')
|
||||
resp_headers = {'X-Backend-Record-Type': 'shard'}
|
||||
with mocked_http_conn(
|
||||
200, 200, body_iter=iter(['', json.dumps(shard_ranges)]),
|
||||
headers=resp_headers
|
||||
) as fake_conn:
|
||||
actual = base._get_shard_ranges(req, 'a', 'c')
|
||||
|
||||
# account info
|
||||
captured = fake_conn.requests
|
||||
self.assertEqual('HEAD', captured[0]['method'])
|
||||
self.assertEqual('a', captured[0]['path'][7:])
|
||||
# container GET
|
||||
self.assertEqual('GET', captured[1]['method'])
|
||||
self.assertEqual('a/c', captured[1]['path'][7:])
|
||||
self.assertEqual('format=json', captured[1]['qs'])
|
||||
self.assertEqual(
|
||||
'shard', captured[1]['headers'].get('X-Backend-Record-Type'))
|
||||
self.assertEqual(shard_ranges, [dict(pr) for pr in actual])
|
||||
self.assertFalse(self.app.logger.get_lines_for_level('error'))
|
||||
|
||||
def test_get_shard_ranges_for_object_put(self):
|
||||
ts_iter = make_timestamp_iter()
|
||||
shard_ranges = [dict(ShardRange(
|
||||
'.sharded_a/sr%d' % i, next(ts_iter), '%d_lower' % i,
|
||||
'%d_upper' % i, object_count=i, bytes_used=1024 * i,
|
||||
meta_timestamp=next(ts_iter)))
|
||||
for i in range(3)]
|
||||
base = Controller(self.app)
|
||||
req = Request.blank('/v1/a/c/o', method='PUT')
|
||||
resp_headers = {'X-Backend-Record-Type': 'shard'}
|
||||
with mocked_http_conn(
|
||||
200, 200, body_iter=iter(['', json.dumps(shard_ranges[1:2])]),
|
||||
headers=resp_headers
|
||||
) as fake_conn:
|
||||
actual = base._get_shard_ranges(req, 'a', 'c', '1_test')
|
||||
|
||||
# account info
|
||||
captured = fake_conn.requests
|
||||
self.assertEqual('HEAD', captured[0]['method'])
|
||||
self.assertEqual('a', captured[0]['path'][7:])
|
||||
# container GET
|
||||
self.assertEqual('GET', captured[1]['method'])
|
||||
self.assertEqual('a/c', captured[1]['path'][7:])
|
||||
params = sorted(captured[1]['qs'].split('&'))
|
||||
self.assertEqual(
|
||||
['format=json', 'includes=1_test'], params)
|
||||
self.assertEqual(
|
||||
'shard', captured[1]['headers'].get('X-Backend-Record-Type'))
|
||||
self.assertEqual(shard_ranges[1:2], [dict(pr) for pr in actual])
|
||||
self.assertFalse(self.app.logger.get_lines_for_level('error'))
|
||||
|
||||
def _check_get_shard_ranges_bad_data(self, body):
|
||||
base = Controller(self.app)
|
||||
req = Request.blank('/v1/a/c/o', method='PUT')
|
||||
# empty response
|
||||
headers = {'X-Backend-Record-Type': 'shard'}
|
||||
with mocked_http_conn(200, 200, body_iter=iter(['', body]),
|
||||
headers=headers):
|
||||
actual = base._get_shard_ranges(req, 'a', 'c', '1_test')
|
||||
self.assertIsNone(actual)
|
||||
lines = self.app.logger.get_lines_for_level('error')
|
||||
return lines
|
||||
|
||||
def test_get_shard_ranges_empty_body(self):
|
||||
error_lines = self._check_get_shard_ranges_bad_data('')
|
||||
self.assertIn('Problem with listing response', error_lines[0])
|
||||
self.assertIn('No JSON', error_lines[0])
|
||||
self.assertFalse(error_lines[1:])
|
||||
|
||||
def test_get_shard_ranges_not_a_list(self):
|
||||
error_lines = self._check_get_shard_ranges_bad_data(json.dumps({}))
|
||||
self.assertIn('Problem with listing response', error_lines[0])
|
||||
self.assertIn('not a list', error_lines[0])
|
||||
self.assertFalse(error_lines[1:])
|
||||
|
||||
def test_get_shard_ranges_key_missing(self):
|
||||
error_lines = self._check_get_shard_ranges_bad_data(json.dumps([{}]))
|
||||
self.assertIn('Failed to get shard ranges', error_lines[0])
|
||||
self.assertIn('KeyError', error_lines[0])
|
||||
self.assertFalse(error_lines[1:])
|
||||
|
||||
def test_get_shard_ranges_invalid_shard_range(self):
|
||||
sr = ShardRange('a/c', Timestamp.now())
|
||||
bad_sr_data = dict(sr, name='bad_name')
|
||||
error_lines = self._check_get_shard_ranges_bad_data(
|
||||
json.dumps([bad_sr_data]))
|
||||
self.assertIn('Failed to get shard ranges', error_lines[0])
|
||||
self.assertIn('ValueError', error_lines[0])
|
||||
self.assertFalse(error_lines[1:])
|
||||
|
||||
def test_get_shard_ranges_missing_record_type(self):
|
||||
base = Controller(self.app)
|
||||
req = Request.blank('/v1/a/c/o', method='PUT')
|
||||
sr = ShardRange('a/c', Timestamp.now())
|
||||
body = json.dumps([dict(sr)])
|
||||
with mocked_http_conn(
|
||||
200, 200, body_iter=iter(['', body])):
|
||||
actual = base._get_shard_ranges(req, 'a', 'c', '1_test')
|
||||
self.assertIsNone(actual)
|
||||
error_lines = self.app.logger.get_lines_for_level('error')
|
||||
self.assertIn('Failed to get shard ranges', error_lines[0])
|
||||
self.assertIn('unexpected record type', error_lines[0])
|
||||
self.assertIn('/a/c', error_lines[0])
|
||||
self.assertFalse(error_lines[1:])
|
||||
|
||||
def test_get_shard_ranges_wrong_record_type(self):
|
||||
base = Controller(self.app)
|
||||
req = Request.blank('/v1/a/c/o', method='PUT')
|
||||
sr = ShardRange('a/c', Timestamp.now())
|
||||
body = json.dumps([dict(sr)])
|
||||
headers = {'X-Backend-Record-Type': 'object'}
|
||||
with mocked_http_conn(
|
||||
200, 200, body_iter=iter(['', body]),
|
||||
headers=headers):
|
||||
actual = base._get_shard_ranges(req, 'a', 'c', '1_test')
|
||||
self.assertIsNone(actual)
|
||||
error_lines = self.app.logger.get_lines_for_level('error')
|
||||
self.assertIn('Failed to get shard ranges', error_lines[0])
|
||||
self.assertIn('unexpected record type', error_lines[0])
|
||||
self.assertIn('/a/c', error_lines[0])
|
||||
self.assertFalse(error_lines[1:])
|
||||
|
||||
def test_get_shard_ranges_request_failed(self):
|
||||
base = Controller(self.app)
|
||||
req = Request.blank('/v1/a/c/o', method='PUT')
|
||||
with mocked_http_conn(200, 404, 404, 404):
|
||||
actual = base._get_shard_ranges(req, 'a', 'c', '1_test')
|
||||
self.assertIsNone(actual)
|
||||
self.assertFalse(self.app.logger.get_lines_for_level('error'))
|
||||
warning_lines = self.app.logger.get_lines_for_level('warning')
|
||||
self.assertIn('Failed to get container listing', warning_lines[0])
|
||||
self.assertIn('/a/c', warning_lines[0])
|
||||
self.assertFalse(warning_lines[1:])
|
||||
|
@ -12,17 +12,24 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import json
|
||||
|
||||
import mock
|
||||
import socket
|
||||
import unittest
|
||||
|
||||
from eventlet import Timeout
|
||||
from six.moves import urllib
|
||||
|
||||
from swift.common.constraints import CONTAINER_LISTING_LIMIT
|
||||
from swift.common.swob import Request
|
||||
from swift.common.utils import ShardRange, Timestamp
|
||||
from swift.proxy import server as proxy_server
|
||||
from swift.proxy.controllers.base import headers_to_container_info, Controller
|
||||
from test.unit import fake_http_connect, FakeRing, FakeMemcache
|
||||
from swift.proxy.controllers.base import headers_to_container_info, Controller, \
|
||||
get_container_info
|
||||
from test import annotate_failure
|
||||
from test.unit import fake_http_connect, FakeRing, FakeMemcache, \
|
||||
make_timestamp_iter
|
||||
from swift.common.storage_policy import StoragePolicy
|
||||
from swift.common.request_helpers import get_sys_meta_prefix
|
||||
|
||||
@ -72,6 +79,7 @@ class TestContainerController(TestRingBase):
|
||||
new=FakeAccountInfoContainerController):
|
||||
return _orig_get_controller(*args, **kwargs)
|
||||
self.app.get_controller = wrapped_get_controller
|
||||
self.ts_iter = make_timestamp_iter()
|
||||
|
||||
def _make_callback_func(self, context):
|
||||
def callback(ipaddr, port, device, partition, method, path,
|
||||
@ -329,6 +337,852 @@ class TestContainerController(TestRingBase):
|
||||
]
|
||||
self._assert_responses('POST', POST_TEST_CASES)
|
||||
|
||||
def _make_shard_objects(self, shard_range):
|
||||
lower = ord(shard_range.lower[0]) if shard_range.lower else ord('@')
|
||||
upper = ord(shard_range.upper[0]) if shard_range.upper else ord('z')
|
||||
|
||||
objects = [{'name': chr(i), 'bytes': i, 'hash': 'hash%s' % chr(i),
|
||||
'content_type': 'text/plain', 'deleted': 0,
|
||||
'last_modified': next(self.ts_iter).isoformat}
|
||||
for i in range(lower + 1, upper + 1)]
|
||||
return objects
|
||||
|
||||
def _check_GET_shard_listing(self, mock_responses, expected_objects,
|
||||
expected_requests, query_string='',
|
||||
reverse=False):
|
||||
# mock_responses is a list of tuples (status, json body, headers)
|
||||
# expected objects is a list of dicts
|
||||
# expected_requests is a list of tuples (path, hdrs dict, params dict)
|
||||
|
||||
# sanity check that expected objects is name ordered with no repeats
|
||||
def name(obj):
|
||||
return obj.get('name', obj.get('subdir'))
|
||||
|
||||
for (prev, next_) in zip(expected_objects, expected_objects[1:]):
|
||||
if reverse:
|
||||
self.assertGreater(name(prev), name(next_))
|
||||
else:
|
||||
self.assertLess(name(prev), name(next_))
|
||||
container_path = '/v1/a/c' + query_string
|
||||
codes = (resp[0] for resp in mock_responses)
|
||||
bodies = iter([json.dumps(resp[1]) for resp in mock_responses])
|
||||
exp_headers = [resp[2] for resp in mock_responses]
|
||||
request = Request.blank(container_path)
|
||||
with mocked_http_conn(
|
||||
*codes, body_iter=bodies, headers=exp_headers) as fake_conn:
|
||||
resp = request.get_response(self.app)
|
||||
for backend_req in fake_conn.requests:
|
||||
self.assertEqual(request.headers['X-Trans-Id'],
|
||||
backend_req['headers']['X-Trans-Id'])
|
||||
self.assertTrue(backend_req['headers']['User-Agent'].startswith(
|
||||
'proxy-server'))
|
||||
self.assertEqual(200, resp.status_int)
|
||||
actual_objects = json.loads(resp.body)
|
||||
self.assertEqual(len(expected_objects), len(actual_objects))
|
||||
self.assertEqual(expected_objects, actual_objects)
|
||||
self.assertEqual(len(expected_requests), len(fake_conn.requests))
|
||||
for i, ((exp_path, exp_headers, exp_params), req) in enumerate(
|
||||
zip(expected_requests, fake_conn.requests)):
|
||||
with annotate_failure('Request check at index %d.' % i):
|
||||
# strip off /sdx/0/ from path
|
||||
self.assertEqual(exp_path, req['path'][7:])
|
||||
self.assertEqual(
|
||||
dict(exp_params, format='json'),
|
||||
dict(urllib.parse.parse_qsl(req['qs'], True)))
|
||||
for k, v in exp_headers.items():
|
||||
self.assertIn(k, req['headers'])
|
||||
self.assertEqual(v, req['headers'][k])
|
||||
self.assertNotIn('X-Backend-Override-Delete', req['headers'])
|
||||
return resp
|
||||
|
||||
def check_response(self, resp, root_resp_hdrs, expected_objects=None):
|
||||
info_hdrs = dict(root_resp_hdrs)
|
||||
if expected_objects is None:
|
||||
# default is to expect whatever the root container sent
|
||||
expected_obj_count = root_resp_hdrs['X-Container-Object-Count']
|
||||
expected_bytes_used = root_resp_hdrs['X-Container-Bytes-Used']
|
||||
else:
|
||||
expected_bytes_used = sum([o['bytes'] for o in expected_objects])
|
||||
expected_obj_count = len(expected_objects)
|
||||
info_hdrs['X-Container-Bytes-Used'] = expected_bytes_used
|
||||
info_hdrs['X-Container-Object-Count'] = expected_obj_count
|
||||
self.assertEqual(expected_bytes_used,
|
||||
int(resp.headers['X-Container-Bytes-Used']))
|
||||
self.assertEqual(expected_obj_count,
|
||||
int(resp.headers['X-Container-Object-Count']))
|
||||
self.assertEqual('sharded', resp.headers['X-Backend-Sharding-State'])
|
||||
for k, v in root_resp_hdrs.items():
|
||||
if k.lower().startswith('x-container-meta'):
|
||||
self.assertEqual(v, resp.headers[k])
|
||||
# check that info cache is correct for root container
|
||||
info = get_container_info(resp.request.environ, self.app)
|
||||
self.assertEqual(headers_to_container_info(info_hdrs), info)
|
||||
|
||||
def test_GET_sharded_container(self):
|
||||
shard_bounds = (('', 'ham'), ('ham', 'pie'), ('pie', ''))
|
||||
shard_ranges = [
|
||||
ShardRange('.shards_a/c_%s' % upper, Timestamp.now(), lower, upper)
|
||||
for lower, upper in shard_bounds]
|
||||
sr_dicts = [dict(sr) for sr in shard_ranges]
|
||||
sr_objs = [self._make_shard_objects(sr) for sr in shard_ranges]
|
||||
shard_resp_hdrs = [
|
||||
{'X-Backend-Sharding-State': 'unsharded',
|
||||
'X-Container-Object-Count': len(sr_objs[i]),
|
||||
'X-Container-Bytes-Used':
|
||||
sum([obj['bytes'] for obj in sr_objs[i]]),
|
||||
'X-Container-Meta-Flavour': 'flavour%d' % i,
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
for i in range(3)]
|
||||
|
||||
all_objects = []
|
||||
for objects in sr_objs:
|
||||
all_objects.extend(objects)
|
||||
size_all_objects = sum([obj['bytes'] for obj in all_objects])
|
||||
num_all_objects = len(all_objects)
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
expected_objects = all_objects
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
# pretend root object stats are not yet updated
|
||||
'X-Container-Object-Count': num_all_objects - 1,
|
||||
'X-Container-Bytes-Used': size_all_objects - 1,
|
||||
'X-Container-Meta-Flavour': 'peach',
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
root_shard_resp_hdrs = dict(root_resp_hdrs)
|
||||
root_shard_resp_hdrs['X-Backend-Record-Type'] = 'shard'
|
||||
|
||||
# GET all objects
|
||||
# include some failed responses
|
||||
mock_responses = [
|
||||
# status, body, headers
|
||||
(404, '', {}),
|
||||
(200, sr_dicts, root_shard_resp_hdrs),
|
||||
(200, sr_objs[0], shard_resp_hdrs[0]),
|
||||
(200, sr_objs[1], shard_resp_hdrs[1]),
|
||||
(200, sr_objs[2], shard_resp_hdrs[2])
|
||||
]
|
||||
expected_requests = [
|
||||
# path, headers, params
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing')), # 404
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing')), # 200
|
||||
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='', end_marker='ham\x00', limit=str(limit),
|
||||
states='listing')), # 200
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='h', end_marker='pie\x00', states='listing',
|
||||
limit=str(limit - len(sr_objs[0])))), # 200
|
||||
(shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='p', end_marker='', states='listing',
|
||||
limit=str(limit - len(sr_objs[0] + sr_objs[1])))) # 200
|
||||
]
|
||||
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, expected_objects, expected_requests)
|
||||
# root object count will overridden by actual length of listing
|
||||
self.check_response(resp, root_resp_hdrs,
|
||||
expected_objects=expected_objects)
|
||||
|
||||
# GET all objects - sharding, final shard range points back to root
|
||||
root_range = ShardRange('a/c', Timestamp.now(), 'pie', '')
|
||||
mock_responses = [
|
||||
# status, body, headers
|
||||
(200, sr_dicts[:2] + [dict(root_range)], root_shard_resp_hdrs),
|
||||
(200, sr_objs[0], shard_resp_hdrs[0]),
|
||||
(200, sr_objs[1], shard_resp_hdrs[1]),
|
||||
(200, sr_objs[2], root_resp_hdrs)
|
||||
]
|
||||
expected_requests = [
|
||||
# path, headers, params
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing')), # 200
|
||||
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='', end_marker='ham\x00', limit=str(limit),
|
||||
states='listing')), # 200
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='h', end_marker='pie\x00', states='listing',
|
||||
limit=str(limit - len(sr_objs[0])))), # 200
|
||||
(root_range.name, {'X-Backend-Record-Type': 'object'},
|
||||
dict(marker='p', end_marker='',
|
||||
limit=str(limit - len(sr_objs[0] + sr_objs[1])))) # 200
|
||||
]
|
||||
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, expected_objects, expected_requests)
|
||||
# root object count will overridden by actual length of listing
|
||||
self.check_response(resp, root_resp_hdrs,
|
||||
expected_objects=expected_objects)
|
||||
|
||||
# GET all objects in reverse
|
||||
mock_responses = [
|
||||
# status, body, headers
|
||||
(200, list(reversed(sr_dicts)), root_shard_resp_hdrs),
|
||||
(200, list(reversed(sr_objs[2])), shard_resp_hdrs[2]),
|
||||
(200, list(reversed(sr_objs[1])), shard_resp_hdrs[1]),
|
||||
(200, list(reversed(sr_objs[0])), shard_resp_hdrs[0]),
|
||||
]
|
||||
expected_requests = [
|
||||
# path, headers, params
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing', reverse='true')),
|
||||
(shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='', end_marker='pie', reverse='true',
|
||||
limit=str(limit), states='listing')), # 200
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='q', end_marker='ham', states='listing',
|
||||
reverse='true', limit=str(limit - len(sr_objs[2])))), # 200
|
||||
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='i', end_marker='', states='listing', reverse='true',
|
||||
limit=str(limit - len(sr_objs[2] + sr_objs[1])))), # 200
|
||||
]
|
||||
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, list(reversed(expected_objects)),
|
||||
expected_requests, query_string='?reverse=true', reverse=True)
|
||||
# root object count will overridden by actual length of listing
|
||||
self.check_response(resp, root_resp_hdrs,
|
||||
expected_objects=expected_objects)
|
||||
|
||||
# GET with limit param
|
||||
limit = len(sr_objs[0]) + len(sr_objs[1]) + 1
|
||||
expected_objects = all_objects[:limit]
|
||||
mock_responses = [
|
||||
(404, '', {}),
|
||||
(200, sr_dicts, root_shard_resp_hdrs),
|
||||
(200, sr_objs[0], shard_resp_hdrs[0]),
|
||||
(200, sr_objs[1], shard_resp_hdrs[1]),
|
||||
(200, sr_objs[2][:1], shard_resp_hdrs[2])
|
||||
]
|
||||
expected_requests = [
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(limit=str(limit), states='listing')), # 404
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(limit=str(limit), states='listing')), # 200
|
||||
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'}, # 200
|
||||
dict(marker='', end_marker='ham\x00', states='listing',
|
||||
limit=str(limit))),
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'}, # 200
|
||||
dict(marker='h', end_marker='pie\x00', states='listing',
|
||||
limit=str(limit - len(sr_objs[0])))),
|
||||
(shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'}, # 200
|
||||
dict(marker='p', end_marker='', states='listing',
|
||||
limit=str(limit - len(sr_objs[0] + sr_objs[1]))))
|
||||
]
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, expected_objects, expected_requests,
|
||||
query_string='?limit=%s' % limit)
|
||||
self.check_response(resp, root_resp_hdrs)
|
||||
|
||||
# GET with marker
|
||||
marker = sr_objs[1][2]['name']
|
||||
first_included = len(sr_objs[0]) + 2
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
expected_objects = all_objects[first_included:]
|
||||
mock_responses = [
|
||||
(404, '', {}),
|
||||
(200, sr_dicts[1:], root_shard_resp_hdrs),
|
||||
(404, '', {}),
|
||||
(200, sr_objs[1][2:], shard_resp_hdrs[1]),
|
||||
(200, sr_objs[2], shard_resp_hdrs[2])
|
||||
]
|
||||
expected_requests = [
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker=marker, states='listing')), # 404
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker=marker, states='listing')), # 200
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'}, # 404
|
||||
dict(marker=marker, end_marker='pie\x00', states='listing',
|
||||
limit=str(limit))),
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'}, # 200
|
||||
dict(marker=marker, end_marker='pie\x00', states='listing',
|
||||
limit=str(limit))),
|
||||
(shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'}, # 200
|
||||
dict(marker='p', end_marker='', states='listing',
|
||||
limit=str(limit - len(sr_objs[1][2:])))),
|
||||
]
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, expected_objects, expected_requests,
|
||||
query_string='?marker=%s' % marker)
|
||||
self.check_response(resp, root_resp_hdrs)
|
||||
|
||||
# GET with end marker
|
||||
end_marker = sr_objs[1][6]['name']
|
||||
first_excluded = len(sr_objs[0]) + 6
|
||||
expected_objects = all_objects[:first_excluded]
|
||||
mock_responses = [
|
||||
(404, '', {}),
|
||||
(200, sr_dicts[:2], root_shard_resp_hdrs),
|
||||
(200, sr_objs[0], shard_resp_hdrs[0]),
|
||||
(404, '', {}),
|
||||
(200, sr_objs[1][:6], shard_resp_hdrs[1])
|
||||
]
|
||||
expected_requests = [
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(end_marker=end_marker, states='listing')), # 404
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(end_marker=end_marker, states='listing')), # 200
|
||||
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'}, # 200
|
||||
dict(marker='', end_marker='ham\x00', states='listing',
|
||||
limit=str(limit))),
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'}, # 404
|
||||
dict(marker='h', end_marker=end_marker, states='listing',
|
||||
limit=str(limit - len(sr_objs[0])))),
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'}, # 200
|
||||
dict(marker='h', end_marker=end_marker, states='listing',
|
||||
limit=str(limit - len(sr_objs[0])))),
|
||||
]
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, expected_objects, expected_requests,
|
||||
query_string='?end_marker=%s' % end_marker)
|
||||
self.check_response(resp, root_resp_hdrs)
|
||||
|
||||
# marker and end_marker and limit
|
||||
limit = 2
|
||||
expected_objects = all_objects[first_included:first_excluded]
|
||||
mock_responses = [
|
||||
(200, sr_dicts[1:2], root_shard_resp_hdrs),
|
||||
(200, sr_objs[1][2:6], shard_resp_hdrs[1])
|
||||
]
|
||||
expected_requests = [
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing', limit=str(limit),
|
||||
marker=marker, end_marker=end_marker)), # 200
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'}, # 200
|
||||
dict(marker=marker, end_marker=end_marker, states='listing',
|
||||
limit=str(limit))),
|
||||
]
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, expected_objects, expected_requests,
|
||||
query_string='?marker=%s&end_marker=%s&limit=%s'
|
||||
% (marker, end_marker, limit))
|
||||
self.check_response(resp, root_resp_hdrs)
|
||||
|
||||
# reverse with marker, end_marker
|
||||
expected_objects.reverse()
|
||||
mock_responses = [
|
||||
(200, sr_dicts[1:2], root_shard_resp_hdrs),
|
||||
(200, list(reversed(sr_objs[1][2:6])), shard_resp_hdrs[1])
|
||||
]
|
||||
expected_requests = [
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker=end_marker, reverse='true', end_marker=marker,
|
||||
limit=str(limit), states='listing',)), # 200
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'}, # 200
|
||||
dict(marker=end_marker, end_marker=marker, states='listing',
|
||||
limit=str(limit), reverse='true')),
|
||||
]
|
||||
self._check_GET_shard_listing(
|
||||
mock_responses, expected_objects, expected_requests,
|
||||
query_string='?marker=%s&end_marker=%s&limit=%s&reverse=true'
|
||||
% (end_marker, marker, limit), reverse=True)
|
||||
self.check_response(resp, root_resp_hdrs)
|
||||
|
||||
def test_GET_sharded_container_with_delimiter(self):
|
||||
shard_bounds = (('', 'ham'), ('ham', 'pie'), ('pie', ''))
|
||||
shard_ranges = [
|
||||
ShardRange('.shards_a/c_%s' % upper, Timestamp.now(), lower, upper)
|
||||
for lower, upper in shard_bounds]
|
||||
sr_dicts = [dict(sr) for sr in shard_ranges]
|
||||
shard_resp_hdrs = {'X-Backend-Sharding-State': 'unsharded',
|
||||
'X-Container-Object-Count': 2,
|
||||
'X-Container-Bytes-Used': 4,
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
# pretend root object stats are not yet updated
|
||||
'X-Container-Object-Count': 6,
|
||||
'X-Container-Bytes-Used': 12,
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
root_shard_resp_hdrs = dict(root_resp_hdrs)
|
||||
root_shard_resp_hdrs['X-Backend-Record-Type'] = 'shard'
|
||||
|
||||
sr_0_obj = {'name': 'apple',
|
||||
'bytes': 1,
|
||||
'hash': 'hash',
|
||||
'content_type': 'text/plain',
|
||||
'deleted': 0,
|
||||
'last_modified': next(self.ts_iter).isoformat}
|
||||
sr_2_obj = {'name': 'pumpkin',
|
||||
'bytes': 1,
|
||||
'hash': 'hash',
|
||||
'content_type': 'text/plain',
|
||||
'deleted': 0,
|
||||
'last_modified': next(self.ts_iter).isoformat}
|
||||
subdir = {'subdir': 'ha/'}
|
||||
mock_responses = [
|
||||
# status, body, headers
|
||||
(200, sr_dicts, root_shard_resp_hdrs),
|
||||
(200, [sr_0_obj, subdir], shard_resp_hdrs),
|
||||
(200, [], shard_resp_hdrs),
|
||||
(200, [sr_2_obj], shard_resp_hdrs)
|
||||
]
|
||||
expected_requests = [
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing', delimiter='/')), # 200
|
||||
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='', end_marker='ham\x00', limit=str(limit),
|
||||
states='listing', delimiter='/')), # 200
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='ha/', end_marker='pie\x00', states='listing',
|
||||
limit=str(limit - 2), delimiter='/')), # 200
|
||||
(shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='ha/', end_marker='', states='listing',
|
||||
limit=str(limit - 2), delimiter='/')) # 200
|
||||
]
|
||||
|
||||
expected_objects = [sr_0_obj, subdir, sr_2_obj]
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, expected_objects, expected_requests,
|
||||
query_string='?delimiter=/')
|
||||
self.check_response(resp, root_resp_hdrs)
|
||||
|
||||
def test_GET_sharded_container_overlapping_shards(self):
|
||||
# verify ordered listing even if unexpected overlapping shard ranges
|
||||
shard_bounds = (('', 'ham', ShardRange.CLEAVED),
|
||||
('', 'pie', ShardRange.ACTIVE),
|
||||
('lemon', '', ShardRange.ACTIVE))
|
||||
shard_ranges = [
|
||||
ShardRange('.shards_a/c_' + upper, Timestamp.now(), lower, upper,
|
||||
state=state)
|
||||
for lower, upper, state in shard_bounds]
|
||||
sr_dicts = [dict(sr) for sr in shard_ranges]
|
||||
sr_objs = [self._make_shard_objects(sr) for sr in shard_ranges]
|
||||
shard_resp_hdrs = [
|
||||
{'X-Backend-Sharding-State': 'unsharded',
|
||||
'X-Container-Object-Count': len(sr_objs[i]),
|
||||
'X-Container-Bytes-Used':
|
||||
sum([obj['bytes'] for obj in sr_objs[i]]),
|
||||
'X-Container-Meta-Flavour': 'flavour%d' % i,
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
for i in range(3)]
|
||||
|
||||
all_objects = []
|
||||
for objects in sr_objs:
|
||||
all_objects.extend(objects)
|
||||
size_all_objects = sum([obj['bytes'] for obj in all_objects])
|
||||
num_all_objects = len(all_objects)
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
# pretend root object stats are not yet updated
|
||||
'X-Container-Object-Count': num_all_objects - 1,
|
||||
'X-Container-Bytes-Used': size_all_objects - 1,
|
||||
'X-Container-Meta-Flavour': 'peach',
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
root_shard_resp_hdrs = dict(root_resp_hdrs)
|
||||
root_shard_resp_hdrs['X-Backend-Record-Type'] = 'shard'
|
||||
|
||||
# forwards listing
|
||||
|
||||
# expect subset of second shard range
|
||||
objs_1 = [o for o in sr_objs[1] if o['name'] > sr_objs[0][-1]['name']]
|
||||
# expect subset of third shard range
|
||||
objs_2 = [o for o in sr_objs[2] if o['name'] > sr_objs[1][-1]['name']]
|
||||
mock_responses = [
|
||||
# status, body, headers
|
||||
(200, sr_dicts, root_shard_resp_hdrs),
|
||||
(200, sr_objs[0], shard_resp_hdrs[0]),
|
||||
(200, objs_1, shard_resp_hdrs[1]),
|
||||
(200, objs_2, shard_resp_hdrs[2])
|
||||
]
|
||||
# NB marker always advances to last object name
|
||||
expected_requests = [
|
||||
# path, headers, params
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing')), # 200
|
||||
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='', end_marker='ham\x00', states='listing',
|
||||
limit=str(limit))), # 200
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='h', end_marker='pie\x00', states='listing',
|
||||
limit=str(limit - len(sr_objs[0])))), # 200
|
||||
(shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='p', end_marker='', states='listing',
|
||||
limit=str(limit - len(sr_objs[0] + objs_1)))) # 200
|
||||
]
|
||||
|
||||
expected_objects = sr_objs[0] + objs_1 + objs_2
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, expected_objects, expected_requests)
|
||||
# root object count will overridden by actual length of listing
|
||||
self.check_response(resp, root_resp_hdrs,
|
||||
expected_objects=expected_objects)
|
||||
|
||||
# reverse listing
|
||||
|
||||
# expect subset of third shard range
|
||||
objs_0 = [o for o in sr_objs[0] if o['name'] < sr_objs[1][0]['name']]
|
||||
# expect subset of second shard range
|
||||
objs_1 = [o for o in sr_objs[1] if o['name'] < sr_objs[2][0]['name']]
|
||||
mock_responses = [
|
||||
# status, body, headers
|
||||
(200, list(reversed(sr_dicts)), root_shard_resp_hdrs),
|
||||
(200, list(reversed(sr_objs[2])), shard_resp_hdrs[2]),
|
||||
(200, list(reversed(objs_1)), shard_resp_hdrs[1]),
|
||||
(200, list(reversed(objs_0)), shard_resp_hdrs[0]),
|
||||
]
|
||||
# NB marker always advances to last object name
|
||||
expected_requests = [
|
||||
# path, headers, params
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing', reverse='true')), # 200
|
||||
(shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='', end_marker='lemon', states='listing',
|
||||
limit=str(limit),
|
||||
reverse='true')), # 200
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='m', end_marker='', reverse='true', states='listing',
|
||||
limit=str(limit - len(sr_objs[2])))), # 200
|
||||
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='A', end_marker='', reverse='true', states='listing',
|
||||
limit=str(limit - len(sr_objs[2] + objs_1)))) # 200
|
||||
]
|
||||
|
||||
expected_objects = list(reversed(objs_0 + objs_1 + sr_objs[2]))
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, expected_objects, expected_requests,
|
||||
query_string='?reverse=true', reverse=True)
|
||||
# root object count will overridden by actual length of listing
|
||||
self.check_response(resp, root_resp_hdrs,
|
||||
expected_objects=expected_objects)
|
||||
|
||||
def test_GET_sharded_container_gap_in_shards(self):
|
||||
# verify ordered listing even if unexpected gap between shard ranges
|
||||
shard_bounds = (('', 'ham'), ('onion', 'pie'), ('rhubarb', ''))
|
||||
shard_ranges = [
|
||||
ShardRange('.shards_a/c_' + upper, Timestamp.now(), lower, upper)
|
||||
for lower, upper in shard_bounds]
|
||||
sr_dicts = [dict(sr) for sr in shard_ranges]
|
||||
sr_objs = [self._make_shard_objects(sr) for sr in shard_ranges]
|
||||
shard_resp_hdrs = [
|
||||
{'X-Backend-Sharding-State': 'unsharded',
|
||||
'X-Container-Object-Count': len(sr_objs[i]),
|
||||
'X-Container-Bytes-Used':
|
||||
sum([obj['bytes'] for obj in sr_objs[i]]),
|
||||
'X-Container-Meta-Flavour': 'flavour%d' % i,
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
for i in range(3)]
|
||||
|
||||
all_objects = []
|
||||
for objects in sr_objs:
|
||||
all_objects.extend(objects)
|
||||
size_all_objects = sum([obj['bytes'] for obj in all_objects])
|
||||
num_all_objects = len(all_objects)
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
'X-Container-Object-Count': num_all_objects,
|
||||
'X-Container-Bytes-Used': size_all_objects,
|
||||
'X-Container-Meta-Flavour': 'peach',
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
root_shard_resp_hdrs = dict(root_resp_hdrs)
|
||||
root_shard_resp_hdrs['X-Backend-Record-Type'] = 'shard'
|
||||
|
||||
mock_responses = [
|
||||
# status, body, headers
|
||||
(200, sr_dicts, root_shard_resp_hdrs),
|
||||
(200, sr_objs[0], shard_resp_hdrs[0]),
|
||||
(200, sr_objs[1], shard_resp_hdrs[1]),
|
||||
(200, sr_objs[2], shard_resp_hdrs[2])
|
||||
]
|
||||
# NB marker always advances to last object name
|
||||
expected_requests = [
|
||||
# path, headers, params
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing')), # 200
|
||||
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='', end_marker='ham\x00', states='listing',
|
||||
limit=str(limit))), # 200
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='h', end_marker='pie\x00', states='listing',
|
||||
limit=str(limit - len(sr_objs[0])))), # 200
|
||||
(shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='p', end_marker='', states='listing',
|
||||
limit=str(limit - len(sr_objs[0] + sr_objs[1])))) # 200
|
||||
]
|
||||
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, all_objects, expected_requests)
|
||||
# root object count will overridden by actual length of listing
|
||||
self.check_response(resp, root_resp_hdrs)
|
||||
|
||||
def test_GET_sharded_container_empty_shard(self):
|
||||
# verify ordered listing when a shard is empty
|
||||
shard_bounds = (('', 'ham'), ('ham', 'pie'), ('lemon', ''))
|
||||
shard_ranges = [
|
||||
ShardRange('.shards_a/c_%s' % upper, Timestamp.now(), lower, upper)
|
||||
for lower, upper in shard_bounds]
|
||||
sr_dicts = [dict(sr) for sr in shard_ranges]
|
||||
sr_objs = [self._make_shard_objects(sr) for sr in shard_ranges]
|
||||
# empty second shard range
|
||||
sr_objs[1] = []
|
||||
shard_resp_hdrs = [
|
||||
{'X-Backend-Sharding-State': 'unsharded',
|
||||
'X-Container-Object-Count': len(sr_objs[i]),
|
||||
'X-Container-Bytes-Used':
|
||||
sum([obj['bytes'] for obj in sr_objs[i]]),
|
||||
'X-Container-Meta-Flavour': 'flavour%d' % i,
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
for i in range(3)]
|
||||
|
||||
all_objects = []
|
||||
for objects in sr_objs:
|
||||
all_objects.extend(objects)
|
||||
size_all_objects = sum([obj['bytes'] for obj in all_objects])
|
||||
num_all_objects = len(all_objects)
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
'X-Container-Object-Count': num_all_objects,
|
||||
'X-Container-Bytes-Used': size_all_objects,
|
||||
'X-Container-Meta-Flavour': 'peach',
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
root_shard_resp_hdrs = dict(root_resp_hdrs)
|
||||
root_shard_resp_hdrs['X-Backend-Record-Type'] = 'shard'
|
||||
|
||||
mock_responses = [
|
||||
# status, body, headers
|
||||
(200, sr_dicts, root_shard_resp_hdrs),
|
||||
(200, sr_objs[0], shard_resp_hdrs[0]),
|
||||
(200, sr_objs[1], shard_resp_hdrs[1]),
|
||||
(200, sr_objs[2], shard_resp_hdrs[2])
|
||||
]
|
||||
# NB marker always advances to last object name
|
||||
expected_requests = [
|
||||
# path, headers, params
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing')), # 200
|
||||
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='', end_marker='ham\x00', states='listing',
|
||||
limit=str(limit))), # 200
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='h', end_marker='pie\x00', states='listing',
|
||||
limit=str(limit - len(sr_objs[0])))), # 200
|
||||
(shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='h', end_marker='', states='listing',
|
||||
limit=str(limit - len(sr_objs[0] + sr_objs[1])))) # 200
|
||||
]
|
||||
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, all_objects, expected_requests)
|
||||
# root object count will overridden by actual length of listing
|
||||
self.check_response(resp, root_resp_hdrs)
|
||||
|
||||
# marker in empty second range
|
||||
mock_responses = [
|
||||
# status, body, headers
|
||||
(200, sr_dicts[1:], root_shard_resp_hdrs),
|
||||
(200, sr_objs[1], shard_resp_hdrs[1]),
|
||||
(200, sr_objs[2], shard_resp_hdrs[2])
|
||||
]
|
||||
# NB marker unchanged when getting from third range
|
||||
expected_requests = [
|
||||
# path, headers, params
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing', marker='koolaid')), # 200
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='koolaid', end_marker='pie\x00', states='listing',
|
||||
limit=str(limit))), # 200
|
||||
(shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='koolaid', end_marker='', states='listing',
|
||||
limit=str(limit))) # 200
|
||||
]
|
||||
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, sr_objs[2], expected_requests,
|
||||
query_string='?marker=koolaid')
|
||||
# root object count will overridden by actual length of listing
|
||||
self.check_response(resp, root_resp_hdrs)
|
||||
|
||||
# marker in empty second range, reverse
|
||||
mock_responses = [
|
||||
# status, body, headers
|
||||
(200, list(reversed(sr_dicts[:2])), root_shard_resp_hdrs),
|
||||
(200, list(reversed(sr_objs[1])), shard_resp_hdrs[1]),
|
||||
(200, list(reversed(sr_objs[0])), shard_resp_hdrs[2])
|
||||
]
|
||||
# NB marker unchanged when getting from first range
|
||||
expected_requests = [
|
||||
# path, headers, params
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing', marker='koolaid', reverse='true')), # 200
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='koolaid', end_marker='ham', reverse='true',
|
||||
states='listing', limit=str(limit))), # 200
|
||||
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='koolaid', end_marker='', reverse='true',
|
||||
states='listing', limit=str(limit))) # 200
|
||||
]
|
||||
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, list(reversed(sr_objs[0])), expected_requests,
|
||||
query_string='?marker=koolaid&reverse=true', reverse=True)
|
||||
# root object count will overridden by actual length of listing
|
||||
self.check_response(resp, root_resp_hdrs)
|
||||
|
||||
def _check_GET_sharded_container_shard_error(self, error):
|
||||
# verify ordered listing when a shard is empty
|
||||
shard_bounds = (('', 'ham'), ('ham', 'pie'), ('lemon', ''))
|
||||
shard_ranges = [
|
||||
ShardRange('.shards_a/c_%s' % upper, Timestamp.now(), lower, upper)
|
||||
for lower, upper in shard_bounds]
|
||||
sr_dicts = [dict(sr) for sr in shard_ranges]
|
||||
sr_objs = [self._make_shard_objects(sr) for sr in shard_ranges]
|
||||
# empty second shard range
|
||||
sr_objs[1] = []
|
||||
shard_resp_hdrs = [
|
||||
{'X-Backend-Sharding-State': 'unsharded',
|
||||
'X-Container-Object-Count': len(sr_objs[i]),
|
||||
'X-Container-Bytes-Used':
|
||||
sum([obj['bytes'] for obj in sr_objs[i]]),
|
||||
'X-Container-Meta-Flavour': 'flavour%d' % i,
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
for i in range(3)]
|
||||
|
||||
all_objects = []
|
||||
for objects in sr_objs:
|
||||
all_objects.extend(objects)
|
||||
size_all_objects = sum([obj['bytes'] for obj in all_objects])
|
||||
num_all_objects = len(all_objects)
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
'X-Container-Object-Count': num_all_objects,
|
||||
'X-Container-Bytes-Used': size_all_objects,
|
||||
'X-Container-Meta-Flavour': 'peach',
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
root_shard_resp_hdrs = dict(root_resp_hdrs)
|
||||
root_shard_resp_hdrs['X-Backend-Record-Type'] = 'shard'
|
||||
|
||||
mock_responses = [
|
||||
# status, body, headers
|
||||
(200, sr_dicts, root_shard_resp_hdrs),
|
||||
(200, sr_objs[0], shard_resp_hdrs[0])] + \
|
||||
[(error, [], {})] * 2 * self.CONTAINER_REPLICAS + \
|
||||
[(200, sr_objs[2], shard_resp_hdrs[2])]
|
||||
|
||||
# NB marker always advances to last object name
|
||||
expected_requests = [
|
||||
# path, headers, params
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing')), # 200
|
||||
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='', end_marker='ham\x00', states='listing',
|
||||
limit=str(limit)))] \
|
||||
+ [(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='h', end_marker='pie\x00', states='listing',
|
||||
limit=str(limit - len(sr_objs[0]))))
|
||||
] * 2 * self.CONTAINER_REPLICAS \
|
||||
+ [(shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='h', end_marker='', states='listing',
|
||||
limit=str(limit - len(sr_objs[0] + sr_objs[1]))))]
|
||||
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, all_objects, expected_requests)
|
||||
# root object count will overridden by actual length of listing
|
||||
self.check_response(resp, root_resp_hdrs)
|
||||
|
||||
def test_GET_sharded_container_shard_errors(self):
|
||||
self._check_GET_sharded_container_shard_error(404)
|
||||
self._check_GET_sharded_container_shard_error(500)
|
||||
|
||||
def test_GET_sharded_container_sharding_shard(self):
|
||||
# one shard is in process of sharding
|
||||
shard_bounds = (('', 'ham'), ('ham', 'pie'), ('pie', ''))
|
||||
shard_ranges = [
|
||||
ShardRange('.shards_a/c_' + upper, Timestamp.now(), lower, upper)
|
||||
for lower, upper in shard_bounds]
|
||||
sr_dicts = [dict(sr) for sr in shard_ranges]
|
||||
sr_objs = [self._make_shard_objects(sr) for sr in shard_ranges]
|
||||
shard_resp_hdrs = [
|
||||
{'X-Backend-Sharding-State': 'unsharded',
|
||||
'X-Container-Object-Count': len(sr_objs[i]),
|
||||
'X-Container-Bytes-Used':
|
||||
sum([obj['bytes'] for obj in sr_objs[i]]),
|
||||
'X-Container-Meta-Flavour': 'flavour%d' % i,
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
for i in range(3)]
|
||||
shard_1_shard_resp_hdrs = dict(shard_resp_hdrs[1])
|
||||
shard_1_shard_resp_hdrs['X-Backend-Record-Type'] = 'shard'
|
||||
|
||||
# second shard is sharding and has cleaved two out of three sub shards
|
||||
shard_resp_hdrs[1]['X-Backend-Sharding-State'] = 'sharding'
|
||||
sub_shard_bounds = (('ham', 'juice'), ('juice', 'lemon'))
|
||||
sub_shard_ranges = [
|
||||
ShardRange('a/c_sub_' + upper, Timestamp.now(), lower, upper)
|
||||
for lower, upper in sub_shard_bounds]
|
||||
sub_sr_dicts = [dict(sr) for sr in sub_shard_ranges]
|
||||
sub_sr_objs = [self._make_shard_objects(sr) for sr in sub_shard_ranges]
|
||||
sub_shard_resp_hdrs = [
|
||||
{'X-Backend-Sharding-State': 'unsharded',
|
||||
'X-Container-Object-Count': len(sub_sr_objs[i]),
|
||||
'X-Container-Bytes-Used':
|
||||
sum([obj['bytes'] for obj in sub_sr_objs[i]]),
|
||||
'X-Container-Meta-Flavour': 'flavour%d' % i,
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
for i in range(2)]
|
||||
|
||||
all_objects = []
|
||||
for objects in sr_objs:
|
||||
all_objects.extend(objects)
|
||||
size_all_objects = sum([obj['bytes'] for obj in all_objects])
|
||||
num_all_objects = len(all_objects)
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
'X-Container-Object-Count': num_all_objects,
|
||||
'X-Container-Bytes-Used': size_all_objects,
|
||||
'X-Container-Meta-Flavour': 'peach',
|
||||
'X-Backend-Storage-Policy-Index': 0}
|
||||
root_shard_resp_hdrs = dict(root_resp_hdrs)
|
||||
root_shard_resp_hdrs['X-Backend-Record-Type'] = 'shard'
|
||||
|
||||
mock_responses = [
|
||||
# status, body, headers
|
||||
(200, sr_dicts, root_shard_resp_hdrs),
|
||||
(200, sr_objs[0], shard_resp_hdrs[0]),
|
||||
(200, sub_sr_dicts + [sr_dicts[1]], shard_1_shard_resp_hdrs),
|
||||
(200, sub_sr_objs[0], sub_shard_resp_hdrs[0]),
|
||||
(200, sub_sr_objs[1], sub_shard_resp_hdrs[1]),
|
||||
(200, sr_objs[1][len(sub_sr_objs[0] + sub_sr_objs[1]):],
|
||||
shard_resp_hdrs[1]),
|
||||
(200, sr_objs[2], shard_resp_hdrs[2])
|
||||
]
|
||||
# NB marker always advances to last object name
|
||||
expected_requests = [
|
||||
# get root shard ranges
|
||||
('a/c', {'X-Backend-Record-Type': 'auto'},
|
||||
dict(states='listing')), # 200
|
||||
# get first shard objects
|
||||
(shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='', end_marker='ham\x00', states='listing',
|
||||
limit=str(limit))), # 200
|
||||
# get second shard sub-shard ranges
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='h', end_marker='pie\x00', states='listing',
|
||||
limit=str(limit - len(sr_objs[0])))),
|
||||
# get first sub-shard objects
|
||||
(sub_shard_ranges[0].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='h', end_marker='juice\x00', states='listing',
|
||||
limit=str(limit - len(sr_objs[0])))),
|
||||
# get second sub-shard objects
|
||||
(sub_shard_ranges[1].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='j', end_marker='lemon\x00', states='listing',
|
||||
limit=str(limit - len(sr_objs[0] + sub_sr_objs[0])))),
|
||||
# get remainder of first shard objects
|
||||
(shard_ranges[1].name, {'X-Backend-Record-Type': 'object'},
|
||||
dict(marker='l', end_marker='pie\x00',
|
||||
limit=str(limit - len(sr_objs[0] + sub_sr_objs[0] +
|
||||
sub_sr_objs[1])))), # 200
|
||||
# get third shard objects
|
||||
(shard_ranges[2].name, {'X-Backend-Record-Type': 'auto'},
|
||||
dict(marker='p', end_marker='', states='listing',
|
||||
limit=str(limit - len(sr_objs[0] + sr_objs[1])))) # 200
|
||||
]
|
||||
expected_objects = (
|
||||
sr_objs[0] + sub_sr_objs[0] + sub_sr_objs[1] +
|
||||
sr_objs[1][len(sub_sr_objs[0] + sub_sr_objs[1]):] + sr_objs[2])
|
||||
resp = self._check_GET_shard_listing(
|
||||
mock_responses, expected_objects, expected_requests)
|
||||
# root object count will overridden by actual length of listing
|
||||
self.check_response(resp, root_resp_hdrs)
|
||||
|
||||
|
||||
@patch_policies(
|
||||
[StoragePolicy(0, 'zero', True, object_ring=FakeRing(replicas=4))])
|
||||
|
Loading…
x
Reference in New Issue
Block a user