Proxy: Use namespaces when getting listing/updating shards

With the Related-Change, container servers can return a list of Namespace
objects in response to a GET request.  This patch modifies the proxy
to take advantage of this when fetching namespaces. Specifically,
the proxy only needs Namespaces when caching 'updating' or 'listing'
shard range metadata.

In order to allow upgrades to clusters we can't just send
'X-Backend-Record-Type = namespace', as old container servers won't
know how to respond. Instead, proxies send a new header
'X-Backend-Record-Shard-Format = namespace' along with the existing
'X-Backend-Record-Type = shard' header. Newer container servers will
return namespaces; old container servers continue to return full
shard ranges, which the new proxy parses as Namespaces.

This patch refactors _get_from_shards to clarify that it does not
require ShardRange objects. The method is now passed a list of
namespaces, which is parsed from the response body before the method
is called. Some unit tests are also refactored to be more realistic
when mocking _get_from_shards.

Also refactor the test_container tests to better test shard-range and
namespace responses from legacy and modern container servers.

Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
Co-Authored-By: Jianjian Huo <jhuo@nvidia.com>
Related-Change: If152942c168d127de13e11e8da00a5760de5ae0d
Change-Id: I7169fb767525753554a40e28b8c8c2e265d08ecd
This commit is contained in:
Matthew Oliver 2023-09-11 17:27:10 +10:00 committed by Alistair Coles
parent c073933387
commit 03b66c94f4
8 changed files with 920 additions and 801 deletions

View File

@ -1168,6 +1168,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
params = params or {}
params.setdefault('format', 'json')
headers = {'X-Backend-Record-Type': 'shard',
'X-Backend-Record-Shard-Format': 'full',
'X-Backend-Override-Deleted': 'true',
'X-Backend-Include-Deleted': str(include_deleted)}
if newest:

View File

@ -44,8 +44,8 @@ from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \
GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \
document_iters_to_http_response_body, ShardRange, cache_from_env, \
CooperativeIterator, NamespaceBoundList
document_iters_to_http_response_body, cache_from_env, \
CooperativeIterator, NamespaceBoundList, Namespace
from swift.common.bufferedhttp import http_connect
from swift.common import constraints
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
@ -2459,7 +2459,7 @@ class Controller(object):
data = self._parse_listing_response(req, response)
return data, response
def _parse_shard_ranges(self, req, listing, response):
def _parse_namespaces(self, req, listing, response):
if listing is None:
return None
@ -2471,10 +2471,15 @@ class Controller(object):
return None
try:
return [ShardRange.from_dict(shard_range)
for shard_range in listing]
# Note: a legacy container-server could return a list of
# ShardRanges, but that's ok: namespaces just need 'name', 'lower'
# and 'upper' keys. If we ever need to know we can look for a
# 'x-backend-record-shard-format' header from newer container
# servers.
return [Namespace(data['name'], data['lower'], data['upper'])
for data in listing]
except (ValueError, TypeError, KeyError) as err:
self.logger.error(
"Failed to get shard ranges from %s: invalid data: %r",
"Failed to get namespaces from %s: invalid data: %r",
req.path_qs, err)
return None

View File

@ -19,8 +19,7 @@ import six
from six.moves.urllib.parse import unquote
from swift.common.utils import public, private, csv_append, Timestamp, \
config_true_value, ShardRange, cache_from_env, filter_namespaces, \
NamespaceBoundList
config_true_value, cache_from_env, filter_namespaces, NamespaceBoundList
from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT
from swift.common.http import HTTP_ACCEPTED, is_success
from swift.common.request_helpers import get_sys_meta_prefix, get_param, \
@ -191,7 +190,7 @@ class ContainerController(Controller):
# available shard's lower. At worst, some misplaced objects, in the gap
# above the shard's upper, may be included in the shard's response.
data = self._parse_listing_response(req, resp)
backend_shard_ranges = self._parse_shard_ranges(req, data, resp)
backend_shard_ranges = self._parse_namespaces(req, data, resp)
if backend_shard_ranges is None:
return None
@ -248,6 +247,10 @@ class ContainerController(Controller):
if (resp_record_type == 'shard' and
sharding_state == 'sharded' and
complete_listing):
# note: old container servers return a list of shard ranges, newer
# ones return a list of namespaces. If we ever need to know we can
# look for a 'x-backend-record-shard-format' header from newer
# container servers.
ns_bound_list = self._store_shard_ranges_in_cache(req, resp)
if ns_bound_list:
resp.body = self._make_namespaces_response_body(
@ -353,6 +356,7 @@ class ContainerController(Controller):
else:
record_type = 'auto'
req.headers['X-Backend-Record-Type'] = 'auto'
req.headers['X-Backend-Record-Shard-Format'] = 'namespace'
params['states'] = 'listing'
req.params = params
@ -394,10 +398,18 @@ class ContainerController(Controller):
if all((req.method == "GET", record_type == 'auto',
resp_record_type.lower() == 'shard')):
resp = self._get_from_shards(req, resp)
data = self._parse_listing_response(req, resp)
namespaces = self._parse_namespaces(req, data, resp)
if namespaces is not None:
# we got namespaces, so the container must be sharded; now
# build the listing from shards
# NB: the filtered namespaces list may be empty but we still
# need to build a response body with an empty list of shards
resp = self._get_from_shards(req, resp, namespaces)
if orig_record_type not in ('object', 'shard'):
resp.headers.pop('X-Backend-Record-Type', None)
resp.headers.pop('X-Backend-Record-Shard-Format', None)
if not config_true_value(
resp.headers.get('X-Backend-Cached-Results')):
@ -425,17 +437,28 @@ class ContainerController(Controller):
'False'))
return resp
def _get_from_shards(self, req, resp):
# Construct listing using shards described by the response body.
# The history of containers that have returned shard ranges is
def _get_from_shards(self, req, resp, namespaces):
"""
Construct an object listing using shards described by the list of
namespaces.
:param req: an instance of :class:`~swift.common.swob.Request`.
:param resp: an instance of :class:`~swift.common.swob.Response`.
:param namespaces: a list of :class:`~swift.common.utils.Namespace`.
:return: an instance of :class:`~swift.common.swob.Response`. If an
error is encountered while building the listing an instance of
``HTTPServiceUnavailable`` may be returned. Otherwise, the given
``resp`` is returned with a body that is an object listing.
"""
# The history of containers that have returned namespaces is
# maintained in the request environ so that loops can be avoided by
# forcing an object listing if the same container is visited again.
# This can happen in at least two scenarios:
# 1. a container has filled a gap in its shard ranges with a
# shard range pointing to itself
# 2. a root container returns a (stale) shard range pointing to a
# 1. a container has filled a gap in its namespaces with a
# namespace pointing to itself
# 2. a root container returns a (stale) namespace pointing to a
# shard that has shrunk into the root, in which case the shrunken
# shard may return the root's shard range.
# shard may return the root's namespace.
shard_listing_history = req.environ.setdefault(
'swift.shard_listing_history', [])
policy_key = 'X-Backend-Storage-Policy-Index'
@ -443,28 +466,15 @@ class ContainerController(Controller):
# We're handling the original request to the root container: set
# the root policy index in the request, unless it is already set,
# so that shards will return listings for that policy index.
# Note: we only get here if the root responded with shard ranges,
# or if the shard ranges were cached and the cached root container
# Note: we only get here if the root responded with namespaces,
# or if the namespaces were cached and the cached root container
# info has sharding_state==sharded; in both cases we can assume
# that the response is "modern enough" to include
# 'X-Backend-Storage-Policy-Index'.
req.headers[policy_key] = resp.headers[policy_key]
shard_listing_history.append((self.account_name, self.container_name))
# Note: when the response body has been synthesised from cached data,
# each item in the list only has 'name', 'lower' and 'upper' keys. We
# therefore cannot use ShardRange.from_dict(), and the ShardRange
# instances constructed here will only have 'name', 'lower' and 'upper'
# attributes set.
# Ideally we would construct Namespace objects here, but later we use
# the ShardRange account and container properties to access parsed
# parts of the name.
shard_ranges = [ShardRange(**data) for data in json.loads(resp.body)]
self.logger.debug('GET listing from %s shards for: %s',
len(shard_ranges), req.path_qs)
if not shard_ranges:
# can't find ranges or there was a problem getting the ranges. So
# return what we have.
return resp
len(namespaces), req.path_qs)
objects = []
req_limit = constrain_req_limit(req, CONTAINER_LISTING_LIMIT)
@ -478,12 +488,12 @@ class ContainerController(Controller):
limit = req_limit
all_resp_status = []
for i, shard_range in enumerate(shard_ranges):
for i, namespace in enumerate(namespaces):
params['limit'] = limit
# Always set marker to ensure that object names less than or equal
# to those already in the listing are not fetched; if the listing
# is empty then the original request marker, if any, is used. This
# allows misplaced objects below the expected shard range to be
# allows misplaced objects below the expected namespace to be
# included in the listing.
last_name = ''
last_name_was_subdir = False
@ -502,18 +512,18 @@ class ContainerController(Controller):
else:
params['marker'] = ''
# Always set end_marker to ensure that misplaced objects beyond the
# expected shard range are not fetched. This prevents a misplaced
# expected namespace are not fetched. This prevents a misplaced
# object obscuring correctly placed objects in the next shard
# range.
if end_marker and end_marker in shard_range:
if end_marker and end_marker in namespace:
params['end_marker'] = str_to_wsgi(end_marker)
elif reverse:
params['end_marker'] = str_to_wsgi(shard_range.lower_str)
params['end_marker'] = str_to_wsgi(namespace.lower_str)
else:
params['end_marker'] = str_to_wsgi(shard_range.end_marker)
params['end_marker'] = str_to_wsgi(namespace.end_marker)
headers = {}
if ((shard_range.account, shard_range.container) in
if ((namespace.account, namespace.container) in
shard_listing_history):
# directed back to same container - force GET of objects
headers['X-Backend-Record-Type'] = 'object'
@ -521,26 +531,26 @@ class ContainerController(Controller):
headers['X-Newest'] = 'true'
if prefix:
if prefix > shard_range:
if prefix > namespace:
continue
try:
just_past = prefix[:-1] + chr(ord(prefix[-1]) + 1)
except ValueError:
pass
else:
if just_past < shard_range:
if just_past < namespace:
continue
if last_name_was_subdir and str(
shard_range.lower if reverse else shard_range.upper
namespace.lower if reverse else namespace.upper
).startswith(last_name):
continue
self.logger.debug(
'Getting listing part %d from shard %s %s with %s',
i, shard_range, shard_range.name, headers)
i, namespace, namespace.name, headers)
objs, shard_resp = self._get_container_listing(
req, shard_range.account, shard_range.container,
req, namespace.account, namespace.container,
headers=headers, params=params)
all_resp_status.append(shard_resp.status_int)

View File

@ -48,7 +48,7 @@ from swift.common.utils import (
normalize_delete_at_timestamp, public, get_expirer_container,
document_iters_to_http_response_body, parse_content_range,
quorum_size, reiterate, close_if_possible, safe_json_loads, md5,
find_namespace, NamespaceBoundList, CooperativeIterator, ShardRange)
NamespaceBoundList, CooperativeIterator)
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation
from swift.common import constraints
@ -281,31 +281,32 @@ class BaseObjectController(Controller):
"""Handler for HTTP HEAD requests."""
return self.GETorHEAD(req)
def _get_updating_shard_ranges(
def _get_updating_namespaces(
self, req, account, container, includes=None):
"""
Fetch shard ranges in 'updating' states from given `account/container`.
Fetch namespaces in 'updating' states from given `account/container`.
If `includes` is given then the shard range for that object name is
requested, otherwise all shard ranges are requested.
requested, otherwise all namespaces are requested.
:param req: original Request instance.
:param account: account from which shard ranges should be fetched.
:param container: container from which shard ranges should be fetched.
:param includes: (optional) restricts the list of fetched shard ranges
:param account: account from which namespaces should be fetched.
:param container: container from which namespaces should be fetched.
:param includes: (optional) restricts the list of fetched namespaces
to those which include the given name.
:return: a list of instances of :class:`swift.common.utils.ShardRange`,
or None if there was a problem fetching the shard ranges
:return: a list of instances of :class:`swift.common.utils.Namespace`,
or None if there was a problem fetching the namespaces.
"""
params = req.params.copy()
params.pop('limit', None)
params['format'] = 'json'
params['states'] = 'updating'
headers = {'X-Backend-Record-Type': 'shard',
'X-Backend-Record-Shard-Format': 'namespace'}
if includes:
params['includes'] = str_to_wsgi(includes)
headers = {'X-Backend-Record-Type': 'shard'}
listing, response = self._get_container_listing(
req, account, container, headers=headers, params=params)
return self._parse_shard_ranges(req, listing, response), response
return self._parse_namespaces(req, listing, response), response
def _get_update_shard_caching_disabled(self, req, account, container, obj):
"""
@ -316,17 +317,17 @@ class BaseObjectController(Controller):
:param account: account from which shard ranges should be fetched.
:param container: container from which shard ranges should be fetched.
:param obj: object getting updated.
:return: an instance of :class:`swift.common.utils.ShardRange`,
:return: an instance of :class:`swift.common.utils.Namespace`,
or None if the update should go back to the root
"""
# legacy behavior requests container server for includes=obj
shard_ranges, response = self._get_updating_shard_ranges(
namespaces, response = self._get_updating_namespaces(
req, account, container, includes=obj)
record_cache_op_metrics(
self.logger, self.server_type.lower(), 'shard_updating',
'disabled', response)
# there will be only one shard range in the list if any
return shard_ranges[0] if shard_ranges else None
# there will be only one Namespace in the list if any
return namespaces[0] if namespaces else None
def _get_update_shard(self, req, account, container, obj):
"""
@ -340,7 +341,7 @@ class BaseObjectController(Controller):
:param account: account from which shard ranges should be fetched.
:param container: container from which shard ranges should be fetched.
:param obj: object getting updated.
:return: an instance of :class:`swift.common.utils.ShardRange`,
:return: an instance of :class:`swift.common.utils.Namespace`,
or None if the update should go back to the root
"""
if not self.app.recheck_updating_shard_ranges:
@ -354,20 +355,15 @@ class BaseObjectController(Controller):
skip_chance = self.app.container_updating_shard_ranges_skip_cache
ns_bound_list, get_cache_state = get_namespaces_from_cache(
req, cache_key, skip_chance)
if ns_bound_list:
# found cached namespaces in either infocache or memcache
namespace = ns_bound_list.get_namespace(obj)
update_shard = ShardRange(
name=namespace.name, timestamp=0, lower=namespace.lower,
upper=namespace.upper)
else:
# pull full set of updating shard ranges from backend
shard_ranges, response = self._get_updating_shard_ranges(
if not ns_bound_list:
# namespaces not found in either infocache or memcache so pull full
# set of updating shard ranges from backend
namespaces, response = self._get_updating_namespaces(
req, account, container)
if shard_ranges:
if namespaces:
# only store the list of namespace lower bounds and names into
# infocache and memcache.
ns_bound_list = NamespaceBoundList.parse(shard_ranges)
ns_bound_list = NamespaceBoundList.parse(namespaces)
set_cache_state = set_namespaces_in_cache(
req, cache_key, ns_bound_list,
self.app.recheck_updating_shard_ranges)
@ -377,23 +373,22 @@ class BaseObjectController(Controller):
if set_cache_state == 'set':
self.logger.info(
'Caching updating shards for %s (%d shards)',
cache_key, len(shard_ranges))
update_shard = find_namespace(obj, shard_ranges or [])
cache_key, len(namespaces))
record_cache_op_metrics(
self.logger, self.server_type.lower(), 'shard_updating',
get_cache_state, response)
return update_shard
return ns_bound_list.get_namespace(obj) if ns_bound_list else None
def _get_update_target(self, req, container_info):
# find the sharded container to which we'll send the update
db_state = container_info.get('sharding_state', 'unsharded')
if db_state in ('sharded', 'sharding'):
shard_range = self._get_update_shard(
update_shard_ns = self._get_update_shard(
req, self.account_name, self.container_name, self.object_name)
if shard_range:
if update_shard_ns:
partition, nodes = self.app.container_ring.get_nodes(
shard_range.account, shard_range.container)
return partition, nodes, shard_range.name
update_shard_ns.account, update_shard_ns.container)
return partition, nodes, update_shard_ns.name
return container_info['partition'], container_info['nodes'], None

View File

@ -194,6 +194,7 @@ class BaseTestContainerSharding(ReplProbeTest):
self.assertEqual('object', resp_record_type)
else:
self.assertIsNone(resp_record_type)
self.assertNotIn('X-Backend-Record-Shard-Format', resp.headers)
return json.loads(resp.body)
def get_container_shard_ranges(self, account=None, container=None,

View File

@ -1283,7 +1283,8 @@ class TestSharder(BaseTestSharder):
expected_headers = {'X-Backend-Record-Type': 'shard',
'X-Backend-Include-Deleted': 'False',
'X-Backend-Override-Deleted': 'true'}
'X-Backend-Override-Deleted': 'true',
'X-Backend-Record-Shard-Format': 'full'}
broker = self._make_broker()
shard_ranges = self._make_shard_ranges((('', 'm'), ('m', '')))
@ -6325,8 +6326,9 @@ class TestSharder(BaseTestSharder):
as mocked, mock.patch.object(
sharder, 'int_client') as mock_swift:
mock_response = mock.MagicMock()
mock_response.headers = {'x-backend-record-type':
'shard'}
mock_response.headers = {
'x-backend-record-type': 'shard',
'X-Backend-Record-Shard-Format': 'full'}
shard_ranges.sort(key=ShardRange.sort_key)
mock_response.body = json.dumps(
[dict(sr) for sr in shard_ranges])
@ -6348,7 +6350,8 @@ class TestSharder(BaseTestSharder):
expected_headers = {'X-Backend-Record-Type': 'shard',
'X-Newest': 'true',
'X-Backend-Include-Deleted': 'True',
'X-Backend-Override-Deleted': 'true'}
'X-Backend-Override-Deleted': 'true',
'X-Backend-Record-Shard-Format': 'full'}
params = {'format': 'json', 'marker': marker, 'end_marker': end_marker,
'states': 'auditing'}
mock_swift.make_request.assert_called_once_with(
@ -6434,7 +6437,8 @@ class TestSharder(BaseTestSharder):
expected_headers = {'X-Backend-Record-Type': 'shard',
'X-Newest': 'true',
'X-Backend-Include-Deleted': 'True',
'X-Backend-Override-Deleted': 'true'}
'X-Backend-Override-Deleted': 'true',
'X-Backend-Record-Shard-Format': 'full'}
params = {'format': 'json', 'marker': 'j', 'end_marker': 'k',
'states': 'auditing'}
mock_swift.make_request.assert_called_once_with(
@ -6471,7 +6475,8 @@ class TestSharder(BaseTestSharder):
expected_headers = {'X-Backend-Record-Type': 'shard',
'X-Newest': 'true',
'X-Backend-Include-Deleted': 'True',
'X-Backend-Override-Deleted': 'true'}
'X-Backend-Override-Deleted': 'true',
'X-Backend-Record-Shard-Format': 'full'}
params = {'format': 'json', 'marker': 'k', 'end_marker': 't',
'states': 'auditing'}
mock_swift.make_request.assert_called_once_with(
@ -6608,7 +6613,8 @@ class TestSharder(BaseTestSharder):
expected_headers = {'X-Backend-Record-Type': 'shard',
'X-Newest': 'true',
'X-Backend-Include-Deleted': 'True',
'X-Backend-Override-Deleted': 'true'}
'X-Backend-Override-Deleted': 'true',
'X-Backend-Record-Shard-Format': 'full'}
params = {'format': 'json', 'marker': 'k', 'end_marker': 't',
'states': 'auditing'}
mock_swift.make_request.assert_called_once_with(
@ -6690,7 +6696,8 @@ class TestSharder(BaseTestSharder):
expected_headers = {'X-Backend-Record-Type': 'shard',
'X-Newest': 'true',
'X-Backend-Include-Deleted': 'True',
'X-Backend-Override-Deleted': 'true'}
'X-Backend-Override-Deleted': 'true',
'X-Backend-Record-Shard-Format': 'full'}
params = {'format': 'json', 'marker': 'a', 'end_marker': 'b',
'states': 'auditing'}
mock_swift.make_request.assert_called_once_with(
@ -7391,7 +7398,8 @@ class TestSharder(BaseTestSharder):
expected_headers = {'X-Backend-Record-Type': 'shard',
'X-Newest': 'true',
'X-Backend-Include-Deleted': 'True',
'X-Backend-Override-Deleted': 'true'}
'X-Backend-Override-Deleted': 'true',
'X-Backend-Record-Shard-Format': 'full'}
params = {'format': 'json', 'marker': 'a', 'end_marker': 'd',
'states': 'auditing'}
mock_swift.make_request.assert_called_once_with(

File diff suppressed because it is too large Load Diff

View File

@ -42,7 +42,7 @@ from swift.common import utils, swob, exceptions
from swift.common.exceptions import ChunkWriteTimeout, ShortReadError, \
ChunkReadTimeout
from swift.common.utils import Timestamp, list_from_csv, md5, FileLikeIter, \
ShardRange
ShardRange, Namespace, NamespaceBoundList
from swift.proxy import server as proxy_server
from swift.proxy.controllers import obj
from swift.proxy.controllers.base import \
@ -56,7 +56,7 @@ from test.unit import (
FakeRing, fake_http_connect, patch_policies, SlowBody, FakeStatus,
DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies, make_ec_object_stub,
fake_ec_node_response, StubResponse, mocked_http_conn,
quiet_eventlet_exceptions, FakeSource, make_timestamp_iter)
quiet_eventlet_exceptions, FakeSource, make_timestamp_iter, FakeMemcache)
from test.unit.proxy.test_server import node_error_count
@ -7241,95 +7241,223 @@ class TestECFragGetter(BaseObjectControllerMixin, unittest.TestCase):
@patch_policies()
class TestGetUpdatingShardRanges(BaseObjectControllerMixin, unittest.TestCase):
class TestGetUpdateShard(BaseObjectControllerMixin, unittest.TestCase):
bound_prefix = 'x'
item = 'x1_test'
def setUp(self):
super(TestGetUpdatingShardRanges, self).setUp()
super(TestGetUpdateShard, self).setUp()
self.ctrl = obj.BaseObjectController(self.app, 'a', 'c', 'o')
self.memcache = FakeMemcache()
ts_iter = make_timestamp_iter()
# NB: these shard ranges have gaps
self.shard_ranges = [ShardRange(
'.sharded_a/sr%d' % i, next(ts_iter),
self.bound_prefix + u'%d_lower' % i,
self.bound_prefix + u'%d_upper' % i,
object_count=i, bytes_used=1024 * i,
meta_timestamp=next(ts_iter))
for i in range(3)]
def _create_response_data(self, shards, includes=None):
resp_headers = {'X-Backend-Record-Type': 'shard',
'X-Backend-Record-Shard-Format': 'namespace'}
namespaces = [Namespace(sr.name, sr.lower, sr.upper)
for sr in shards]
if includes is not None:
namespaces = [ns for ns in namespaces if includes in ns]
body = json.dumps([dict(ns) for ns in namespaces]).encode('ascii')
return body, resp_headers
def test_get_update_shard_cache_writing(self):
# verify case when complete set of shards is returned
req = Request.blank('/v1/a/c/o', method='PUT',
environ={'swift.cache': self.memcache})
body, resp_headers = self._create_response_data(self.shard_ranges)
with mocked_http_conn(
200, 200, body_iter=iter([b'', body]),
headers=resp_headers) as fake_conn:
actual = self.ctrl._get_update_shard(req, 'a', 'c', self.item)
# account info
captured = fake_conn.requests
self.assertEqual('HEAD', captured[0]['method'])
self.assertEqual('a', captured[0]['path'][7:])
# container GET
self.assertEqual('GET', captured[1]['method'])
self.assertEqual('a/c', captured[1]['path'][7:])
params = sorted(captured[1]['qs'].split('&'))
self.assertEqual(
['format=json', 'states=updating'], params)
captured_hdrs = captured[1]['headers']
self.assertEqual('shard', captured_hdrs.get('X-Backend-Record-Type'))
self.assertEqual('namespace',
captured_hdrs.get('X-Backend-Record-Shard-Format'))
exp_bounds = NamespaceBoundList.parse(self.shard_ranges).bounds
self.assertEqual(json.loads(json.dumps(exp_bounds)),
self.memcache.get('shard-updating-v2/a/c'))
exp_ns = Namespace(self.shard_ranges[1].name,
self.shard_ranges[1].lower,
self.shard_ranges[2].lower)
self.assertEqual(exp_ns, actual)
self.assertFalse(self.app.logger.get_lines_for_level('error'))
def test_get_update_shard_cache_reading(self):
# verify case when complete set of shards is in cache
cached_bounds = NamespaceBoundList.parse(self.shard_ranges).bounds
self.memcache.set('shard-updating-v2/a/c', cached_bounds)
req = Request.blank('/v1/a/c/o', method='PUT',
environ={'swift.cache': self.memcache})
actual = self.ctrl._get_update_shard(req, 'a', 'c', self.item)
self.assertEqual(json.loads(json.dumps(cached_bounds)),
self.memcache.get('shard-updating-v2/a/c'))
exp_ns = Namespace(self.shard_ranges[1].name,
self.shard_ranges[1].lower,
self.shard_ranges[2].lower)
self.assertEqual(exp_ns, actual)
self.assertFalse(self.app.logger.get_lines_for_level('error'))
def test_get_update_shard_cache_recheck_time_zero(self):
# verify case when shard caching is disabled
req = Request.blank('/v1/a/c/o', method='PUT',
environ={'swift.cache': self.memcache})
body, resp_headers = self._create_response_data(
self.shard_ranges, self.item)
with mock.patch.object(self.app, 'recheck_updating_shard_ranges', 0):
with mocked_http_conn(
200, 200, body_iter=iter([b'', body]),
headers=resp_headers) as fake_conn:
actual = self.ctrl._get_update_shard(req, 'a', 'c', self.item)
# account info
captured = fake_conn.requests
self.assertEqual('HEAD', captured[0]['method'])
self.assertEqual('a', captured[0]['path'][7:])
# container GET
self.assertEqual('GET', captured[1]['method'])
self.assertEqual('a/c', captured[1]['path'][7:])
params = sorted(captured[1]['qs'].split('&'))
self.assertEqual(
['format=json', 'includes=' + quote(self.item), 'states=updating'],
params)
captured_hdrs = captured[1]['headers']
self.assertEqual('shard', captured_hdrs.get('X-Backend-Record-Type'))
self.assertEqual('namespace',
captured_hdrs.get('X-Backend-Record-Shard-Format'))
self.assertIsNone(self.memcache.get('shard-updating-v2/a/c'))
exp_ns = Namespace(self.shard_ranges[1].name,
self.shard_ranges[1].lower,
self.shard_ranges[1].upper)
self.assertEqual(exp_ns, actual)
self.assertFalse(self.app.logger.get_lines_for_level('error'))
def test_get_update_shard_cache_not_available(self):
# verify case when memcache is not available
req = Request.blank('/v1/a/c/o', method='PUT')
body, resp_headers = self._create_response_data(self.shard_ranges)
with mocked_http_conn(
200, 200, body_iter=iter([b'', body]),
headers=resp_headers) as fake_conn:
actual = self.ctrl._get_update_shard(req, 'a', 'c', self.item)
# account info
captured = fake_conn.requests
self.assertEqual('HEAD', captured[0]['method'])
self.assertEqual('a', captured[0]['path'][7:])
# container GET
self.assertEqual('GET', captured[1]['method'])
self.assertEqual('a/c', captured[1]['path'][7:])
params = sorted(captured[1]['qs'].split('&'))
self.assertEqual(
['format=json', 'states=updating'], params)
captured_hdrs = captured[1]['headers']
self.assertEqual('shard', captured_hdrs.get('X-Backend-Record-Type'))
self.assertEqual('namespace',
captured_hdrs.get('X-Backend-Record-Shard-Format'))
self.assertIsNone(self.memcache.get('shard-updating-v2/a/c'))
exp_ns = Namespace(self.shard_ranges[1].name,
self.shard_ranges[1].lower,
self.shard_ranges[2].lower)
self.assertEqual(exp_ns, actual)
self.assertFalse(self.app.logger.get_lines_for_level('error'))
def test_get_update_shard_empty_body(self):
# verify case when no shards are returned
req = Request.blank('/v1/a/c/o', method='PUT',
environ={'swift.cache': self.memcache})
body, resp_headers = self._create_response_data(self.shard_ranges)
with mocked_http_conn(
200, 200, body_iter=[b'', b''],
headers=resp_headers) as fake_conn:
actual = self.ctrl._get_update_shard(req, 'a', 'c', self.item)
# account info
captured = fake_conn.requests
self.assertEqual('HEAD', captured[0]['method'])
self.assertEqual('a', captured[0]['path'][7:])
# container GET
self.assertEqual('GET', captured[1]['method'])
self.assertEqual('a/c', captured[1]['path'][7:])
params = sorted(captured[1]['qs'].split('&'))
self.assertEqual(
['format=json', 'states=updating'], params)
captured_hdrs = captured[1]['headers']
self.assertEqual('shard', captured_hdrs.get('X-Backend-Record-Type'))
self.assertEqual('namespace',
captured_hdrs.get('X-Backend-Record-Shard-Format'))
self.assertIsNone(self.memcache.get('shard-updating-v2/a/c'))
self.assertIsNone(actual)
lines = self.app.logger.get_lines_for_level('error')
self.assertEqual(1, len(lines))
self.assertIn('Problem with listing response from /v1/a/c/o', lines[0])
class TestGetUpdateShardUTF8(TestGetUpdateShard):
bound_prefix = u'\u1234'
item = wsgi_to_str('\xe1\x88\xb41_test')
class TestGetUpdateShardLegacy(TestGetUpdateShard):
def _create_response_data(self, shards, includes=None):
# older container servers never return the shorter 'namespace' format
# nor the 'X-Backend-Record-Shard-Format' header
resp_headers = {'X-Backend-Record-Type': 'shard'}
if includes is not None:
shards = [sr for sr in shards if includes in sr]
body = json.dumps([dict(sr) for sr in shards]).encode('ascii')
return body, resp_headers
class TestGetUpdateShardLegacyUTF8(TestGetUpdateShardLegacy):
    # Re-run the legacy container-server scenarios (full shard range dicts in
    # the response body, no 'X-Backend-Record-Shard-Format' response header)
    # with non-ASCII bounds and object name. The base class must be
    # TestGetUpdateShardLegacy so that its _create_response_data() override is
    # used; inheriting from TestGetUpdateShard would just repeat
    # TestGetUpdateShardUTF8.
    bound_prefix = u'\u1234'
    item = wsgi_to_str('\xe1\x88\xb41_test')
@patch_policies()
class TestGetUpdatingNamespacesErrors(BaseObjectControllerMixin,
unittest.TestCase):
def setUp(self):
super(TestGetUpdatingNamespacesErrors, self).setUp()
self.ctrl = obj.BaseObjectController(self.app, 'a', 'c', 'o')
def test_get_shard_ranges_for_object_put(self):
ts_iter = make_timestamp_iter()
shard_ranges = [dict(ShardRange(
'.sharded_a/sr%d' % i, next(ts_iter), '%d_lower' % i,
'%d_upper' % i, object_count=i, bytes_used=1024 * i,
meta_timestamp=next(ts_iter)))
for i in range(3)]
req = Request.blank('/v1/a/c/o', method='PUT')
resp_headers = {'X-Backend-Record-Type': 'shard'}
with mocked_http_conn(
200, 200,
body_iter=iter([b'',
json.dumps(shard_ranges[1:2]).encode('ascii')]),
headers=resp_headers
) as fake_conn:
actual, resp = self.ctrl._get_updating_shard_ranges(
req, 'a', 'c', '1_test')
self.assertEqual(200, resp.status_int)
# account info
captured = fake_conn.requests
self.assertEqual('HEAD', captured[0]['method'])
self.assertEqual('a', captured[0]['path'][7:])
# container GET
self.assertEqual('GET', captured[1]['method'])
self.assertEqual('a/c', captured[1]['path'][7:])
params = sorted(captured[1]['qs'].split('&'))
self.assertEqual(
['format=json', 'includes=1_test', 'states=updating'], params)
self.assertEqual(
'shard', captured[1]['headers'].get('X-Backend-Record-Type'))
self.assertEqual(shard_ranges[1:2], [dict(pr) for pr in actual])
self.assertFalse(self.app.logger.get_lines_for_level('error'))
def test_get_shard_ranges_for_utf8_object_put(self):
    """Look up the updating shard range for a non-ASCII object name.

    Same flow as the ASCII case, but the shard bounds and the object name
    contain U+1234, and the ``includes`` query parameter sent to the
    container is expected to be percent-encoded.
    """
    timestamps = make_timestamp_iter()
    shard_ranges = []
    for idx in range(3):
        # two timestamps are consumed per shard range, in this order
        created_at = next(timestamps)
        meta_at = next(timestamps)
        shard_ranges.append(dict(ShardRange(
            '.sharded_a/sr%d' % idx, created_at, u'\u1234%d_lower' % idx,
            u'\u1234%d_upper' % idx, object_count=idx, bytes_used=1024 * idx,
            meta_timestamp=meta_at)))
    expected = shard_ranges[1:2]
    container_body = json.dumps(expected).encode('ascii')
    req = Request.blank('/v1/a/c/o', method='PUT')
    with mocked_http_conn(
            200, 200,
            body_iter=iter([b'', container_body]),
            headers={'X-Backend-Record-Type': 'shard'}) as fake_conn:
        actual, resp = self.ctrl._get_updating_shard_ranges(
            req, 'a', 'c', wsgi_to_str('\xe1\x88\xb41_test'))
    self.assertEqual(200, resp.status_int)

    # account info
    requests = fake_conn.requests
    account_req = requests[0]
    self.assertEqual('HEAD', account_req['method'])
    self.assertEqual('a', account_req['path'][7:])

    # container GET
    container_req = requests[1]
    self.assertEqual('GET', container_req['method'])
    self.assertEqual('a/c', container_req['path'][7:])
    query_params = sorted(container_req['qs'].split('&'))
    self.assertEqual(
        ['format=json', 'includes=%E1%88%B41_test', 'states=updating'],
        query_params)
    record_type = container_req['headers'].get('X-Backend-Record-Type')
    self.assertEqual('shard', record_type)

    self.assertEqual(expected, list(map(dict, actual)))
    self.assertFalse(self.app.logger.get_lines_for_level('error'))
def _check_get_namespaces_bad_data(self, body):
    """Run an updating-namespaces lookup against a bad container body.

    The mocked backend answers the account request with an empty body and
    the container request with ``body``, both with status 200 and an
    'X-Backend-Record-Type: shard' header. The lookup is expected to give
    a 200 response but no namespaces.

    :param body: bytes to serve as the container response body
    :returns: the error-level lines captured by the app logger
    """
    req = Request.blank('/v1/a/c/o', method='PUT')
    resp_headers = {'X-Backend-Record-Type': 'shard'}
    with mocked_http_conn(200, 200, body_iter=iter([b'', body]),
                          headers=resp_headers):
        actual, resp = self.ctrl._get_updating_namespaces(
            req, 'a', 'c', '1_test')
    self.assertEqual(200, resp.status_int)
    self.assertIsNone(actual)
    return self.app.logger.get_lines_for_level('error')


# Keep the pre-rename helper name usable for any remaining old call sites.
_check_get_shard_ranges_bad_data = _check_get_namespaces_bad_data
# Feeds an empty container response body to the bad-data helper and expects
# exactly one error line mentioning 'Problem with listing response'.
# NOTE(review): this span looks like flattened diff output -- each
# '..._shard_ranges_...' line appears to be the pre-change version of the
# '..._namespaces_...' line that follows it; confirm which should remain.
def test_get_shard_ranges_empty_body(self):
error_lines = self._check_get_shard_ranges_bad_data(b'')
def test_get_namespaces_empty_body(self):
error_lines = self._check_get_namespaces_bad_data(b'')
self.assertIn('Problem with listing response', error_lines[0])
# the expected decode-error text differs between Python 2 and Python 3
if six.PY2:
self.assertIn('No JSON', error_lines[0])
# NOTE(review): the next line is a diff hunk header, not Python. Lines elided
# between hunks (presumably an 'else:' branch here) are not visible -- TODO
# confirm against the full file.
@ -7337,36 +7465,36 @@ class TestGetUpdatingShardRanges(BaseObjectControllerMixin, unittest.TestCase):
self.assertIn('JSONDecodeError', error_lines[0])
# no further error lines are expected
self.assertFalse(error_lines[1:])
def test_get_namespaces_not_a_list(self):
    """A JSON object (not a list) body is logged as a listing problem."""
    body = json.dumps({}).encode('ascii')
    error_lines = self._check_get_namespaces_bad_data(body)
    self.assertIn('Problem with listing response', error_lines[0])
    self.assertIn('not a list', error_lines[0])
    # exactly one error line is expected
    self.assertFalse(error_lines[1:])
def test_get_namespaces_key_missing(self):
    """A list item with no keys is logged as a KeyError failure."""
    body = json.dumps([{}]).encode('ascii')
    error_lines = self._check_get_namespaces_bad_data(body)
    self.assertIn('Failed to get namespaces', error_lines[0])
    self.assertIn('KeyError', error_lines[0])
    # exactly one error line is expected
    self.assertFalse(error_lines[1:])
def test_get_namespaces_invalid_shard_range(self):
    """A namespace whose lower bound exceeds its upper is a ValueError."""
    # lower > upper !
    bad_ns_data = {'name': 'name', 'lower': 'z', 'upper': 'a'}
    body = json.dumps([bad_ns_data]).encode('ascii')
    error_lines = self._check_get_namespaces_bad_data(body)
    self.assertIn('Failed to get namespaces', error_lines[0])
    self.assertIn('ValueError', error_lines[0])
    # exactly one error line is expected
    self.assertFalse(error_lines[1:])
# Serves a well-formed single-item body but passes no response headers to the
# mocked connection; the lookup is expected to give a 200 response and None.
# NOTE(review): this span looks like flattened diff output -- each
# ShardRange / '_get_updating_shard_ranges' line appears to be the pre-change
# version of the Namespace / '_get_updating_namespaces' line that follows it;
# confirm which should remain.
def test_get_shard_ranges_missing_record_type(self):
def test_get_namespaces_missing_record_type(self):
req = Request.blank('/v1/a/c/o', method='PUT')
sr = ShardRange('a/c', Timestamp.now())
sr = utils.Namespace('a/c', 'l', 'u')
body = json.dumps([dict(sr)]).encode('ascii')
# first mocked response has an empty body, the second carries the JSON body
with mocked_http_conn(
200, 200, body_iter=iter([b'', body])):
actual, resp = self.ctrl._get_updating_shard_ranges(
actual, resp = self.ctrl._get_updating_namespaces(
req, 'a', 'c', '1_test')
self.assertEqual(200, resp.status_int)
self.assertIsNone(actual)
# NOTE(review): the next line is a diff hunk header, not Python. The lines
# elided between hunks (presumably including the assignment of error_lines)
# are not visible -- TODO confirm against the full file.
@ -7376,15 +7504,15 @@ class TestGetUpdatingShardRanges(BaseObjectControllerMixin, unittest.TestCase):
self.assertIn('/a/c', error_lines[0])
# no further error lines are expected
self.assertFalse(error_lines[1:])
# Serves a well-formed single-item body with 'X-Backend-Record-Type: object';
# the lookup is expected to give a 200 response and None.
# NOTE(review): this span looks like flattened diff output -- each
# ShardRange / '_get_updating_shard_ranges' line appears to be the pre-change
# version of the Namespace / '_get_updating_namespaces' line that follows it;
# confirm which should remain.
def test_get_shard_ranges_wrong_record_type(self):
def test_get_namespaces_wrong_record_type(self):
req = Request.blank('/v1/a/c/o', method='PUT')
sr = ShardRange('a/c', Timestamp.now())
sr = utils.Namespace('a/c', 'l', 'u')
body = json.dumps([dict(sr)]).encode('ascii')
# record type is 'object' here, where the other tests in this class use 'shard'
headers = {'X-Backend-Record-Type': 'object'}
with mocked_http_conn(
200, 200, body_iter=iter([b'', body]),
headers=headers):
actual, resp = self.ctrl._get_updating_shard_ranges(
actual, resp = self.ctrl._get_updating_namespaces(
req, 'a', 'c', '1_test')
self.assertEqual(200, resp.status_int)
self.assertIsNone(actual)
# NOTE(review): the next line is a diff hunk header, not Python. The lines
# elided between hunks (presumably including the assignment of error_lines)
# are not visible -- TODO confirm against the full file.
@ -7394,10 +7522,10 @@ class TestGetUpdatingShardRanges(BaseObjectControllerMixin, unittest.TestCase):
self.assertIn('/a/c', error_lines[0])
# no further error lines are expected
self.assertFalse(error_lines[1:])
def test_get_shard_ranges_request_failed(self):
def test_get_namespaces_request_failed(self):
req = Request.blank('/v1/a/c/o', method='PUT')
with mocked_http_conn(200, 404, 404, 404):
actual, resp = self.ctrl._get_updating_shard_ranges(
actual, resp = self.ctrl._get_updating_namespaces(
req, 'a', 'c', '1_test')
self.assertEqual(404, resp.status_int)
self.assertIsNone(actual)