Use cached shard ranges for container GETs
This patch makes four significant changes to the handling of GET requests for sharding or sharded containers: - container server GET requests may now result in the entire list of shard ranges being returned for the 'listing' state regardless of any request parameter constraints. - the proxy server may cache that list of shard ranges in memcache and the requests environ infocache dict, and subsequently use the cached shard ranges when handling GET requests for the same container. - the proxy now caches more container metadata so that it can synthesize a complete set of container GET response headers from cache. - the proxy server now enforces more container GET request validity checks that were previously only enforced by the backend server, e.g. checks for valid request parameter values With this change, when the proxy learns from container metadata that the container is sharded then it will cache shard ranges fetched from the backend during a container GET in memcache. On subsequent container GETs the proxy will use the cached shard ranges to gather object listings from shard containers, avoiding further GET requests to the root container until the cached shard ranges expire from cache. Cached shard ranges are most useful if they cover the entire object name space in the container. The proxy therefore uses a new X-Backend-Override-Shard-Name-Filter header to instruct the container server to ignore any request parameters that would constrain the returned shard range listing i.e. 'marker', 'end_marker', 'includes' and 'reverse' parameters. Having obtained the entire shard range listing (either from the server or from cache) the proxy now applies those request parameter constraints itself when constructing the client response. When using cached shard ranges the proxy will synthesize response headers from the container metadata that is also in cache. To enable the full set of container GET response headers to be synthezised in this way, the set of metadata that the proxy caches when handling a backend container GET response is expanded to include various timestamps. The X-Newest header may be used to disable looking up shard ranges in cache. Change-Id: I5fc696625d69d1ee9218ee2a508a1b9be6cf9685
This commit is contained in:
parent
4539837647
commit
077ba77ea6
@ -131,12 +131,19 @@ use = egg:swift#proxy
|
||||
# recheck_account_existence = 60
|
||||
# recheck_container_existence = 60
|
||||
#
|
||||
# How long the proxy should cache a set of shard ranges for a container.
|
||||
# How long the proxy should cache a set of shard ranges for a container when
|
||||
# the set is to be used for directing object updates.
|
||||
# Note that stale shard range info should be fine; updates will still
|
||||
# eventually make their way to the correct shard. As a result, you can
|
||||
# usually set this much higher than the existence checks above.
|
||||
# recheck_updating_shard_ranges = 3600
|
||||
#
|
||||
# How long the proxy should cache a set of shard ranges for a container when
|
||||
# the set is to be used for gathering object listings.
|
||||
# Note that stale shard range info might result in incomplete object listings
|
||||
# so this value should be set less than recheck_updating_shard_ranges.
|
||||
# recheck_listing_shard_ranges = 600
|
||||
#
|
||||
# object_chunk_size = 65536
|
||||
# client_chunk_size = 65536
|
||||
#
|
||||
@ -973,7 +980,7 @@ use = egg:swift#proxy_logging
|
||||
# log_anonymization_method = MD5
|
||||
#
|
||||
# Salt added during log anonymization
|
||||
# log_anonymization_salt =
|
||||
# log_anonymization_salt =
|
||||
#
|
||||
# Template used to format access logs. All words surrounded by curly brackets
|
||||
# will be substituted with the appropriate values
|
||||
|
@ -28,7 +28,8 @@ import six
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
|
||||
from swift import gettext_ as _
|
||||
from swift.common.constraints import AUTO_CREATE_ACCOUNT_PREFIX
|
||||
from swift.common.constraints import AUTO_CREATE_ACCOUNT_PREFIX, \
|
||||
CONTAINER_LISTING_LIMIT
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.exceptions import ListingIterError, SegmentError
|
||||
from swift.common.http import is_success, is_server_error
|
||||
@ -61,7 +62,7 @@ else:
|
||||
|
||||
def get_param(req, name, default=None):
|
||||
"""
|
||||
Get parameters from an HTTP request ensuring proper handling UTF-8
|
||||
Get a parameter from an HTTP request ensuring proper handling UTF-8
|
||||
encoding.
|
||||
|
||||
:param req: request object
|
||||
@ -94,6 +95,27 @@ def get_param(req, name, default=None):
|
||||
return value
|
||||
|
||||
|
||||
def validate_params(req, names):
|
||||
"""
|
||||
Get list of parameters from an HTTP request, validating the encoding of
|
||||
each parameter.
|
||||
|
||||
:param req: request object
|
||||
:param names: parameter names
|
||||
:returns: a dict mapping parameter names to values for each name that
|
||||
appears in the request parameters
|
||||
:raises HTTPBadRequest: if any parameter value is not a valid UTF-8 byte
|
||||
sequence
|
||||
"""
|
||||
params = {}
|
||||
for name in names:
|
||||
value = get_param(req, name)
|
||||
if value is None:
|
||||
continue
|
||||
params[name] = value
|
||||
return params
|
||||
|
||||
|
||||
def constrain_req_limit(req, constrained_limit):
|
||||
given_limit = get_param(req, 'limit')
|
||||
limit = constrained_limit
|
||||
@ -105,6 +127,14 @@ def constrain_req_limit(req, constrained_limit):
|
||||
return limit
|
||||
|
||||
|
||||
def validate_container_params(req):
|
||||
params = validate_params(req, ('marker', 'end_marker', 'prefix',
|
||||
'delimiter', 'path', 'format', 'reverse',
|
||||
'states', 'includes'))
|
||||
params['limit'] = constrain_req_limit(req, CONTAINER_LISTING_LIMIT)
|
||||
return params
|
||||
|
||||
|
||||
def _validate_internal_name(name, type_='name'):
|
||||
if RESERVED in name and not name.startswith(RESERVED):
|
||||
raise HTTPBadRequest(body='Invalid reserved-namespace %s' % (type_))
|
||||
|
@ -5496,6 +5496,25 @@ def find_shard_range(item, ranges):
|
||||
return None
|
||||
|
||||
|
||||
def filter_shard_ranges(shard_ranges, includes, marker, end_marker):
|
||||
if includes:
|
||||
shard_range = find_shard_range(includes, shard_ranges)
|
||||
return [shard_range] if shard_range else []
|
||||
|
||||
def shard_range_filter(sr):
|
||||
end = start = True
|
||||
if end_marker:
|
||||
end = end_marker > sr.lower
|
||||
if marker:
|
||||
start = marker < sr.upper
|
||||
return start and end
|
||||
|
||||
if marker or end_marker:
|
||||
return list(filter(shard_range_filter, shard_ranges))
|
||||
|
||||
return shard_ranges
|
||||
|
||||
|
||||
def modify_priority(conf, logger):
|
||||
"""
|
||||
Modify priority by nice and ionice.
|
||||
|
@ -30,9 +30,9 @@ from swift.common.constraints import CONTAINER_LISTING_LIMIT
|
||||
from swift.common.exceptions import LockTimeout
|
||||
from swift.common.utils import Timestamp, encode_timestamps, \
|
||||
decode_timestamps, extract_swift_bytes, storage_directory, hash_path, \
|
||||
ShardRange, renamer, find_shard_range, MD5_OF_EMPTY_STRING, mkdirs, \
|
||||
get_db_files, parse_db_filename, make_db_file_path, split_path, \
|
||||
RESERVED_BYTE
|
||||
ShardRange, renamer, MD5_OF_EMPTY_STRING, mkdirs, get_db_files, \
|
||||
parse_db_filename, make_db_file_path, split_path, RESERVED_BYTE, \
|
||||
filter_shard_ranges
|
||||
from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
|
||||
zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT
|
||||
|
||||
@ -1757,14 +1757,6 @@ class ContainerBroker(DatabaseBroker):
|
||||
at the tail of other shard ranges.
|
||||
:return: a list of instances of :class:`swift.common.utils.ShardRange`
|
||||
"""
|
||||
def shard_range_filter(sr):
|
||||
end = start = True
|
||||
if end_marker:
|
||||
end = end_marker > sr.lower
|
||||
if marker:
|
||||
start = marker < sr.upper
|
||||
return start and end
|
||||
|
||||
if reverse:
|
||||
marker, end_marker = end_marker, marker
|
||||
if marker and end_marker and marker >= end_marker:
|
||||
@ -1776,14 +1768,13 @@ class ContainerBroker(DatabaseBroker):
|
||||
include_deleted=include_deleted, states=states,
|
||||
include_own=include_own,
|
||||
exclude_others=exclude_others)]
|
||||
shard_ranges.sort(key=ShardRange.sort_key)
|
||||
if includes:
|
||||
shard_range = find_shard_range(includes, shard_ranges)
|
||||
return [shard_range] if shard_range else []
|
||||
|
||||
if marker or end_marker:
|
||||
shard_ranges = list(filter(shard_range_filter, shard_ranges))
|
||||
if fill_gaps:
|
||||
shard_ranges.sort(key=ShardRange.sort_key)
|
||||
|
||||
shard_ranges = filter_shard_ranges(shard_ranges, includes,
|
||||
marker, end_marker)
|
||||
|
||||
if not includes and fill_gaps:
|
||||
if shard_ranges:
|
||||
last_upper = shard_ranges[-1].upper
|
||||
else:
|
||||
|
@ -32,9 +32,9 @@ from swift.container.backend import ContainerBroker, DATADIR, \
|
||||
from swift.container.replicator import ContainerReplicatorRpc
|
||||
from swift.common.db import DatabaseAlreadyExists
|
||||
from swift.common.container_sync_realms import ContainerSyncRealms
|
||||
from swift.common.request_helpers import get_param, \
|
||||
split_and_validate_path, is_sys_or_user_meta, \
|
||||
validate_internal_container, validate_internal_obj, constrain_req_limit
|
||||
from swift.common.request_helpers import split_and_validate_path, \
|
||||
is_sys_or_user_meta, validate_internal_container, validate_internal_obj, \
|
||||
validate_container_params
|
||||
from swift.common.utils import get_logger, hash_path, public, \
|
||||
Timestamp, storage_directory, validate_sync_to, \
|
||||
config_true_value, timing_stats, replication, \
|
||||
@ -43,7 +43,6 @@ from swift.common.utils import get_logger, hash_path, public, \
|
||||
ShardRange
|
||||
from swift.common.constraints import valid_timestamp, check_utf8, \
|
||||
check_drive, AUTO_CREATE_ACCOUNT_PREFIX
|
||||
from swift.common import constraints
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.exceptions import ConnectionTimeout
|
||||
from swift.common.http import HTTP_NO_CONTENT, HTTP_NOT_FOUND, is_success
|
||||
@ -647,6 +646,19 @@ class ContainerController(BaseStorageServer):
|
||||
``sharded``, then the listing will be a list of shard ranges;
|
||||
otherwise the response body will be a list of objects.
|
||||
|
||||
* Both shard range and object listings may be filtered according to
|
||||
the constraints described below. However, the
|
||||
``X-Backend-Ignore-Shard-Name-Filter`` header may be used to override
|
||||
the application of the ``marker``, ``end_marker``, ``includes`` and
|
||||
``reverse`` parameters to shard range listings. These parameters will
|
||||
be ignored if the header has the value 'sharded' and the current db
|
||||
sharding state is also 'sharded'. Note that this header does not
|
||||
override the ``states`` constraint on shard range listings.
|
||||
|
||||
* The order of both shard range and object listings may be reversed by
|
||||
using a ``reverse`` query string parameter with a
|
||||
value in :attr:`swift.common.utils.TRUE_VALUES`.
|
||||
|
||||
* Both shard range and object listings may be constrained to a name
|
||||
range by the ``marker`` and ``end_marker`` query string parameters.
|
||||
Object listings will only contain objects whose names are greater
|
||||
@ -698,13 +710,14 @@ class ContainerController(BaseStorageServer):
|
||||
:returns: an instance of :class:`swift.common.swob.Response`
|
||||
"""
|
||||
drive, part, account, container, obj = get_obj_name_and_placement(req)
|
||||
path = get_param(req, 'path')
|
||||
prefix = get_param(req, 'prefix')
|
||||
delimiter = get_param(req, 'delimiter')
|
||||
marker = get_param(req, 'marker', '')
|
||||
end_marker = get_param(req, 'end_marker')
|
||||
limit = constrain_req_limit(req, constraints.CONTAINER_LISTING_LIMIT)
|
||||
reverse = config_true_value(get_param(req, 'reverse'))
|
||||
params = validate_container_params(req)
|
||||
path = params.get('path')
|
||||
prefix = params.get('prefix')
|
||||
delimiter = params.get('delimiter')
|
||||
marker = params.get('marker', '')
|
||||
end_marker = params.get('end_marker')
|
||||
limit = params['limit']
|
||||
reverse = config_true_value(params.get('reverse'))
|
||||
out_content_type = listing_formats.get_listing_content_type(req)
|
||||
try:
|
||||
check_drive(self.root, drive, self.mount_check)
|
||||
@ -715,8 +728,8 @@ class ContainerController(BaseStorageServer):
|
||||
stale_reads_ok=True)
|
||||
info, is_deleted = broker.get_info_is_deleted()
|
||||
record_type = req.headers.get('x-backend-record-type', '').lower()
|
||||
if record_type == 'auto' and info.get('db_state') in (SHARDING,
|
||||
SHARDED):
|
||||
db_state = info.get('db_state')
|
||||
if record_type == 'auto' and db_state in (SHARDING, SHARDED):
|
||||
record_type = 'shard'
|
||||
if record_type == 'shard':
|
||||
override_deleted = info and config_true_value(
|
||||
@ -726,8 +739,16 @@ class ContainerController(BaseStorageServer):
|
||||
if is_deleted and not override_deleted:
|
||||
return HTTPNotFound(request=req, headers=resp_headers)
|
||||
resp_headers['X-Backend-Record-Type'] = 'shard'
|
||||
includes = get_param(req, 'includes')
|
||||
states = get_param(req, 'states')
|
||||
includes = params.get('includes')
|
||||
override_filter_hdr = req.headers.get(
|
||||
'x-backend-override-shard-name-filter', '').lower()
|
||||
if override_filter_hdr == db_state == 'sharded':
|
||||
# respect the request to send back *all* ranges if the db is in
|
||||
# sharded state
|
||||
resp_headers['X-Backend-Override-Shard-Name-Filter'] = 'true'
|
||||
marker = end_marker = includes = None
|
||||
reverse = False
|
||||
states = params.get('states')
|
||||
fill_gaps = False
|
||||
if states:
|
||||
states = list_from_csv(states)
|
||||
|
@ -63,13 +63,15 @@ from swift.common.swob import Request, Response, Range, \
|
||||
from swift.common.request_helpers import strip_sys_meta_prefix, \
|
||||
strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta, \
|
||||
http_response_to_document_iters, is_object_transient_sysmeta, \
|
||||
strip_object_transient_sysmeta_prefix, get_ip_port
|
||||
strip_object_transient_sysmeta_prefix, get_ip_port, get_user_meta_prefix, \
|
||||
get_sys_meta_prefix
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
||||
|
||||
DEFAULT_RECHECK_ACCOUNT_EXISTENCE = 60 # seconds
|
||||
DEFAULT_RECHECK_CONTAINER_EXISTENCE = 60 # seconds
|
||||
DEFAULT_RECHECK_UPDATING_SHARD_RANGES = 3600 # seconds
|
||||
DEFAULT_RECHECK_LISTING_SHARD_RANGES = 600 # seconds
|
||||
|
||||
|
||||
def update_headers(response, headers):
|
||||
@ -195,9 +197,100 @@ def headers_to_container_info(headers, status_int=HTTP_OK):
|
||||
'meta': meta,
|
||||
'sysmeta': sysmeta,
|
||||
'sharding_state': headers.get('x-backend-sharding-state', 'unsharded'),
|
||||
# the 'internal' format version of timestamps is cached since the
|
||||
# normal format can be derived from this when required
|
||||
'created_at': headers.get('x-backend-timestamp'),
|
||||
'put_timestamp': headers.get('x-backend-put-timestamp'),
|
||||
'delete_timestamp': headers.get('x-backend-delete-timestamp'),
|
||||
'status_changed_at': headers.get('x-backend-status-changed-at'),
|
||||
}
|
||||
|
||||
|
||||
def headers_from_container_info(info):
|
||||
"""
|
||||
Construct a HeaderKeyDict from a container info dict.
|
||||
|
||||
:param info: a dict of container metadata
|
||||
:returns: a HeaderKeyDict or None if info is None or any required headers
|
||||
could not be constructed
|
||||
"""
|
||||
if not info:
|
||||
return None
|
||||
|
||||
required = (
|
||||
('x-backend-timestamp', 'created_at'),
|
||||
('x-backend-put-timestamp', 'put_timestamp'),
|
||||
('x-backend-delete-timestamp', 'delete_timestamp'),
|
||||
('x-backend-status-changed-at', 'status_changed_at'),
|
||||
('x-backend-storage-policy-index', 'storage_policy'),
|
||||
('x-container-object-count', 'object_count'),
|
||||
('x-container-bytes-used', 'bytes'),
|
||||
('x-backend-sharding-state', 'sharding_state'),
|
||||
)
|
||||
required_normal_format_timestamps = (
|
||||
('x-timestamp', 'created_at'),
|
||||
('x-put-timestamp', 'put_timestamp'),
|
||||
)
|
||||
optional = (
|
||||
('x-container-read', 'read_acl'),
|
||||
('x-container-write', 'write_acl'),
|
||||
('x-container-sync-key', 'sync_key'),
|
||||
('x-container-sync-to', 'sync_to'),
|
||||
('x-versions-location', 'versions'),
|
||||
)
|
||||
cors_optional = (
|
||||
('access-control-allow-origin', 'allow_origin'),
|
||||
('access-control-expose-headers', 'expose_headers'),
|
||||
('access-control-max-age', 'max_age')
|
||||
)
|
||||
|
||||
def lookup(info, key):
|
||||
# raises KeyError or ValueError
|
||||
val = info[key]
|
||||
if val is None:
|
||||
raise ValueError
|
||||
return val
|
||||
|
||||
# note: required headers may be missing from info for example during
|
||||
# upgrade when stale info is still in cache
|
||||
headers = HeaderKeyDict()
|
||||
for hdr, key in required:
|
||||
try:
|
||||
headers[hdr] = lookup(info, key)
|
||||
except (KeyError, ValueError):
|
||||
return None
|
||||
|
||||
for hdr, key in required_normal_format_timestamps:
|
||||
try:
|
||||
headers[hdr] = Timestamp(lookup(info, key)).normal
|
||||
except (KeyError, ValueError):
|
||||
return None
|
||||
|
||||
for hdr, key in optional:
|
||||
try:
|
||||
headers[hdr] = lookup(info, key)
|
||||
except (KeyError, ValueError):
|
||||
pass
|
||||
|
||||
policy_index = info.get('storage_policy')
|
||||
headers['x-storage-policy'] = POLICIES[int(policy_index)].name
|
||||
prefix = get_user_meta_prefix('container')
|
||||
headers.update(
|
||||
(prefix + k, v)
|
||||
for k, v in info.get('meta', {}).items())
|
||||
for hdr, key in cors_optional:
|
||||
try:
|
||||
headers[prefix + hdr] = lookup(info.get('cors'), key)
|
||||
except (KeyError, ValueError):
|
||||
pass
|
||||
prefix = get_sys_meta_prefix('container')
|
||||
headers.update(
|
||||
(prefix + k, v)
|
||||
for k, v in info.get('sysmeta', {}).items())
|
||||
|
||||
return headers
|
||||
|
||||
|
||||
def headers_to_object_info(headers, status_int=HTTP_OK):
|
||||
"""
|
||||
Construct a cacheable dict of object info based on response headers.
|
||||
@ -544,9 +637,7 @@ def set_info_cache(app, env, account, container, resp):
|
||||
infocache = env.setdefault('swift.infocache', {})
|
||||
memcache = cache_from_env(env, True)
|
||||
if resp is None:
|
||||
infocache.pop(cache_key, None)
|
||||
if memcache:
|
||||
memcache.delete(cache_key)
|
||||
clear_info_cache(app, env, account, container)
|
||||
return
|
||||
|
||||
if container:
|
||||
@ -603,16 +694,24 @@ def set_object_info_cache(app, env, account, container, obj, resp):
|
||||
return info
|
||||
|
||||
|
||||
def clear_info_cache(app, env, account, container=None):
|
||||
def clear_info_cache(app, env, account, container=None, shard=None):
|
||||
"""
|
||||
Clear the cached info in both memcache and env
|
||||
|
||||
:param app: the application object
|
||||
:param env: the WSGI environment
|
||||
:param account: the account name
|
||||
:param container: the containr name or None if setting info for containers
|
||||
:param container: the container name if clearing info for containers, or
|
||||
None
|
||||
:param shard: the sharding state if clearing info for container shard
|
||||
ranges, or None
|
||||
"""
|
||||
set_info_cache(app, env, account, container, None)
|
||||
cache_key = get_cache_key(account, container, shard=shard)
|
||||
infocache = env.setdefault('swift.infocache', {})
|
||||
memcache = cache_from_env(env, True)
|
||||
infocache.pop(cache_key, None)
|
||||
if memcache:
|
||||
memcache.delete(cache_key)
|
||||
|
||||
|
||||
def _get_info_from_infocache(env, account, container=None):
|
||||
@ -2160,6 +2259,24 @@ class Controller(object):
|
||||
raise ValueError(
|
||||
"server_type can only be 'account' or 'container'")
|
||||
|
||||
def _parse_listing_response(self, req, response):
|
||||
if not is_success(response.status_int):
|
||||
self.app.logger.warning(
|
||||
'Failed to get container listing from %s: %s',
|
||||
req.path_qs, response.status_int)
|
||||
return None
|
||||
|
||||
try:
|
||||
data = json.loads(response.body)
|
||||
if not isinstance(data, list):
|
||||
raise ValueError('not a list')
|
||||
return data
|
||||
except ValueError as err:
|
||||
self.app.logger.error(
|
||||
'Problem with listing response from %s: %r',
|
||||
req.path_qs, err)
|
||||
return None
|
||||
|
||||
def _get_container_listing(self, req, account, container, headers=None,
|
||||
params=None):
|
||||
"""
|
||||
@ -2167,7 +2284,7 @@ class Controller(object):
|
||||
|
||||
:param req: original Request instance.
|
||||
:param account: account in which `container` is stored.
|
||||
:param container: container from listing should be fetched.
|
||||
:param container: container from which listing should be fetched.
|
||||
:param headers: headers to be included with the request
|
||||
:param params: query string parameters to be used.
|
||||
:return: a tuple of (deserialized json data structure, swob Response)
|
||||
@ -2185,23 +2302,28 @@ class Controller(object):
|
||||
self.app.logger.debug(
|
||||
'Get listing from %s %s' % (subreq.path_qs, headers))
|
||||
response = self.app.handle_request(subreq)
|
||||
data = self._parse_listing_response(req, response)
|
||||
return data, response
|
||||
|
||||
if not is_success(response.status_int):
|
||||
self.app.logger.warning(
|
||||
'Failed to get container listing from %s: %s',
|
||||
subreq.path_qs, response.status_int)
|
||||
return None, response
|
||||
def _parse_shard_ranges(self, req, listing, response):
|
||||
if listing is None:
|
||||
return None
|
||||
|
||||
record_type = response.headers.get('x-backend-record-type')
|
||||
if record_type != 'shard':
|
||||
err = 'unexpected record type %r' % record_type
|
||||
self.app.logger.error("Failed to get shard ranges from %s: %s",
|
||||
req.path_qs, err)
|
||||
return None
|
||||
|
||||
try:
|
||||
data = json.loads(response.body)
|
||||
if not isinstance(data, list):
|
||||
raise ValueError('not a list')
|
||||
return data, response
|
||||
except ValueError as err:
|
||||
return [ShardRange.from_dict(shard_range)
|
||||
for shard_range in listing]
|
||||
except (ValueError, TypeError, KeyError) as err:
|
||||
self.app.logger.error(
|
||||
'Problem with listing response from %s: %r',
|
||||
subreq.path_qs, err)
|
||||
return None, response
|
||||
"Failed to get shard ranges from %s: invalid data: %r",
|
||||
req.path_qs, err)
|
||||
return None
|
||||
|
||||
def _get_shard_ranges(self, req, account, container, includes=None,
|
||||
states=None):
|
||||
@ -2229,24 +2351,7 @@ class Controller(object):
|
||||
headers = {'X-Backend-Record-Type': 'shard'}
|
||||
listing, response = self._get_container_listing(
|
||||
req, account, container, headers=headers, params=params)
|
||||
if listing is None:
|
||||
return None
|
||||
|
||||
record_type = response.headers.get('x-backend-record-type')
|
||||
if record_type != 'shard':
|
||||
err = 'unexpected record type %r' % record_type
|
||||
self.app.logger.error("Failed to get shard ranges from %s: %s",
|
||||
req.path_qs, err)
|
||||
return None
|
||||
|
||||
try:
|
||||
return [ShardRange.from_dict(shard_range)
|
||||
for shard_range in listing]
|
||||
except (ValueError, TypeError, KeyError) as err:
|
||||
self.app.logger.error(
|
||||
"Failed to get shard ranges from %s: invalid data: %r",
|
||||
req.path_qs, err)
|
||||
return None
|
||||
return self._parse_shard_ranges(req, listing, response)
|
||||
|
||||
def _get_update_shard(self, req, account, container, obj):
|
||||
"""
|
||||
|
@ -15,21 +15,23 @@
|
||||
|
||||
from swift import gettext_ as _
|
||||
import json
|
||||
import math
|
||||
|
||||
import six
|
||||
from six.moves.urllib.parse import unquote
|
||||
|
||||
from swift.common.utils import public, private, csv_append, Timestamp, \
|
||||
config_true_value, ShardRange
|
||||
config_true_value, ShardRange, cache_from_env, filter_shard_ranges
|
||||
from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT
|
||||
from swift.common.http import HTTP_ACCEPTED, is_success
|
||||
from swift.common.request_helpers import get_sys_meta_prefix
|
||||
from swift.common.request_helpers import get_sys_meta_prefix, get_param, \
|
||||
constrain_req_limit, validate_container_params
|
||||
from swift.proxy.controllers.base import Controller, delay_denial, \
|
||||
cors_validation, set_info_cache, clear_info_cache
|
||||
cors_validation, set_info_cache, clear_info_cache, _get_info_from_caches, \
|
||||
get_cache_key, headers_from_container_info, update_headers
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
|
||||
HTTPNotFound, HTTPServiceUnavailable, str_to_wsgi, wsgi_to_str, \
|
||||
bytes_to_wsgi
|
||||
from swift.common.swob import HTTPBadRequest, HTTPForbidden, HTTPNotFound, \
|
||||
HTTPServiceUnavailable, str_to_wsgi, wsgi_to_str, bytes_to_wsgi, Response
|
||||
|
||||
|
||||
class ContainerController(Controller):
|
||||
@ -87,6 +89,144 @@ class ContainerController(Controller):
|
||||
return HTTPBadRequest(request=req, body=str(err))
|
||||
return None
|
||||
|
||||
def _clear_container_info_cache(self, req):
|
||||
clear_info_cache(self.app, req.environ,
|
||||
self.account_name, self.container_name)
|
||||
clear_info_cache(self.app, req.environ,
|
||||
self.account_name, self.container_name, 'listing')
|
||||
# TODO: should we also purge updating shards from cache?
|
||||
|
||||
def _GETorHEAD_from_backend(self, req):
|
||||
part = self.app.container_ring.get_part(
|
||||
self.account_name, self.container_name)
|
||||
concurrency = self.app.container_ring.replica_count \
|
||||
if self.app.get_policy_options(None).concurrent_gets else 1
|
||||
node_iter = self.app.iter_nodes(self.app.container_ring, part)
|
||||
resp = self.GETorHEAD_base(
|
||||
req, _('Container'), node_iter, part,
|
||||
req.swift_entity_path, concurrency)
|
||||
return resp
|
||||
|
||||
def _filter_resp_shard_ranges(self, req, cached_ranges):
|
||||
# filter returned shard ranges according to request constraints
|
||||
marker = get_param(req, 'marker', '')
|
||||
end_marker = get_param(req, 'end_marker')
|
||||
includes = get_param(req, 'includes')
|
||||
reverse = config_true_value(get_param(req, 'reverse'))
|
||||
if reverse:
|
||||
marker, end_marker = end_marker, marker
|
||||
shard_ranges = [
|
||||
ShardRange.from_dict(shard_range)
|
||||
for shard_range in cached_ranges]
|
||||
shard_ranges = filter_shard_ranges(shard_ranges, includes, marker,
|
||||
end_marker)
|
||||
if reverse:
|
||||
shard_ranges.reverse()
|
||||
return json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii')
|
||||
|
||||
def _GET_using_cache(self, req):
|
||||
# It may be possible to fulfil the request from cache: we only reach
|
||||
# here if request record_type is 'shard' or 'auto', so if the container
|
||||
# state is 'sharded' then look for cached shard ranges. However, if
|
||||
# X-Newest is true then we always fetch from the backend servers.
|
||||
get_newest = config_true_value(req.headers.get('x-newest', False))
|
||||
if get_newest:
|
||||
self.app.logger.debug(
|
||||
'Skipping shard cache lookup (x-newest) for %s', req.path_qs)
|
||||
info = None
|
||||
else:
|
||||
info = _get_info_from_caches(self.app, req.environ,
|
||||
self.account_name,
|
||||
self.container_name)
|
||||
if (info and is_success(info['status']) and
|
||||
info.get('sharding_state') == 'sharded'):
|
||||
# container is sharded so we may have the shard ranges cached
|
||||
headers = headers_from_container_info(info)
|
||||
if headers:
|
||||
# only use cached values if all required headers available
|
||||
infocache = req.environ.setdefault('swift.infocache', {})
|
||||
memcache = cache_from_env(req.environ, True)
|
||||
cache_key = get_cache_key(self.account_name,
|
||||
self.container_name,
|
||||
shard='listing')
|
||||
cached_ranges = infocache.get(cache_key)
|
||||
if cached_ranges is None and memcache:
|
||||
cached_ranges = memcache.get(cache_key)
|
||||
if cached_ranges is not None:
|
||||
infocache[cache_key] = tuple(cached_ranges)
|
||||
# shard ranges can be returned from cache
|
||||
self.app.logger.debug('Found %d shards in cache for %s',
|
||||
len(cached_ranges), req.path_qs)
|
||||
headers.update({'x-backend-record-type': 'shard',
|
||||
'x-backend-cached-results': 'true'})
|
||||
shard_range_body = self._filter_resp_shard_ranges(
|
||||
req, cached_ranges)
|
||||
# mimic GetOrHeadHandler.get_working_response...
|
||||
# note: server sets charset with content_type but proxy
|
||||
# GETorHEAD_base does not, so don't set it here either
|
||||
resp = Response(request=req, body=shard_range_body)
|
||||
update_headers(resp, headers)
|
||||
resp.last_modified = math.ceil(
|
||||
float(headers['x-put-timestamp']))
|
||||
resp.environ['swift_x_timestamp'] = headers.get(
|
||||
'x-timestamp')
|
||||
resp.accept_ranges = 'bytes'
|
||||
resp.content_type = 'application/json'
|
||||
return resp
|
||||
|
||||
# The request was not fulfilled from cache so send to the backend
|
||||
# server, but instruct the backend server to ignore name constraints in
|
||||
# request params if returning shard ranges so that the response can
|
||||
# potentially be cached. Only do this if the container state is
|
||||
# 'sharded'. We don't attempt to cache shard ranges for a 'sharding'
|
||||
# container as they may include the container itself as a 'gap filler'
|
||||
# for shard ranges that have not yet cleaved; listings from 'gap
|
||||
# filler' shard ranges are likely to become stale as the container
|
||||
# continues to cleave objects to its shards and caching them is
|
||||
# therefore more likely to result in stale or incomplete listings on
|
||||
# subsequent container GETs.
|
||||
req.headers['x-backend-override-shard-name-filter'] = 'sharded'
|
||||
resp = self._GETorHEAD_from_backend(req)
|
||||
|
||||
sharding_state = resp.headers.get(
|
||||
'x-backend-sharding-state', '').lower()
|
||||
resp_record_type = resp.headers.get(
|
||||
'x-backend-record-type', '').lower()
|
||||
complete_listing = config_true_value(resp.headers.pop(
|
||||
'x-backend-override-shard-name-filter', False))
|
||||
# given that we sent 'x-backend-override-shard-name-filter=sharded' we
|
||||
# should only receive back 'x-backend-override-shard-name-filter=true'
|
||||
# if the sharding state is 'sharded', but check them both anyway...
|
||||
if (resp_record_type == 'shard' and
|
||||
sharding_state == 'sharded' and
|
||||
complete_listing):
|
||||
# backend returned unfiltered listing state shard ranges so parse
|
||||
# them and replace response body with filtered listing
|
||||
cache_key = get_cache_key(self.account_name, self.container_name,
|
||||
shard='listing')
|
||||
data = self._parse_listing_response(req, resp)
|
||||
backend_shard_ranges = self._parse_shard_ranges(req, data, resp)
|
||||
if backend_shard_ranges is not None:
|
||||
cached_ranges = [dict(sr) for sr in backend_shard_ranges]
|
||||
if resp.headers.get('x-backend-sharding-state') == 'sharded':
|
||||
# cache in infocache even if no shard ranges returned; this
|
||||
# is unexpected but use that result for this request
|
||||
infocache = req.environ.setdefault('swift.infocache', {})
|
||||
infocache[cache_key] = tuple(cached_ranges)
|
||||
memcache = cache_from_env(req.environ, True)
|
||||
if memcache and cached_ranges:
|
||||
# cache in memcache only if shard ranges as expected
|
||||
self.app.logger.debug('Caching %d shards for %s',
|
||||
len(cached_ranges), req.path_qs)
|
||||
memcache.set(
|
||||
cache_key, cached_ranges,
|
||||
time=self.app.recheck_listing_shard_ranges)
|
||||
|
||||
# filter returned shard ranges according to request constraints
|
||||
resp.body = self._filter_resp_shard_ranges(req, cached_ranges)
|
||||
|
||||
return resp
|
||||
|
||||
def GETorHEAD(self, req):
|
||||
"""Handler for HTTP GET/HEAD requests."""
|
||||
ai = self.account_info(self.account_name, req)
|
||||
@ -102,33 +242,51 @@ class ContainerController(Controller):
|
||||
# Don't cache this. The lack of account will be cached, and that
|
||||
# is sufficient.
|
||||
return HTTPNotFound(request=req)
|
||||
part = self.app.container_ring.get_part(
|
||||
self.account_name, self.container_name)
|
||||
concurrency = self.app.container_ring.replica_count \
|
||||
if self.app.get_policy_options(None).concurrent_gets else 1
|
||||
node_iter = self.app.iter_nodes(self.app.container_ring, part)
|
||||
|
||||
# The read-modify-write of params here is because the Request.params
|
||||
# getter dynamically generates a dict of params from the query string;
|
||||
# the setter must be called for new params to update the query string.
|
||||
params = req.params
|
||||
params['format'] = 'json'
|
||||
# x-backend-record-type may be sent via internal client e.g. from
|
||||
# the sharder or in probe tests
|
||||
record_type = req.headers.get('X-Backend-Record-Type', '').lower()
|
||||
if not record_type:
|
||||
record_type = 'auto'
|
||||
req.headers['X-Backend-Record-Type'] = 'auto'
|
||||
params['states'] = 'listing'
|
||||
req.params = params
|
||||
resp = self.GETorHEAD_base(
|
||||
req, _('Container'), node_iter, part,
|
||||
req.swift_entity_path, concurrency)
|
||||
|
||||
memcache = cache_from_env(req.environ, True)
|
||||
if (req.method == 'GET' and
|
||||
record_type != 'object' and
|
||||
self.app.recheck_listing_shard_ranges > 0 and
|
||||
memcache and
|
||||
get_param(req, 'states') == 'listing' and
|
||||
not config_true_value(
|
||||
req.headers.get('x-backend-include-deleted', False))):
|
||||
# This GET might be served from cache or might populate cache.
|
||||
# 'x-backend-include-deleted' is not usually expected in requests
|
||||
# to the proxy (it is used from sharder to container servers) but
|
||||
# it is included in the conditions just in case because we don't
|
||||
# cache deleted shard ranges.
|
||||
resp = self._GET_using_cache(req)
|
||||
else:
|
||||
resp = self._GETorHEAD_from_backend(req)
|
||||
|
||||
resp_record_type = resp.headers.get('X-Backend-Record-Type', '')
|
||||
if all((req.method == "GET", record_type == 'auto',
|
||||
resp_record_type.lower() == 'shard')):
|
||||
resp = self._get_from_shards(req, resp)
|
||||
|
||||
# Cache this. We just made a request to a storage node and got
|
||||
# up-to-date information for the container.
|
||||
resp.headers['X-Backend-Recheck-Container-Existence'] = str(
|
||||
self.app.recheck_container_existence)
|
||||
set_info_cache(self.app, req.environ, self.account_name,
|
||||
self.container_name, resp)
|
||||
if not config_true_value(
|
||||
resp.headers.get('X-Backend-Cached-Results')):
|
||||
# Cache container metadata. We just made a request to a storage
|
||||
# node and got up-to-date information for the container.
|
||||
resp.headers['X-Backend-Recheck-Container-Existence'] = str(
|
||||
self.app.recheck_container_existence)
|
||||
set_info_cache(self.app, req.environ, self.account_name,
|
||||
self.container_name, resp)
|
||||
if 'swift.authorize' in req.environ:
|
||||
req.acl = resp.headers.get('x-container-read')
|
||||
aresp = req.environ['swift.authorize'](req)
|
||||
@ -171,7 +329,7 @@ class ContainerController(Controller):
|
||||
return resp
|
||||
|
||||
objects = []
|
||||
req_limit = int(req.params.get('limit') or CONTAINER_LISTING_LIMIT)
|
||||
req_limit = constrain_req_limit(req, CONTAINER_LISTING_LIMIT)
|
||||
params = req.params.copy()
|
||||
params.pop('states', None)
|
||||
req.headers.pop('X-Backend-Record-Type', None)
|
||||
@ -181,7 +339,7 @@ class ContainerController(Controller):
|
||||
prefix = wsgi_to_str(params.get('prefix'))
|
||||
|
||||
limit = req_limit
|
||||
for shard_range in shard_ranges:
|
||||
for i, shard_range in enumerate(shard_ranges):
|
||||
params['limit'] = limit
|
||||
# Always set marker to ensure that object names less than or equal
|
||||
# to those already in the listing are not fetched; if the listing
|
||||
@ -207,12 +365,13 @@ class ContainerController(Controller):
|
||||
else:
|
||||
params['end_marker'] = str_to_wsgi(shard_range.end_marker)
|
||||
|
||||
headers = {}
|
||||
if ((shard_range.account, shard_range.container) in
|
||||
shard_listing_history):
|
||||
# directed back to same container - force GET of objects
|
||||
headers = {'X-Backend-Record-Type': 'object'}
|
||||
else:
|
||||
headers = None
|
||||
headers['X-Backend-Record-Type'] = 'object'
|
||||
if config_true_value(req.headers.get('x-newest', False)):
|
||||
headers['X-Newest'] = 'true'
|
||||
|
||||
if prefix:
|
||||
if prefix > shard_range:
|
||||
@ -225,14 +384,33 @@ class ContainerController(Controller):
|
||||
if just_past < shard_range:
|
||||
continue
|
||||
|
||||
self.app.logger.debug('Getting from %s %s with %s',
|
||||
shard_range, shard_range.name, headers)
|
||||
self.app.logger.debug(
|
||||
'Getting listing part %d from shard %s %s with %s',
|
||||
i, shard_range, shard_range.name, headers)
|
||||
objs, shard_resp = self._get_container_listing(
|
||||
req, shard_range.account, shard_range.container,
|
||||
headers=headers, params=params)
|
||||
|
||||
shard_state = 'unknown'
|
||||
try:
|
||||
shard_state = shard_resp.headers['x-backend-sharding-state']
|
||||
shard_state = ShardRange.resolve_state(shard_state)
|
||||
except (AttributeError, ValueError, KeyError):
|
||||
pass
|
||||
|
||||
if objs is None:
|
||||
# tolerate errors
|
||||
self.app.logger.debug(
|
||||
'Failed to get objects from shard (state=%s), total = %d',
|
||||
shard_state, len(objects))
|
||||
continue
|
||||
|
||||
self.app.logger.debug(
|
||||
'Found %d objects in shard (state=%s), total = %d',
|
||||
len(objs), shard_state, len(objs) + len(objects))
|
||||
|
||||
if not objs:
|
||||
# tolerate errors or empty shard containers
|
||||
# tolerate empty shard containers
|
||||
continue
|
||||
|
||||
objects.extend(objs)
|
||||
@ -270,6 +448,8 @@ class ContainerController(Controller):
|
||||
@cors_validation
|
||||
def GET(self, req):
|
||||
"""Handler for HTTP GET requests."""
|
||||
# early checks for request validity
|
||||
validate_container_params(req)
|
||||
return self.GETorHEAD(req)
|
||||
|
||||
@public
|
||||
@ -328,8 +508,7 @@ class ContainerController(Controller):
|
||||
resp = self.make_requests(
|
||||
req, self.app.container_ring,
|
||||
container_partition, 'PUT', req.swift_entity_path, headers)
|
||||
clear_info_cache(self.app, req.environ,
|
||||
self.account_name, self.container_name)
|
||||
self._clear_container_info_cache(req)
|
||||
return resp
|
||||
|
||||
@public
|
||||
@ -354,8 +533,7 @@ class ContainerController(Controller):
|
||||
container_partition, containers = self.app.container_ring.get_nodes(
|
||||
self.account_name, self.container_name)
|
||||
headers = self.generate_request_headers(req, transfer=True)
|
||||
clear_info_cache(self.app, req.environ,
|
||||
self.account_name, self.container_name)
|
||||
self._clear_container_info_cache(req)
|
||||
resp = self.make_requests(
|
||||
req, self.app.container_ring, container_partition, 'POST',
|
||||
req.swift_entity_path, [headers] * len(containers))
|
||||
@ -373,8 +551,7 @@ class ContainerController(Controller):
|
||||
self.account_name, self.container_name)
|
||||
headers = self._backend_requests(req, len(containers),
|
||||
account_partition, accounts)
|
||||
clear_info_cache(self.app, req.environ,
|
||||
self.account_name, self.container_name)
|
||||
self._clear_container_info_cache(req)
|
||||
resp = self.make_requests(
|
||||
req, self.app.container_ring, container_partition, 'DELETE',
|
||||
req.swift_entity_path, headers)
|
||||
|
@ -41,7 +41,7 @@ from swift.proxy.controllers import AccountController, ContainerController, \
|
||||
ObjectControllerRouter, InfoController
|
||||
from swift.proxy.controllers.base import get_container_info, NodeIter, \
|
||||
DEFAULT_RECHECK_CONTAINER_EXISTENCE, DEFAULT_RECHECK_ACCOUNT_EXISTENCE, \
|
||||
DEFAULT_RECHECK_UPDATING_SHARD_RANGES
|
||||
DEFAULT_RECHECK_UPDATING_SHARD_RANGES, DEFAULT_RECHECK_LISTING_SHARD_RANGES
|
||||
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
|
||||
HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
|
||||
HTTPServerError, HTTPException, Request, HTTPServiceUnavailable, \
|
||||
@ -221,6 +221,9 @@ class Application(object):
|
||||
self.recheck_updating_shard_ranges = \
|
||||
int(conf.get('recheck_updating_shard_ranges',
|
||||
DEFAULT_RECHECK_UPDATING_SHARD_RANGES))
|
||||
self.recheck_listing_shard_ranges = \
|
||||
int(conf.get('recheck_listing_shard_ranges',
|
||||
DEFAULT_RECHECK_LISTING_SHARD_RANGES))
|
||||
self.recheck_account_existence = \
|
||||
int(conf.get('recheck_account_existence',
|
||||
DEFAULT_RECHECK_ACCOUNT_EXISTENCE))
|
||||
|
@ -288,9 +288,10 @@ class BaseTestContainerSharding(ReplProbeTest):
|
||||
actual = sum(sr['object_count'] for sr in shard_ranges)
|
||||
self.assertEqual(expected_object_count, actual)
|
||||
|
||||
def assert_container_listing(self, expected_listing):
|
||||
def assert_container_listing(self, expected_listing, req_hdrs=None):
|
||||
req_hdrs = req_hdrs if req_hdrs else {}
|
||||
headers, actual_listing = client.get_container(
|
||||
self.url, self.token, self.container_name)
|
||||
self.url, self.token, self.container_name, headers=req_hdrs)
|
||||
self.assertIn('x-container-object-count', headers)
|
||||
expected_obj_count = len(expected_listing)
|
||||
self.assertEqual(expected_listing, [
|
||||
@ -390,18 +391,21 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
|
||||
# verify parameterised listing of a container during sharding
|
||||
all_obj_names = self._make_object_names(4 * self.max_shard_size)
|
||||
obj_names = all_obj_names[::2]
|
||||
self.put_objects(obj_names)
|
||||
obj_content = 'testing'
|
||||
self.put_objects(obj_names, contents=obj_content)
|
||||
# choose some names approx in middle of each expected shard range
|
||||
markers = [
|
||||
obj_names[i] for i in range(self.max_shard_size // 4,
|
||||
2 * self.max_shard_size,
|
||||
self.max_shard_size // 2)]
|
||||
|
||||
def check_listing(objects, **params):
|
||||
def check_listing(objects, req_hdrs=None, **params):
|
||||
req_hdrs = req_hdrs if req_hdrs else {}
|
||||
qs = '&'.join('%s=%s' % (k, quote(str(v)))
|
||||
for k, v in params.items())
|
||||
headers, listing = client.get_container(
|
||||
self.url, self.token, self.container_name, query_string=qs)
|
||||
self.url, self.token, self.container_name, query_string=qs,
|
||||
headers=req_hdrs)
|
||||
listing = [x['name'].encode('utf-8') if six.PY2 else x['name']
|
||||
for x in listing]
|
||||
if params.get('reverse'):
|
||||
@ -416,6 +420,12 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
|
||||
if 'limit' in params:
|
||||
expected = expected[:params['limit']]
|
||||
self.assertEqual(expected, listing)
|
||||
self.assertIn('x-timestamp', headers)
|
||||
self.assertIn('last-modified', headers)
|
||||
self.assertIn('x-trans-id', headers)
|
||||
self.assertEqual('bytes', headers.get('accept-ranges'))
|
||||
self.assertEqual('application/json; charset=utf-8',
|
||||
headers.get('content-type'))
|
||||
|
||||
def check_listing_fails(exp_status, **params):
|
||||
qs = '&'.join(['%s=%s' % param for param in params.items()])
|
||||
@ -425,38 +435,39 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
|
||||
self.assertEqual(exp_status, cm.exception.http_status)
|
||||
return cm.exception
|
||||
|
||||
def do_listing_checks(objects):
|
||||
check_listing(objects)
|
||||
check_listing(objects, marker=markers[0], end_marker=markers[1])
|
||||
check_listing(objects, marker=markers[0], end_marker=markers[2])
|
||||
check_listing(objects, marker=markers[1], end_marker=markers[3])
|
||||
check_listing(objects, marker=markers[1], end_marker=markers[3],
|
||||
def do_listing_checks(objs, hdrs=None):
|
||||
hdrs = hdrs if hdrs else {}
|
||||
check_listing(objs, hdrs)
|
||||
check_listing(objs, hdrs, marker=markers[0], end_marker=markers[1])
|
||||
check_listing(objs, hdrs, marker=markers[0], end_marker=markers[2])
|
||||
check_listing(objs, hdrs, marker=markers[1], end_marker=markers[3])
|
||||
check_listing(objs, hdrs, marker=markers[1], end_marker=markers[3],
|
||||
limit=self.max_shard_size // 4)
|
||||
check_listing(objects, marker=markers[1], end_marker=markers[3],
|
||||
check_listing(objs, hdrs, marker=markers[1], end_marker=markers[3],
|
||||
limit=self.max_shard_size // 4)
|
||||
check_listing(objects, marker=markers[1], end_marker=markers[2],
|
||||
check_listing(objs, hdrs, marker=markers[1], end_marker=markers[2],
|
||||
limit=self.max_shard_size // 2)
|
||||
check_listing(objects, marker=markers[1], end_marker=markers[1])
|
||||
check_listing(objects, reverse=True)
|
||||
check_listing(objects, reverse=True, end_marker=markers[1])
|
||||
check_listing(objects, reverse=True, marker=markers[3],
|
||||
check_listing(objs, hdrs, marker=markers[1], end_marker=markers[1])
|
||||
check_listing(objs, hdrs, reverse=True)
|
||||
check_listing(objs, hdrs, reverse=True, end_marker=markers[1])
|
||||
check_listing(objs, hdrs, reverse=True, marker=markers[3],
|
||||
end_marker=markers[1],
|
||||
limit=self.max_shard_size // 4)
|
||||
check_listing(objects, reverse=True, marker=markers[3],
|
||||
check_listing(objs, hdrs, reverse=True, marker=markers[3],
|
||||
end_marker=markers[1], limit=0)
|
||||
check_listing([], marker=markers[0], end_marker=markers[0])
|
||||
check_listing([], marker=markers[0], end_marker=markers[1],
|
||||
check_listing([], hdrs, marker=markers[0], end_marker=markers[0])
|
||||
check_listing([], hdrs, marker=markers[0], end_marker=markers[1],
|
||||
reverse=True)
|
||||
check_listing(objects, prefix='obj')
|
||||
check_listing([], prefix='zzz')
|
||||
check_listing(objs, hdrs, prefix='obj')
|
||||
check_listing([], hdrs, prefix='zzz')
|
||||
# delimiter
|
||||
headers, listing = client.get_container(
|
||||
self.url, self.token, self.container_name,
|
||||
query_string='delimiter=' + quote(self.DELIM))
|
||||
query_string='delimiter=' + quote(self.DELIM), headers=hdrs)
|
||||
self.assertEqual([{'subdir': 'obj' + self.DELIM}], listing)
|
||||
headers, listing = client.get_container(
|
||||
self.url, self.token, self.container_name,
|
||||
query_string='delimiter=j' + quote(self.DELIM))
|
||||
query_string='delimiter=j' + quote(self.DELIM), headers=hdrs)
|
||||
self.assertEqual([{'subdir': 'obj' + self.DELIM}], listing)
|
||||
|
||||
limit = self.cluster_info['swift']['container_listing_limit']
|
||||
@ -494,13 +505,16 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
|
||||
self.assert_container_post_ok('sharding')
|
||||
do_listing_checks(obj_names)
|
||||
|
||||
# put some new objects spread through entire namespace
|
||||
# put some new objects spread through entire namespace; object updates
|
||||
# should be directed to the shard container (both the cleaved and the
|
||||
# created shards)
|
||||
new_obj_names = all_obj_names[1::4]
|
||||
self.put_objects(new_obj_names)
|
||||
self.put_objects(new_obj_names, obj_content)
|
||||
|
||||
# new objects that fell into the first two cleaved shard ranges are
|
||||
# reported in listing, new objects in the yet-to-be-cleaved shard
|
||||
# ranges are not yet included in listing
|
||||
# reported in listing; new objects in the yet-to-be-cleaved shard
|
||||
# ranges are not yet included in listing because listings prefer the
|
||||
# root over the final two shards that are not yet-cleaved
|
||||
exp_obj_names = [o for o in obj_names + new_obj_names
|
||||
if o <= shard_ranges[1].upper]
|
||||
exp_obj_names += [o for o in obj_names
|
||||
@ -515,9 +529,53 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
|
||||
shard_ranges = self.get_container_shard_ranges()
|
||||
self.assert_shard_range_state(ShardRange.ACTIVE, shard_ranges)
|
||||
|
||||
# listings are now gathered from all four shard ranges so should have
|
||||
# all the specified objects
|
||||
exp_obj_names = obj_names + new_obj_names
|
||||
exp_obj_names.sort()
|
||||
do_listing_checks(exp_obj_names)
|
||||
# shard ranges may now be cached by proxy so do listings checks again
|
||||
# forcing backend request
|
||||
do_listing_checks(exp_obj_names, hdrs={'X-Newest': 'true'})
|
||||
|
||||
# post more metadata to the container and check that it is read back
|
||||
# correctly from backend (using x-newest) and cache
|
||||
test_headers = {'x-container-meta-test': 'testing',
|
||||
'x-container-read': 'read_acl',
|
||||
'x-container-write': 'write_acl',
|
||||
'x-container-sync-key': 'sync_key',
|
||||
# 'x-container-sync-to': 'sync_to',
|
||||
'x-versions-location': 'versions',
|
||||
'x-container-meta-access-control-allow-origin': 'aa',
|
||||
'x-container-meta-access-control-expose-headers': 'bb',
|
||||
'x-container-meta-access-control-max-age': '123'}
|
||||
client.post_container(self.url, self.admin_token, self.container_name,
|
||||
headers=test_headers)
|
||||
headers, listing = client.get_container(
|
||||
self.url, self.token, self.container_name,
|
||||
headers={'X-Newest': 'true'})
|
||||
exp_headers = dict(test_headers)
|
||||
exp_headers.update({
|
||||
'x-container-object-count': str(len(exp_obj_names)),
|
||||
'x-container-bytes-used':
|
||||
str(len(exp_obj_names) * len(obj_content))
|
||||
})
|
||||
for k, v in exp_headers.items():
|
||||
self.assertIn(k, headers)
|
||||
self.assertEqual(v, headers[k], dict(headers))
|
||||
|
||||
cache_headers, listing = client.get_container(
|
||||
self.url, self.token, self.container_name)
|
||||
for k, v in exp_headers.items():
|
||||
self.assertIn(k, cache_headers)
|
||||
self.assertEqual(v, cache_headers[k], dict(exp_headers))
|
||||
# we don't expect any of these headers to be equal...
|
||||
for k in ('x-timestamp', 'last-modified', 'date', 'x-trans-id',
|
||||
'x-openstack-request-id'):
|
||||
headers.pop(k, None)
|
||||
cache_headers.pop(k, None)
|
||||
self.assertEqual(headers, cache_headers)
|
||||
|
||||
self.assert_container_delete_fails()
|
||||
self.assert_container_has_shard_sysmeta()
|
||||
self.assert_container_post_ok('sharded')
|
||||
@ -881,20 +939,29 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
# ... and check some other container properties
|
||||
self.assertEqual(headers['last-modified'],
|
||||
pre_sharding_headers['last-modified'])
|
||||
|
||||
# It even works in reverse!
|
||||
headers, listing = client.get_container(self.url, self.token,
|
||||
self.container_name,
|
||||
query_string='reverse=on')
|
||||
self.assertEqual(pre_sharding_listing[::-1], listing)
|
||||
|
||||
# and repeat checks to use shard ranges now cached in proxy
|
||||
headers, actual_listing = self.assert_container_listing(obj_names)
|
||||
self.assertEqual(headers['last-modified'],
|
||||
pre_sharding_headers['last-modified'])
|
||||
headers, listing = client.get_container(self.url, self.token,
|
||||
self.container_name,
|
||||
query_string='reverse=on')
|
||||
self.assertEqual(pre_sharding_listing[::-1], listing)
|
||||
|
||||
# Now put some new objects into first shard, taking its count to
|
||||
# 3 shard ranges' worth
|
||||
more_obj_names = [
|
||||
'beta%03d' % x for x in range(self.max_shard_size)]
|
||||
self.put_objects(more_obj_names)
|
||||
|
||||
# The listing includes new objects...
|
||||
# The listing includes new objects (shard ranges haven't changed, just
|
||||
# their object content, so cached shard ranges are still correct)...
|
||||
headers, listing = self.assert_container_listing(
|
||||
more_obj_names + obj_names)
|
||||
self.assertEqual(pre_sharding_listing, listing[len(more_obj_names):])
|
||||
@ -2002,10 +2069,14 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
# then run sharder on the shard node without the alpha object
|
||||
self.sharders.once(additional_args='--partitions=%s' % shard_part,
|
||||
number=shard_nodes[2])
|
||||
# root sees first shard has shrunk, only second shard range used for
|
||||
# listing so alpha object not in listing
|
||||
# root sees first shard has shrunk
|
||||
self.assertLengthEqual(self.get_container_shard_ranges(), 1)
|
||||
self.assert_container_listing([])
|
||||
# cached shard ranges still show first shard range as active so listing
|
||||
# will include 'alpha' if the shard listing is fetched from node (0,1)
|
||||
# but not if fetched from node 2; to achieve predictability we use
|
||||
# x-newest to use shard ranges from the root so that only the second
|
||||
# shard range is used for listing, so alpha object not in listing
|
||||
self.assert_container_listing([], req_hdrs={'x-newest': 'true'})
|
||||
self.assert_container_object_count(0)
|
||||
|
||||
# run the updaters: the async pending update will be redirected from
|
||||
|
@ -396,18 +396,29 @@ class FakeMemcache(object):
|
||||
|
||||
def __init__(self):
|
||||
self.store = {}
|
||||
self.calls = []
|
||||
|
||||
def clear_calls(self):
|
||||
del self.calls[:]
|
||||
|
||||
def _called(self, method, key=None, value=None, time=None):
|
||||
self.calls.append((method, key, value, time))
|
||||
|
||||
def get(self, key):
|
||||
self._called('get', key)
|
||||
return self.store.get(key)
|
||||
|
||||
def keys(self):
|
||||
self._called('keys')
|
||||
return self.store.keys()
|
||||
|
||||
def set(self, key, value, time=0):
|
||||
self._called('set', key, value, time)
|
||||
self.store[key] = value
|
||||
return True
|
||||
|
||||
def incr(self, key, time=0):
|
||||
self._called('incr', key, time=time)
|
||||
self.store[key] = self.store.setdefault(key, 0) + 1
|
||||
return self.store[key]
|
||||
|
||||
@ -416,12 +427,16 @@ class FakeMemcache(object):
|
||||
yield True
|
||||
|
||||
def delete(self, key):
|
||||
self._called('delete', key)
|
||||
try:
|
||||
del self.store[key]
|
||||
except Exception:
|
||||
pass
|
||||
return True
|
||||
|
||||
def delete_all(self):
|
||||
self.store.clear()
|
||||
|
||||
|
||||
class FakeIterable(object):
|
||||
def __init__(self, values):
|
||||
|
@ -42,6 +42,70 @@ class TestRequestHelpers(unittest.TestCase):
|
||||
rh.constrain_req_limit(req, 10)
|
||||
self.assertEqual(raised.exception.status_int, 412)
|
||||
|
||||
def test_validate_params(self):
|
||||
req = Request.blank('')
|
||||
actual = rh.validate_params(req, ('limit', 'marker', 'end_marker'))
|
||||
self.assertEqual({}, actual)
|
||||
|
||||
req = Request.blank('', query_string='limit=1;junk=here;marker=foo')
|
||||
actual = rh.validate_params(req, ())
|
||||
self.assertEqual({}, actual)
|
||||
|
||||
req = Request.blank('', query_string='limit=1;junk=here;marker=foo')
|
||||
actual = rh.validate_params(req, ('limit', 'marker', 'end_marker'))
|
||||
expected = {'limit': '1', 'marker': 'foo'}
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
req = Request.blank('', query_string='limit=1;junk=here;marker=')
|
||||
actual = rh.validate_params(req, ('limit', 'marker', 'end_marker'))
|
||||
expected = {'limit': '1', 'marker': ''}
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
# ignore bad junk
|
||||
req = Request.blank('', query_string='limit=1;junk=%ff;marker=foo')
|
||||
actual = rh.validate_params(req, ('limit', 'marker', 'end_marker'))
|
||||
expected = {'limit': '1', 'marker': 'foo'}
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
# error on bad wanted parameter
|
||||
req = Request.blank('', query_string='limit=1;junk=here;marker=%ff')
|
||||
with self.assertRaises(HTTPException) as raised:
|
||||
rh.validate_params(req, ('limit', 'marker', 'end_marker'))
|
||||
self.assertEqual(raised.exception.status_int, 400)
|
||||
|
||||
def test_validate_container_params(self):
|
||||
req = Request.blank('')
|
||||
actual = rh.validate_container_params(req)
|
||||
self.assertEqual({'limit': 10000}, actual)
|
||||
|
||||
req = Request.blank('', query_string='limit=1;junk=here;marker=foo')
|
||||
actual = rh.validate_container_params(req)
|
||||
expected = {'limit': 1, 'marker': 'foo'}
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
req = Request.blank('', query_string='limit=1;junk=here;marker=')
|
||||
actual = rh.validate_container_params(req)
|
||||
expected = {'limit': 1, 'marker': ''}
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
# ignore bad junk
|
||||
req = Request.blank('', query_string='limit=1;junk=%ff;marker=foo')
|
||||
actual = rh.validate_container_params(req)
|
||||
expected = {'limit': 1, 'marker': 'foo'}
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
# error on bad wanted parameter
|
||||
req = Request.blank('', query_string='limit=1;junk=here;marker=%ff')
|
||||
with self.assertRaises(HTTPException) as raised:
|
||||
rh.validate_container_params(req)
|
||||
self.assertEqual(raised.exception.status_int, 400)
|
||||
|
||||
# error on bad limit
|
||||
req = Request.blank('', query_string='limit=10001')
|
||||
with self.assertRaises(HTTPException) as raised:
|
||||
rh.validate_container_params(req)
|
||||
self.assertEqual(raised.exception.status_int, 412)
|
||||
|
||||
def test_is_user_meta(self):
|
||||
m_type = 'meta'
|
||||
for st in server_types:
|
||||
|
@ -2643,7 +2643,8 @@ class TestContainerController(unittest.TestCase):
|
||||
# make a container
|
||||
ts_iter = make_timestamp_iter()
|
||||
ts_now = Timestamp.now() # used when mocking Timestamp.now()
|
||||
headers = {'X-Timestamp': next(ts_iter).normal}
|
||||
ts_put = next(ts_iter)
|
||||
headers = {'X-Timestamp': ts_put.normal}
|
||||
req = Request.blank('/sda1/p/a/c', method='PUT', headers=headers)
|
||||
self.assertEqual(201, req.get_response(self.controller).status_int)
|
||||
# PUT some objects
|
||||
@ -2713,6 +2714,25 @@ class TestContainerController(unittest.TestCase):
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual('shard', resp.headers['X-Backend-Record-Type'])
|
||||
|
||||
def check_shard_GET_override_filter(
|
||||
expected_shard_ranges, path, state, params=''):
|
||||
req_headers = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': state}
|
||||
req = Request.blank('/sda1/p/%s?format=json%s' %
|
||||
(path, params), method='GET',
|
||||
headers=req_headers)
|
||||
with mock_timestamp_now(ts_now):
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.content_type, 'application/json')
|
||||
expected = [
|
||||
dict(sr, last_modified=Timestamp(sr.timestamp).isoformat)
|
||||
for sr in expected_shard_ranges]
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual('shard', resp.headers['X-Backend-Record-Type'])
|
||||
return resp
|
||||
|
||||
# all shards
|
||||
check_shard_GET(shard_ranges, 'a/c')
|
||||
check_shard_GET(reversed(shard_ranges), 'a/c', params='&reverse=true')
|
||||
@ -2862,6 +2882,72 @@ class TestContainerController(unittest.TestCase):
|
||||
check_shard_GET([], 'a/c',
|
||||
params='&marker=cheese&end_marker=egg&reverse=true')
|
||||
|
||||
# now vary the sharding state and check the consequences of sending the
|
||||
# x-backend-override-shard-name-filter header:
|
||||
# in unsharded & sharding state the header should be ignored
|
||||
self.assertEqual('unsharded', broker.get_db_state())
|
||||
check_shard_GET(
|
||||
reversed(shard_ranges[:2]), 'a/c',
|
||||
params='&states=listing&reverse=true&marker=egg')
|
||||
resp = check_shard_GET_override_filter(
|
||||
reversed(shard_ranges[:2]), 'a/c', state='unsharded',
|
||||
params='&states=listing&reverse=true&marker=egg')
|
||||
self.assertNotIn('X-Backend-Override-Shard-Name-Filter', resp.headers)
|
||||
resp = check_shard_GET_override_filter(
|
||||
reversed(shard_ranges[:2]), 'a/c', state='sharded',
|
||||
params='&states=listing&reverse=true&marker=egg')
|
||||
self.assertIsNone(
|
||||
resp.headers.get('X-Backend-Override-Shard-Name-Filter'))
|
||||
ts_epoch = next(ts_iter)
|
||||
broker.enable_sharding(ts_epoch)
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
check_shard_GET(
|
||||
reversed(shard_ranges[:2]), 'a/c',
|
||||
params='&states=listing&reverse=true&marker=egg')
|
||||
resp = check_shard_GET_override_filter(
|
||||
reversed(shard_ranges[:2]), 'a/c', state='sharding',
|
||||
params='&states=listing&reverse=true&marker=egg')
|
||||
self.assertNotIn('X-Backend-Override-Shard-Name-Filter', resp.headers)
|
||||
resp = check_shard_GET_override_filter(
|
||||
reversed(shard_ranges[:2]), 'a/c', state='sharded',
|
||||
params='&states=listing&reverse=true&marker=egg')
|
||||
self.assertIsNone(
|
||||
resp.headers.get('X-Backend-Override-Shard-Name-Filter'))
|
||||
# in sharded state the server *will* override the marker and reverse
|
||||
# params and return listing shard ranges for entire namespace if
|
||||
# X-Backend-Override-Shard-Name-Filter == 'sharded'
|
||||
self.assertTrue(broker.set_sharded_state())
|
||||
ts_now = next(ts_iter)
|
||||
with mock_timestamp_now(ts_now):
|
||||
extra_shard_range = broker.get_own_shard_range()
|
||||
extra_shard_range.lower = shard_ranges[2].upper
|
||||
extra_shard_range.upper = ShardRange.MAX
|
||||
check_shard_GET(
|
||||
reversed(shard_ranges[:2]), 'a/c',
|
||||
params='&states=listing&reverse=true&marker=egg')
|
||||
expected = shard_ranges[:3] + [extra_shard_range]
|
||||
resp = check_shard_GET_override_filter(
|
||||
reversed(shard_ranges[:2]), 'a/c', state='sharding',
|
||||
params='&states=listing&reverse=true&marker=egg')
|
||||
self.assertNotIn('X-Backend-Override-Shard-Name-Filter', resp.headers)
|
||||
resp = check_shard_GET_override_filter(
|
||||
expected, 'a/c', state='sharded',
|
||||
params='&states=listing&reverse=true&marker=egg')
|
||||
self.assertEqual(
|
||||
'true', resp.headers.get('X-Backend-Override-Shard-Name-Filter'))
|
||||
# updating state excludes the first shard which has 'shrinking' state
|
||||
# but includes the fourth which has 'created' state
|
||||
extra_shard_range.lower = shard_ranges[3].upper
|
||||
check_shard_GET(
|
||||
shard_ranges[1:2], 'a/c',
|
||||
params='&states=updating&includes=egg')
|
||||
expected = shard_ranges[1:4] + [extra_shard_range]
|
||||
resp = check_shard_GET_override_filter(
|
||||
expected, 'a/c', state='sharded',
|
||||
params='&states=updating&includes=egg')
|
||||
self.assertEqual(
|
||||
'true', resp.headers.get('X-Backend-Override-Shard-Name-Filter'))
|
||||
|
||||
# delete a shard range
|
||||
shard_range = shard_ranges[1]
|
||||
shard_range.set_deleted(timestamp=next(ts_iter))
|
||||
|
@ -27,7 +27,7 @@ from swift.proxy.controllers.base import headers_to_container_info, \
|
||||
headers_to_account_info, headers_to_object_info, get_container_info, \
|
||||
get_cache_key, get_account_info, get_info, get_object_info, \
|
||||
Controller, GetOrHeadHandler, bytes_to_skip, clear_info_cache, \
|
||||
set_info_cache, NodeIter
|
||||
set_info_cache, NodeIter, headers_from_container_info
|
||||
from swift.common.swob import Request, HTTPException, RESPONSE_REASONS, \
|
||||
bytes_to_wsgi
|
||||
from swift.common import exceptions
|
||||
@ -470,6 +470,16 @@ class TestFuncs(BaseTest):
|
||||
u"\U0001F334".encode('utf8')),
|
||||
expected)
|
||||
|
||||
self.assertEqual(get_cache_key("account", "cont", shard="listing"),
|
||||
'shard-listing/account/cont')
|
||||
self.assertEqual(get_cache_key("account", "cont", shard="updating"),
|
||||
'shard-updating/account/cont')
|
||||
self.assertRaises(ValueError,
|
||||
get_cache_key, "account", shard="listing")
|
||||
self.assertRaises(ValueError,
|
||||
get_cache_key, "account", "cont", "obj",
|
||||
shard="listing")
|
||||
|
||||
def test_get_container_info_env(self):
|
||||
cache_key = get_cache_key("account", "cont")
|
||||
req = Request.blank(
|
||||
@ -509,6 +519,16 @@ class TestFuncs(BaseTest):
|
||||
check_not_in_cache(req, acct_cache_key)
|
||||
check_not_in_cache(req, cont_cache_key)
|
||||
|
||||
# check shard cache-keys
|
||||
shard_cache_key = get_cache_key('account', 'cont', shard='listing')
|
||||
shard_data = [{'shard': 'ranges'}]
|
||||
req.environ['swift.infocache'][shard_cache_key] = shard_data
|
||||
req.environ['swift.cache'].set(shard_cache_key, shard_data, time=600)
|
||||
check_in_cache(req, shard_cache_key)
|
||||
clear_info_cache('app-is-unused', req.environ, 'account', 'cont',
|
||||
shard='listing')
|
||||
check_not_in_cache(req, shard_cache_key)
|
||||
|
||||
def test_get_account_info_swift_source(self):
|
||||
app = FakeApp()
|
||||
req = Request.blank("/v1/a", environ={'swift.cache': FakeCache()})
|
||||
@ -718,6 +738,101 @@ class TestFuncs(BaseTest):
|
||||
resp,
|
||||
headers_to_container_info(headers.items(), 200))
|
||||
|
||||
def test_headers_from_container_info(self):
|
||||
self.assertIsNone(headers_from_container_info(None))
|
||||
self.assertIsNone(headers_from_container_info({}))
|
||||
|
||||
meta = {'fruit': 'cake'}
|
||||
sysmeta = {'green': 'land'}
|
||||
info = {
|
||||
'status': 200,
|
||||
'read_acl': 'my-read-acl',
|
||||
'write_acl': 'my-write-acl',
|
||||
'sync_to': 'my-sync-to',
|
||||
'sync_key': 'my-sync-key',
|
||||
'object_count': 99,
|
||||
'bytes': 999,
|
||||
'versions': 'my-versions',
|
||||
'storage_policy': '0',
|
||||
'cors': {
|
||||
'allow_origin': 'my-cors-origin',
|
||||
'expose_headers': 'my-cors-hdrs',
|
||||
'max_age': 'my-cors-age'},
|
||||
'created_at': '123.456_12',
|
||||
'put_timestamp': '234.567_34',
|
||||
'delete_timestamp': '345_67',
|
||||
'status_changed_at': '246.8_9',
|
||||
'meta': meta,
|
||||
'sysmeta': sysmeta,
|
||||
'sharding_state': 'unsharded'
|
||||
}
|
||||
|
||||
res = headers_from_container_info(info)
|
||||
|
||||
expected = {
|
||||
'X-Backend-Delete-Timestamp': '345_67',
|
||||
'X-Backend-Put-Timestamp': '234.567_34',
|
||||
'X-Backend-Sharding-State': 'unsharded',
|
||||
'X-Backend-Status-Changed-At': '246.8_9',
|
||||
'X-Backend-Storage-Policy-Index': '0',
|
||||
'X-Backend-Timestamp': '123.456_12',
|
||||
'X-Container-Bytes-Used': '999',
|
||||
'X-Container-Meta-Fruit': 'cake',
|
||||
'X-Container-Object-Count': '99',
|
||||
'X-Container-Read': 'my-read-acl',
|
||||
'X-Container-Sync-Key': 'my-sync-key',
|
||||
'X-Container-Sync-To': 'my-sync-to',
|
||||
'X-Container-Sysmeta-Green': 'land',
|
||||
'X-Container-Write': 'my-write-acl',
|
||||
'X-Put-Timestamp': '0000000234.56700',
|
||||
'X-Storage-Policy': 'zero',
|
||||
'X-Timestamp': '0000000123.45600',
|
||||
'X-Versions-Location': 'my-versions',
|
||||
'X-Container-Meta-Access-Control-Allow-Origin': 'my-cors-origin',
|
||||
'X-Container-Meta-Access-Control-Expose-Headers': 'my-cors-hdrs',
|
||||
'X-Container-Meta-Access-Control-Max-Age': 'my-cors-age',
|
||||
}
|
||||
|
||||
self.assertEqual(expected, res)
|
||||
|
||||
for required in (
|
||||
'created_at', 'put_timestamp', 'delete_timestamp',
|
||||
'status_changed_at', 'storage_policy', 'object_count', 'bytes',
|
||||
'sharding_state'):
|
||||
incomplete_info = dict(info)
|
||||
incomplete_info.pop(required)
|
||||
self.assertIsNone(headers_from_container_info(incomplete_info))
|
||||
|
||||
for hdr, optional in (
|
||||
('X-Container-Read', 'read_acl'),
|
||||
('X-Container-Write', 'write_acl'),
|
||||
('X-Container-Sync-Key', 'sync_key'),
|
||||
('X-Container-Sync-To', 'sync_to'),
|
||||
('X-Versions-Location', 'versions'),
|
||||
('X-Container-Meta-Fruit', 'meta'),
|
||||
('X-Container-Sysmeta-Green', 'sysmeta'),
|
||||
):
|
||||
incomplete_info = dict(info)
|
||||
incomplete_info.pop(optional)
|
||||
incomplete_expected = dict(expected)
|
||||
incomplete_expected.pop(hdr)
|
||||
self.assertEqual(incomplete_expected,
|
||||
headers_from_container_info(incomplete_info))
|
||||
|
||||
for hdr, optional in (
|
||||
('Access-Control-Allow-Origin', 'allow_origin'),
|
||||
('Access-Control-Expose-Headers', 'expose_headers'),
|
||||
('Access-Control-Max-Age', 'max_age'),
|
||||
):
|
||||
incomplete_info = dict(info)
|
||||
incomplete_cors = dict(info['cors'])
|
||||
incomplete_cors.pop(optional)
|
||||
incomplete_info['cors'] = incomplete_cors
|
||||
incomplete_expected = dict(expected)
|
||||
incomplete_expected.pop('X-Container-Meta-' + hdr)
|
||||
self.assertEqual(incomplete_expected,
|
||||
headers_from_container_info(incomplete_info))
|
||||
|
||||
def test_container_info_needs_req(self):
|
||||
base = Controller(self.app)
|
||||
base.account_name = 'a'
|
||||
|
@ -580,6 +580,7 @@ class TestContainerController(TestRingBase):
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
expected_objects = all_objects
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
'X-Backend-Timestamp': '99',
|
||||
# pretend root object stats are not yet updated
|
||||
'X-Container-Object-Count': num_all_objects - 1,
|
||||
'X-Container-Bytes-Used': size_all_objects - 1,
|
||||
@ -918,6 +919,7 @@ class TestContainerController(TestRingBase):
|
||||
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
'X-Backend-Timestamp': '99',
|
||||
# pretend root object stats are not yet updated
|
||||
'X-Container-Object-Count': 6,
|
||||
'X-Container-Bytes-Used': 12,
|
||||
@ -1152,6 +1154,7 @@ class TestContainerController(TestRingBase):
|
||||
num_all_objects = len(all_objects)
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
'X-Backend-Timestamp': '99',
|
||||
# pretend root object stats are not yet updated
|
||||
'X-Container-Object-Count': num_all_objects - 1,
|
||||
'X-Container-Bytes-Used': size_all_objects - 1,
|
||||
@ -1258,6 +1261,7 @@ class TestContainerController(TestRingBase):
|
||||
num_all_objects = len(all_objects)
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
'X-Backend-Timestamp': '99',
|
||||
'X-Container-Object-Count': num_all_objects,
|
||||
'X-Container-Bytes-Used': size_all_objects,
|
||||
'X-Container-Meta-Flavour': 'peach',
|
||||
@ -1320,6 +1324,7 @@ class TestContainerController(TestRingBase):
|
||||
all_objects = sr_objs[1] + sr_objs[2]
|
||||
size_all_objects = sum([obj['bytes'] for obj in all_objects])
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
'X-Backend-Timestamp': '99',
|
||||
'X-Container-Object-Count': len(all_objects),
|
||||
'X-Container-Bytes-Used': size_all_objects,
|
||||
'X-Container-Meta-Flavour': 'peach',
|
||||
@ -1359,6 +1364,7 @@ class TestContainerController(TestRingBase):
|
||||
all_objects = sr_objs[0] + sr_objs[1]
|
||||
size_all_objects = sum([obj['bytes'] for obj in all_objects])
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
'X-Backend-Timestamp': '99',
|
||||
'X-Container-Object-Count': len(all_objects),
|
||||
'X-Container-Bytes-Used': size_all_objects,
|
||||
'X-Container-Meta-Flavour': 'peach',
|
||||
@ -1399,6 +1405,7 @@ class TestContainerController(TestRingBase):
|
||||
all_objects = sr_objs[0] + sr_objs[2]
|
||||
size_all_objects = sum([obj['bytes'] for obj in all_objects])
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
'X-Backend-Timestamp': '99',
|
||||
'X-Container-Object-Count': len(all_objects),
|
||||
'X-Container-Bytes-Used': size_all_objects,
|
||||
'X-Container-Meta-Flavour': 'peach',
|
||||
@ -1513,6 +1520,7 @@ class TestContainerController(TestRingBase):
|
||||
num_all_objects = len(all_objects)
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
'X-Backend-Timestamp': '99',
|
||||
'X-Container-Object-Count': num_all_objects,
|
||||
'X-Container-Bytes-Used': size_all_objects,
|
||||
'X-Container-Meta-Flavour': 'peach',
|
||||
@ -1595,6 +1603,7 @@ class TestContainerController(TestRingBase):
|
||||
num_all_objects = len(all_objects)
|
||||
limit = CONTAINER_LISTING_LIMIT
|
||||
root_resp_hdrs = {'X-Backend-Sharding-State': 'sharded',
|
||||
'X-Backend-Timestamp': '99',
|
||||
'X-Container-Object-Count': num_all_objects,
|
||||
'X-Container-Bytes-Used': size_all_objects,
|
||||
'X-Container-Meta-Flavour': 'peach',
|
||||
@ -1652,6 +1661,789 @@ class TestContainerController(TestRingBase):
|
||||
# root object count will overridden by actual length of listing
|
||||
self.check_response(resp, root_resp_hdrs)
|
||||
|
||||
def _build_request(self, headers, params, infocache=None):
|
||||
# helper to make a GET request with caches set in environ
|
||||
query_string = '?' + ';'.join('%s=%s' % (k, v)
|
||||
for k, v in params.items())
|
||||
container_path = '/v1/a/c' + query_string
|
||||
request = Request.blank(container_path, headers=headers)
|
||||
request.environ['swift.cache'] = self.memcache
|
||||
request.environ['swift.infocache'] = infocache if infocache else {}
|
||||
return request
|
||||
|
||||
def _check_response(self, resp, exp_shards, extra_hdrs):
|
||||
# helper to check a shard listing response
|
||||
actual_shards = json.loads(resp.body)
|
||||
self.assertEqual(exp_shards, actual_shards)
|
||||
exp_hdrs = dict(self.root_resp_hdrs)
|
||||
# x-put-timestamp is sent from backend but removed in proxy base
|
||||
# controller GETorHEAD_base so not expected in response from proxy
|
||||
exp_hdrs.pop('X-Put-Timestamp')
|
||||
self.assertIn('X-Timestamp', resp.headers)
|
||||
actual_timestamp = resp.headers.pop('X-Timestamp')
|
||||
exp_timestamp = exp_hdrs.pop('X-Timestamp')
|
||||
self.assertEqual(Timestamp(exp_timestamp),
|
||||
Timestamp(actual_timestamp))
|
||||
exp_hdrs.update(extra_hdrs)
|
||||
exp_hdrs.update(
|
||||
{'X-Storage-Policy': 'zero', # added in container controller
|
||||
'Content-Length':
|
||||
str(len(json.dumps(exp_shards).encode('ascii'))),
|
||||
}
|
||||
)
|
||||
# we expect this header to be removed by proxy
|
||||
exp_hdrs.pop('X-Backend-Override-Shard-Name-Filter', None)
|
||||
for ignored in ('x-account-container-count', 'x-object-meta-test',
|
||||
'x-delete-at', 'etag', 'x-works'):
|
||||
# FakeConn adds these
|
||||
resp.headers.pop(ignored, None)
|
||||
self.assertEqual(exp_hdrs, resp.headers)
|
||||
|
||||
def _capture_backend_request(self, req, resp_status, resp_body,
|
||||
resp_extra_hdrs, num_resp=1):
|
||||
self.assertGreater(num_resp, 0) # sanity check
|
||||
resp_hdrs = dict(self.root_resp_hdrs)
|
||||
resp_hdrs.update(resp_extra_hdrs)
|
||||
resp_status = [resp_status] * num_resp
|
||||
with mocked_http_conn(
|
||||
*resp_status, body_iter=[resp_body] * num_resp,
|
||||
headers=[resp_hdrs] * num_resp) as fake_conn:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp_status[0], resp.status_int)
|
||||
self.assertEqual(num_resp, len(fake_conn.requests))
|
||||
return fake_conn.requests[0], resp
|
||||
|
||||
def _check_backend_req(self, req, backend_req, extra_params=None,
|
||||
extra_hdrs=None):
|
||||
self.assertEqual('a/c', backend_req['path'][7:])
|
||||
|
||||
expected_params = {'states': 'listing', 'format': 'json'}
|
||||
if extra_params:
|
||||
expected_params.update(extra_params)
|
||||
if six.PY2:
|
||||
backend_params = dict(urllib.parse.parse_qsl(
|
||||
backend_req['qs'], True))
|
||||
else:
|
||||
backend_params = dict(urllib.parse.parse_qsl(
|
||||
backend_req['qs'], True, encoding='latin1'))
|
||||
self.assertEqual(expected_params, backend_params)
|
||||
|
||||
backend_hdrs = backend_req['headers']
|
||||
self.assertIsNotNone(backend_hdrs.pop('Referer', None))
|
||||
self.assertIsNotNone(backend_hdrs.pop('X-Timestamp', None))
|
||||
self.assertTrue(backend_hdrs.pop('User-Agent', '').startswith(
|
||||
'proxy-server'))
|
||||
expected_headers = {
|
||||
'Connection': 'close',
|
||||
'Host': 'localhost:80',
|
||||
'X-Trans-Id': req.headers['X-Trans-Id']}
|
||||
if extra_hdrs:
|
||||
expected_headers.update(extra_hdrs)
|
||||
self.assertEqual(expected_headers, backend_hdrs)
|
||||
for k, v in expected_headers.items():
|
||||
self.assertIn(k, backend_hdrs)
|
||||
self.assertEqual(v, backend_hdrs.get(k))
|
||||
|
||||
def _setup_shard_range_stubs(self):
|
||||
self.memcache = FakeMemcache()
|
||||
shard_bounds = (('', 'ham'), ('ham', 'pie'), ('pie', ''))
|
||||
shard_ranges = [
|
||||
ShardRange('.shards_a/c_%s' % upper, Timestamp.now(), lower, upper)
|
||||
for lower, upper in shard_bounds]
|
||||
self.sr_dicts = [dict(sr) for sr in shard_ranges]
|
||||
self._stub_shards_dump = json.dumps(self.sr_dicts).encode('ascii')
|
||||
self.root_resp_hdrs = {
|
||||
'Accept-Ranges': 'bytes',
|
||||
'Content-Type': 'application/json',
|
||||
'Last-Modified': 'Thu, 01 Jan 1970 00:00:03 GMT',
|
||||
'X-Backend-Timestamp': '2',
|
||||
'X-Backend-Put-Timestamp': '3',
|
||||
'X-Backend-Delete-Timestamp': '0',
|
||||
'X-Backend-Status-Changed-At': '0',
|
||||
'X-Timestamp': '2',
|
||||
'X-Put-Timestamp': '3',
|
||||
'X-Container-Object-Count': '6',
|
||||
'X-Container-Bytes-Used': '12',
|
||||
'X-Backend-Storage-Policy-Index': '0'}
|
||||
|
||||
def _do_test_caching(self, record_type, exp_recheck_listing):
|
||||
# this test gest shard ranges into cache and then reads from cache
|
||||
sharding_state = 'sharded'
|
||||
self.memcache.delete_all()
|
||||
self.memcache.clear_calls()
|
||||
# container is sharded but proxy does not have that state cached;
|
||||
# expect a backend request and expect shard ranges to be cached
|
||||
self.memcache.clear_calls()
|
||||
req = self._build_request({'X-Backend-Record-Type': record_type},
|
||||
{'states': 'listing'}, {})
|
||||
backend_req, resp = self._capture_backend_request(
|
||||
req, 200, self._stub_shards_dump,
|
||||
{'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Sharding-State': sharding_state,
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true'})
|
||||
self._check_backend_req(
|
||||
req, backend_req,
|
||||
extra_hdrs={'X-Backend-Record-Type': record_type,
|
||||
'X-Backend-Override-Shard-Name-Filter': 'sharded'})
|
||||
self._check_response(resp, self.sr_dicts, {
|
||||
'X-Backend-Recheck-Container-Existence': '60',
|
||||
'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Sharding-State': sharding_state})
|
||||
self.assertEqual(
|
||||
[('get', 'container/a/c', None, None),
|
||||
('set', 'shard-listing/a/c', self.sr_dicts,
|
||||
exp_recheck_listing),
|
||||
('set', 'container/a/c', mock.ANY, 60)],
|
||||
self.memcache.calls)
|
||||
self.assertEqual(self.sr_dicts, self.memcache.calls[1][2])
|
||||
self.assertEqual(sharding_state,
|
||||
self.memcache.calls[2][2]['sharding_state'])
|
||||
self.assertIn('swift.infocache', req.environ)
|
||||
self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
|
||||
self.assertEqual(tuple(self.sr_dicts),
|
||||
req.environ['swift.infocache']['shard-listing/a/c'])
|
||||
|
||||
# container is sharded and proxy does have that state cached and
|
||||
# also has shard ranges cached; expect a read from cache
|
||||
self.memcache.clear_calls()
|
||||
req = self._build_request({'X-Backend-Record-Type': record_type},
|
||||
{'states': 'listing'}, {})
|
||||
resp = req.get_response(self.app)
|
||||
self._check_response(resp, self.sr_dicts, {
|
||||
'X-Backend-Cached-Results': 'true',
|
||||
'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Sharding-State': sharding_state})
|
||||
self.assertEqual(
|
||||
[('get', 'container/a/c', None, None),
|
||||
('get', 'shard-listing/a/c', None, None)],
|
||||
self.memcache.calls)
|
||||
self.assertIn('swift.infocache', req.environ)
|
||||
self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
|
||||
self.assertEqual(tuple(self.sr_dicts),
|
||||
req.environ['swift.infocache']['shard-listing/a/c'])
|
||||
|
||||
# delete the container; check that shard ranges are evicted from cache
|
||||
self.memcache.clear_calls()
|
||||
infocache = {}
|
||||
req = Request.blank('/v1/a/c', method='DELETE')
|
||||
req.environ['swift.cache'] = self.memcache
|
||||
req.environ['swift.infocache'] = infocache
|
||||
self._capture_backend_request(req, 204, b'', {},
|
||||
num_resp=self.CONTAINER_REPLICAS)
|
||||
self.assertEqual(
|
||||
[('delete', 'container/a/c', None, None),
|
||||
('delete', 'shard-listing/a/c', None, None)],
|
||||
self.memcache.calls)
|
||||
|
||||
def test_GET_shard_ranges(self):
|
||||
self._setup_shard_range_stubs()
|
||||
# expect shard ranges cache time to be default value of 600
|
||||
self._do_test_caching('shard', 600)
|
||||
# expect shard ranges cache time to be configured value of 120
|
||||
self.app.recheck_listing_shard_ranges = 120
|
||||
self._do_test_caching('shard', 120)
|
||||
|
||||
def mock_get_from_shards(self, req, resp):
|
||||
# for the purposes of these tests we override _get_from_shards so
|
||||
# that the response contains the shard listing even though the
|
||||
# record_type is 'auto'; these tests are verifying the content and
|
||||
# caching of the backend shard range response so we're not
|
||||
# interested in gathering object from the shards
|
||||
return resp
|
||||
|
||||
with mock.patch('swift.proxy.controllers.container.'
|
||||
'ContainerController._get_from_shards',
|
||||
mock_get_from_shards):
|
||||
self.app.recheck_listing_shard_ranges = 600
|
||||
self._do_test_caching('auto', 600)
|
||||
|
||||
def test_GET_shard_ranges_404_response(self):
|
||||
# pre-warm cache with container info but not shard ranges so that the
|
||||
# backend request tries to get a cacheable listing, but backend 404's
|
||||
self._setup_shard_range_stubs()
|
||||
self.memcache.delete_all()
|
||||
info = headers_to_container_info(self.root_resp_hdrs)
|
||||
info['status'] = 200
|
||||
info['sharding_state'] = 'sharded'
|
||||
self.memcache.set('container/a/c', info)
|
||||
self.memcache.clear_calls()
|
||||
req = self._build_request({'X-Backend-Record-Type': 'shard'},
|
||||
{'states': 'listing'}, {})
|
||||
backend_req, resp = self._capture_backend_request(
|
||||
req, 404, b'', {}, num_resp=2 * self.CONTAINER_REPLICAS)
|
||||
self._check_backend_req(
|
||||
req, backend_req,
|
||||
extra_hdrs={'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'sharded'})
|
||||
self.assertNotIn('X-Backend-Cached-Results', resp.headers)
|
||||
# Note: container metadata is updated in cache but shard ranges are not
|
||||
# deleted from cache
|
||||
self.assertEqual(
|
||||
[('get', 'container/a/c', None, None),
|
||||
('get', 'shard-listing/a/c', None, None),
|
||||
('set', 'container/a/c', mock.ANY, 6.0)],
|
||||
self.memcache.calls)
|
||||
self.assertEqual(404, self.memcache.calls[2][2]['status'])
|
||||
self.assertEqual(b'', resp.body)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
|
||||
def _do_test_GET_shard_ranges_read_from_cache(self, params, record_type):
|
||||
# pre-warm cache with container metadata and shard ranges and verify
|
||||
# that shard range listing are read from cache when appropriate
|
||||
self.memcache.delete_all()
|
||||
info = headers_to_container_info(self.root_resp_hdrs)
|
||||
info['status'] = 200
|
||||
info['sharding_state'] = 'sharded'
|
||||
self.memcache.set('container/a/c', info)
|
||||
self.memcache.set('shard-listing/a/c', self.sr_dicts)
|
||||
self.memcache.clear_calls()
|
||||
|
||||
req_hdrs = {'X-Backend-Record-Type': record_type}
|
||||
req = self._build_request(req_hdrs, params, {})
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(
|
||||
[('get', 'container/a/c', None, None),
|
||||
('get', 'shard-listing/a/c', None, None)],
|
||||
self.memcache.calls)
|
||||
return resp
|
||||
|
||||
def test_GET_shard_ranges_read_from_cache(self):
|
||||
self._setup_shard_range_stubs()
|
||||
exp_hdrs = {'X-Backend-Cached-Results': 'true',
|
||||
'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'sharded'}
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_read_from_cache(
|
||||
{'states': 'listing'}, 'shard')
|
||||
self._check_response(resp, self.sr_dicts, exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_read_from_cache(
|
||||
{'states': 'listing', 'reverse': 'true'}, 'shard')
|
||||
exp_shards = list(self.sr_dicts)
|
||||
exp_shards.reverse()
|
||||
self._check_response(resp, exp_shards, exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_read_from_cache(
|
||||
{'states': 'listing', 'marker': 'jam'}, 'shard')
|
||||
self._check_response(resp, self.sr_dicts[1:], exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_read_from_cache(
|
||||
{'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'},
|
||||
'shard')
|
||||
self._check_response(resp, self.sr_dicts[1:2], exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_read_from_cache(
|
||||
{'states': 'listing', 'includes': 'egg'}, 'shard')
|
||||
self._check_response(resp, self.sr_dicts[:1], exp_hdrs)
|
||||
|
||||
# override _get_from_shards so that the response contains the shard
|
||||
# listing that we want to verify even though the record_type is 'auto'
|
||||
def mock_get_from_shards(self, req, resp):
|
||||
return resp
|
||||
|
||||
with mock.patch('swift.proxy.controllers.container.'
|
||||
'ContainerController._get_from_shards',
|
||||
mock_get_from_shards):
|
||||
resp = self._do_test_GET_shard_ranges_read_from_cache(
|
||||
{'states': 'listing', 'reverse': 'true'}, 'auto')
|
||||
exp_shards = list(self.sr_dicts)
|
||||
exp_shards.reverse()
|
||||
self._check_response(resp, exp_shards, exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_read_from_cache(
|
||||
{'states': 'listing', 'marker': 'jam'}, 'auto')
|
||||
self._check_response(resp, self.sr_dicts[1:], exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_read_from_cache(
|
||||
{'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'},
|
||||
'auto')
|
||||
self._check_response(resp, self.sr_dicts[1:2], exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_read_from_cache(
|
||||
{'states': 'listing', 'includes': 'egg'}, 'auto')
|
||||
self._check_response(resp, self.sr_dicts[:1], exp_hdrs)
|
||||
|
||||
def _do_test_GET_shard_ranges_write_to_cache(self, params, record_type):
|
||||
# verify that shard range listing are written to cache when appropriate
|
||||
self.memcache.delete_all()
|
||||
self.memcache.clear_calls()
|
||||
# set request up for cacheable listing
|
||||
req_hdrs = {'X-Backend-Record-Type': record_type}
|
||||
req = self._build_request(req_hdrs, params, {})
|
||||
# response indicates cacheable listing
|
||||
resp_hdrs = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'sharded'}
|
||||
backend_req, resp = self._capture_backend_request(
|
||||
req, 200, self._stub_shards_dump, resp_hdrs)
|
||||
self._check_backend_req(
|
||||
req, backend_req,
|
||||
extra_params=params,
|
||||
extra_hdrs={'X-Backend-Record-Type': record_type,
|
||||
'X-Backend-Override-Shard-Name-Filter': 'sharded'})
|
||||
expected_hdrs = {'X-Backend-Recheck-Container-Existence': '60'}
|
||||
expected_hdrs.update(resp_hdrs)
|
||||
self.assertEqual(
|
||||
[('get', 'container/a/c', None, None),
|
||||
('set', 'shard-listing/a/c', self.sr_dicts, 600),
|
||||
('set', 'container/a/c', mock.ANY, 60)],
|
||||
self.memcache.calls)
|
||||
# shards were cached
|
||||
self.assertEqual(self.sr_dicts, self.memcache.calls[1][2])
|
||||
self.assertEqual('sharded',
|
||||
self.memcache.calls[2][2]['sharding_state'])
|
||||
return resp
|
||||
|
||||
def test_GET_shard_ranges_write_to_cache(self):
|
||||
self._setup_shard_range_stubs()
|
||||
exp_hdrs = {'X-Backend-Recheck-Container-Existence': '60',
|
||||
'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'sharded'}
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_write_to_cache(
|
||||
{'states': 'listing'}, 'shard')
|
||||
self._check_response(resp, self.sr_dicts, exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_write_to_cache(
|
||||
{'states': 'listing', 'reverse': 'true'}, 'shard')
|
||||
exp_shards = list(self.sr_dicts)
|
||||
exp_shards.reverse()
|
||||
self._check_response(resp, exp_shards, exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_write_to_cache(
|
||||
{'states': 'listing', 'marker': 'jam'}, 'shard')
|
||||
self._check_response(resp, self.sr_dicts[1:], exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_write_to_cache(
|
||||
{'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'},
|
||||
'shard')
|
||||
self._check_response(resp, self.sr_dicts[1:2], exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_write_to_cache(
|
||||
{'states': 'listing', 'includes': 'egg'}, 'shard')
|
||||
self._check_response(resp, self.sr_dicts[:1], exp_hdrs)
|
||||
|
||||
# override _get_from_shards so that the response contains the shard
|
||||
# listing that we want to verify even though the record_type is 'auto'
|
||||
def mock_get_from_shards(self, req, resp):
|
||||
return resp
|
||||
|
||||
with mock.patch('swift.proxy.controllers.container.'
|
||||
'ContainerController._get_from_shards',
|
||||
mock_get_from_shards):
|
||||
resp = self._do_test_GET_shard_ranges_write_to_cache(
|
||||
{'states': 'listing', 'reverse': 'true'}, 'auto')
|
||||
exp_shards = list(self.sr_dicts)
|
||||
exp_shards.reverse()
|
||||
self._check_response(resp, exp_shards, exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_write_to_cache(
|
||||
{'states': 'listing', 'marker': 'jam'}, 'auto')
|
||||
self._check_response(resp, self.sr_dicts[1:], exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_write_to_cache(
|
||||
{'states': 'listing', 'marker': 'jam', 'end_marker': 'kale'},
|
||||
'auto')
|
||||
self._check_response(resp, self.sr_dicts[1:2], exp_hdrs)
|
||||
|
||||
resp = self._do_test_GET_shard_ranges_write_to_cache(
|
||||
{'states': 'listing', 'includes': 'egg'}, 'auto')
|
||||
self._check_response(resp, self.sr_dicts[:1], exp_hdrs)
|
||||
|
||||
def test_GET_shard_ranges_write_to_cache_with_x_newest(self):
|
||||
# when x-newest is sent, verify that there is no cache lookup to check
|
||||
# sharding state but then backend requests are made requesting complete
|
||||
# shard list which can be cached
|
||||
self._setup_shard_range_stubs()
|
||||
self.memcache.delete_all()
|
||||
self.memcache.clear_calls()
|
||||
req_hdrs = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Newest': 'true'}
|
||||
params = {'states': 'listing'}
|
||||
req = self._build_request(req_hdrs, params, {})
|
||||
resp_hdrs = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'sharded'}
|
||||
backend_req, resp = self._capture_backend_request(
|
||||
req, 200, self._stub_shards_dump, resp_hdrs,
|
||||
num_resp=2 * self.CONTAINER_REPLICAS)
|
||||
self._check_backend_req(
|
||||
req, backend_req,
|
||||
extra_hdrs={'X-Backend-Record-Type': 'shard',
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'sharded'})
|
||||
expected_hdrs = {'X-Backend-Recheck-Container-Existence': '60'}
|
||||
expected_hdrs.update(resp_hdrs)
|
||||
self._check_response(resp, self.sr_dicts, expected_hdrs)
|
||||
self.assertEqual(
|
||||
[('set', 'shard-listing/a/c', self.sr_dicts, 600),
|
||||
('set', 'container/a/c', mock.ANY, 60)],
|
||||
self.memcache.calls)
|
||||
self.assertEqual(self.sr_dicts, self.memcache.calls[0][2])
|
||||
self.assertEqual('sharded',
|
||||
self.memcache.calls[1][2]['sharding_state'])
|
||||
|
||||
def _do_test_GET_shard_ranges_no_cache_write(self, resp_hdrs):
|
||||
# verify that there is a cache lookup to check container info but then
|
||||
# a backend request is made requesting complete shard list, but do not
|
||||
# expect shard ranges to be cached; check that marker, end_marker etc
|
||||
# are passed to backend
|
||||
self.memcache.clear_calls()
|
||||
req = self._build_request(
|
||||
{'X-Backend-Record-Type': 'shard'},
|
||||
{'states': 'listing', 'marker': 'egg', 'end_marker': 'jam',
|
||||
'reverse': 'true'}, {})
|
||||
resp_shards = self.sr_dicts[:2]
|
||||
resp_shards.reverse()
|
||||
backend_req, resp = self._capture_backend_request(
|
||||
req, 200, json.dumps(resp_shards).encode('ascii'),
|
||||
resp_hdrs)
|
||||
self._check_backend_req(
|
||||
req, backend_req,
|
||||
extra_params={'marker': 'egg', 'end_marker': 'jam',
|
||||
'reverse': 'true'},
|
||||
extra_hdrs={'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'sharded'})
|
||||
expected_shards = self.sr_dicts[:2]
|
||||
expected_shards.reverse()
|
||||
expected_hdrs = {'X-Backend-Recheck-Container-Existence': '60'}
|
||||
expected_hdrs.update(resp_hdrs)
|
||||
self._check_response(resp, expected_shards, expected_hdrs)
|
||||
# container metadata is looked up in memcache for sharding state
|
||||
# container metadata is set in memcache
|
||||
self.assertEqual(
|
||||
[('get', 'container/a/c', None, None),
|
||||
('set', 'container/a/c', mock.ANY, 60)],
|
||||
self.memcache.calls)
|
||||
self.assertEqual(resp.headers.get('X-Backend-Sharding-State'),
|
||||
self.memcache.calls[1][2]['sharding_state'])
|
||||
self.memcache.delete_all()
|
||||
|
||||
def test_GET_shard_ranges_no_cache_write_with_cached_container_info(self):
|
||||
# pre-warm cache with container info, but verify that shard range cache
|
||||
# lookup is only attempted when the cached sharding state and status
|
||||
# are suitable, and full set of headers can be constructed from cache;
|
||||
# Note: backend response has state unsharded so no shard ranges cached
|
||||
self._setup_shard_range_stubs()
|
||||
|
||||
def do_test(info):
|
||||
self._setup_shard_range_stubs()
|
||||
self.memcache.set('container/a/c', info)
|
||||
# expect the same outcomes as if there was no cached container info
|
||||
self._do_test_GET_shard_ranges_no_cache_write(
|
||||
{'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'unsharded'})
|
||||
|
||||
# setup a default 'good' info
|
||||
info = headers_to_container_info(self.root_resp_hdrs)
|
||||
info['status'] = 200
|
||||
info['sharding_state'] = 'sharded'
|
||||
do_test(dict(info, status=404))
|
||||
do_test(dict(info, sharding_state='unsharded'))
|
||||
do_test(dict(info, sharding_state='sharding'))
|
||||
do_test(dict(info, sharding_state='collapsed'))
|
||||
do_test(dict(info, sharding_state='unexpected'))
|
||||
|
||||
stale_info = dict(info)
|
||||
stale_info.pop('created_at')
|
||||
do_test(stale_info)
|
||||
|
||||
stale_info = dict(info)
|
||||
stale_info.pop('put_timestamp')
|
||||
do_test(stale_info)
|
||||
|
||||
stale_info = dict(info)
|
||||
stale_info.pop('delete_timestamp')
|
||||
do_test(stale_info)
|
||||
|
||||
stale_info = dict(info)
|
||||
stale_info.pop('status_changed_at')
|
||||
do_test(stale_info)
|
||||
|
||||
def test_GET_shard_ranges_no_cache_write_for_non_sharded_states(self):
|
||||
# verify that shard ranges are not written to cache when container
|
||||
# state returned by backend is not 'sharded'; we don't expect
|
||||
# 'X-Backend-Override-Shard-Name-Filter': 'true' to be returned unless
|
||||
# the sharding state is 'sharded' but include it in this test to check
|
||||
# that the state is checked by proxy controller
|
||||
self._setup_shard_range_stubs()
|
||||
self._do_test_GET_shard_ranges_no_cache_write(
|
||||
{'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'unsharded'})
|
||||
self._do_test_GET_shard_ranges_no_cache_write(
|
||||
{'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'sharding'})
|
||||
self._do_test_GET_shard_ranges_no_cache_write(
|
||||
{'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'collapsed'})
|
||||
self._do_test_GET_shard_ranges_no_cache_write(
|
||||
{'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'unexpected'})
|
||||
|
||||
def test_GET_shard_ranges_no_cache_write_for_incomplete_listing(self):
|
||||
# verify that shard ranges are not written to cache when container
|
||||
# response does not acknowledge x-backend-override-shard-name-filter
|
||||
# e.g. container server not upgraded
|
||||
self._setup_shard_range_stubs()
|
||||
self._do_test_GET_shard_ranges_no_cache_write(
|
||||
{'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Sharding-State': 'sharded'})
|
||||
self._do_test_GET_shard_ranges_no_cache_write(
|
||||
{'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'false',
|
||||
'X-Backend-Sharding-State': 'sharded'})
|
||||
self._do_test_GET_shard_ranges_no_cache_write(
|
||||
{'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'rogue',
|
||||
'X-Backend-Sharding-State': 'sharded'})
|
||||
|
||||
def test_GET_shard_ranges_no_cache_write_for_object_listing(self):
|
||||
# verify that shard ranges are not written to cache when container
|
||||
# response does not return shard ranges
|
||||
self._setup_shard_range_stubs()
|
||||
self._do_test_GET_shard_ranges_no_cache_write(
|
||||
{'X-Backend-Record-Type': 'object',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'sharded'})
|
||||
self._do_test_GET_shard_ranges_no_cache_write(
|
||||
{'X-Backend-Record-Type': 'other',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'sharded'})
|
||||
self._do_test_GET_shard_ranges_no_cache_write(
|
||||
{'X-Backend-Record-Type': 'true',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'sharded'})
|
||||
self._do_test_GET_shard_ranges_no_cache_write(
|
||||
{'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'sharded'})
|
||||
|
||||
def _do_test_GET_shard_ranges_bad_response_body(self, resp_body):
|
||||
# verify that resp body is not cached if shard range parsing fails;
|
||||
# check the original unparseable response body is returned
|
||||
self._setup_shard_range_stubs()
|
||||
self.memcache.clear_calls()
|
||||
req = self._build_request(
|
||||
{'X-Backend-Record-Type': 'shard'},
|
||||
{'states': 'listing'}, {})
|
||||
resp_hdrs = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'true',
|
||||
'X-Backend-Sharding-State': 'sharded'}
|
||||
backend_req, resp = self._capture_backend_request(
|
||||
req, 200, json.dumps(resp_body).encode('ascii'),
|
||||
resp_hdrs)
|
||||
self._check_backend_req(
|
||||
req, backend_req,
|
||||
extra_hdrs={'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'sharded'})
|
||||
expected_hdrs = {'X-Backend-Recheck-Container-Existence': '60'}
|
||||
expected_hdrs.update(resp_hdrs)
|
||||
self._check_response(resp, resp_body, expected_hdrs)
|
||||
# container metadata is looked up in memcache for sharding state
|
||||
# container metadata is set in memcache
|
||||
self.assertEqual(
|
||||
[('get', 'container/a/c', None, None),
|
||||
('set', 'container/a/c', mock.ANY, 60)],
|
||||
self.memcache.calls)
|
||||
self.assertEqual(resp.headers.get('X-Backend-Sharding-State'),
|
||||
self.memcache.calls[1][2]['sharding_state'])
|
||||
self.memcache.delete_all()
|
||||
|
||||
def test_GET_shard_ranges_bad_response_body(self):
|
||||
self._do_test_GET_shard_ranges_bad_response_body(
|
||||
{'bad': 'data', 'not': ' a list'})
|
||||
error_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(1, len(error_lines), error_lines)
|
||||
self.assertIn('Problem with listing response', error_lines[0])
|
||||
|
||||
self.logger.clear()
|
||||
self._do_test_GET_shard_ranges_bad_response_body(
|
||||
[{'not': ' a shard range'}])
|
||||
error_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(1, len(error_lines), error_lines)
|
||||
self.assertIn('Failed to get shard ranges', error_lines[0])
|
||||
|
||||
self.logger.clear()
|
||||
self._do_test_GET_shard_ranges_bad_response_body(
|
||||
'not json')
|
||||
error_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(1, len(error_lines), error_lines)
|
||||
self.assertIn('Problem with listing response', error_lines[0])
|
||||
|
||||
def _do_test_GET_shards_no_cache(self, sharding_state, req_params,
|
||||
req_hdrs=None):
|
||||
# verify that a shard GET request does not lookup in cache or attempt
|
||||
# to cache shard ranges fetched from backend
|
||||
self.memcache.delete_all()
|
||||
self.memcache.clear_calls()
|
||||
req_params.update(dict(marker='egg', end_marker='jam'))
|
||||
hdrs = {'X-Backend-Record-Type': 'shard'}
|
||||
if req_hdrs:
|
||||
hdrs.update(req_hdrs)
|
||||
req = self._build_request(hdrs, req_params, {})
|
||||
resp_shards = self.sr_dicts[:2]
|
||||
backend_req, resp = self._capture_backend_request(
|
||||
req, 200, json.dumps(resp_shards).encode('ascii'),
|
||||
{'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Sharding-State': sharding_state})
|
||||
self._check_backend_req(
|
||||
req, backend_req, extra_hdrs=hdrs, extra_params=req_params)
|
||||
expected_shards = self.sr_dicts[:2]
|
||||
self._check_response(resp, expected_shards, {
|
||||
'X-Backend-Recheck-Container-Existence': '60',
|
||||
'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Sharding-State': sharding_state})
|
||||
# container metadata from backend response is set in memcache
|
||||
self.assertEqual(
|
||||
[('set', 'container/a/c', mock.ANY, 60)],
|
||||
self.memcache.calls)
|
||||
self.assertEqual(sharding_state,
|
||||
self.memcache.calls[0][2]['sharding_state'])
|
||||
|
||||
def test_GET_shard_ranges_no_cache_recheck_listing_shard_ranges(self):
|
||||
# verify that a GET for shards does not lookup or store in cache when
|
||||
# cache expiry time is set to zero
|
||||
self._setup_shard_range_stubs()
|
||||
self.app.recheck_listing_shard_ranges = 0
|
||||
self._do_test_GET_shards_no_cache('unsharded', {'states': 'listing'})
|
||||
self._do_test_GET_shards_no_cache('sharding', {'states': 'listing'})
|
||||
self._do_test_GET_shards_no_cache('sharded', {'states': 'listing'})
|
||||
self._do_test_GET_shards_no_cache('collapsed', {'states': 'listing'})
|
||||
self._do_test_GET_shards_no_cache('unexpected', {'states': 'listing'})
|
||||
|
||||
def test_GET_shard_ranges_no_cache_when_requesting_updating_shards(self):
|
||||
# verify that a GET for shards in updating states does not lookup or
|
||||
# store in cache
|
||||
self._setup_shard_range_stubs()
|
||||
self._do_test_GET_shards_no_cache('unsharded', {'states': 'updating'})
|
||||
self._do_test_GET_shards_no_cache('sharding', {'states': 'updating'})
|
||||
self._do_test_GET_shards_no_cache('sharded', {'states': 'updating'})
|
||||
self._do_test_GET_shards_no_cache('collapsed', {'states': 'updating'})
|
||||
self._do_test_GET_shards_no_cache('unexpected', {'states': 'updating'})
|
||||
|
||||
def test_GET_shard_ranges_no_cache_when_include_deleted_shards(self):
|
||||
# verify that a GET for shards in listing states does not lookup or
|
||||
# store in cache if x-backend-include-deleted is true
|
||||
self._setup_shard_range_stubs()
|
||||
self._do_test_GET_shards_no_cache(
|
||||
'unsharded', {'states': 'listing'},
|
||||
{'X-Backend-Include-Deleted': 'true'})
|
||||
self._do_test_GET_shards_no_cache(
|
||||
'sharding', {'states': 'listing'},
|
||||
{'X-Backend-Include-Deleted': 'true'})
|
||||
self._do_test_GET_shards_no_cache(
|
||||
'sharded', {'states': 'listing'},
|
||||
{'X-Backend-Include-Deleted': 'true'})
|
||||
self._do_test_GET_shards_no_cache(
|
||||
'collapsed', {'states': 'listing'},
|
||||
{'X-Backend-Include-Deleted': 'true'})
|
||||
self._do_test_GET_shards_no_cache(
|
||||
'unexpected', {'states': 'listing'},
|
||||
{'X-Backend-Include-Deleted': 'true'})
|
||||
|
||||
def test_GET_objects_makes_no_cache_lookup(self):
|
||||
# verify that an object GET request does not lookup container metadata
|
||||
# in cache
|
||||
self._setup_shard_range_stubs()
|
||||
self.memcache.delete_all()
|
||||
self.memcache.clear_calls()
|
||||
req_hdrs = {'X-Backend-Record-Type': 'object'}
|
||||
# we would not expect states=listing to be used with an object request
|
||||
# but include it here to verify that it is ignored
|
||||
req = self._build_request(req_hdrs, {'states': 'listing'}, {})
|
||||
resp_body = json.dumps(['object listing']).encode('ascii')
|
||||
backend_req, resp = self._capture_backend_request(
|
||||
req, 200, resp_body,
|
||||
{'X-Backend-Record-Type': 'object',
|
||||
'X-Backend-Sharding-State': 'sharded'})
|
||||
self._check_backend_req(
|
||||
req, backend_req,
|
||||
extra_hdrs=req_hdrs)
|
||||
self._check_response(resp, ['object listing'], {
|
||||
'X-Backend-Recheck-Container-Existence': '60',
|
||||
'X-Backend-Record-Type': 'object',
|
||||
'X-Backend-Sharding-State': 'sharded'})
|
||||
# container metadata from backend response is set in memcache
|
||||
self.assertEqual(
|
||||
[('set', 'container/a/c', mock.ANY, 60)],
|
||||
self.memcache.calls)
|
||||
self.assertEqual('sharded',
|
||||
self.memcache.calls[0][2]['sharding_state'])
|
||||
|
||||
def test_GET_shard_ranges_no_memcache_available(self):
|
||||
self._setup_shard_range_stubs()
|
||||
self.memcache.clear_calls()
|
||||
hdrs = {'X-Backend-Record-Type': 'shard'}
|
||||
params = {'states': 'listing'}
|
||||
req = self._build_request(hdrs, params, {})
|
||||
req.environ['swift.cache'] = None
|
||||
backend_req, resp = self._capture_backend_request(
|
||||
req, 200, self._stub_shards_dump,
|
||||
{'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Sharding-State': 'sharded'})
|
||||
self._check_backend_req(
|
||||
req, backend_req, extra_params=params, extra_hdrs=hdrs)
|
||||
expected_shards = self.sr_dicts
|
||||
self._check_response(resp, expected_shards, {
|
||||
'X-Backend-Recheck-Container-Existence': '60',
|
||||
'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Sharding-State': 'sharded'})
|
||||
self.assertEqual([], self.memcache.calls) # sanity check
|
||||
|
||||
def test_cache_clearing(self):
|
||||
# verify that both metadata and shard ranges are purged form memcache
|
||||
# on PUT, POST and DELETE
|
||||
def do_test(method, resp_status, num_resp):
|
||||
self.assertGreater(num_resp, 0) # sanity check
|
||||
memcache = FakeMemcache()
|
||||
cont_key = get_cache_key('a', 'c')
|
||||
shard_key = get_cache_key('a', 'c', shard='listing')
|
||||
memcache.set(cont_key, 'container info', 60)
|
||||
memcache.set(shard_key, 'shard ranges', 600)
|
||||
req = Request.blank('/v1/a/c', method=method)
|
||||
req.environ['swift.cache'] = memcache
|
||||
self.assertIn(cont_key, req.environ['swift.cache'].store)
|
||||
self.assertIn(shard_key, req.environ['swift.cache'].store)
|
||||
resp_status = [resp_status] * num_resp
|
||||
with mocked_http_conn(
|
||||
*resp_status, body_iter=[b''] * num_resp,
|
||||
headers=[{}] * num_resp):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp_status[0], resp.status_int)
|
||||
self.assertNotIn(cont_key, req.environ['swift.cache'].store)
|
||||
self.assertNotIn(shard_key, req.environ['swift.cache'].store)
|
||||
do_test('DELETE', 204, self.CONTAINER_REPLICAS)
|
||||
do_test('POST', 204, self.CONTAINER_REPLICAS)
|
||||
do_test('PUT', 202, self.CONTAINER_REPLICAS)
|
||||
|
||||
def test_GET_bad_requests(self):
|
||||
# verify that the proxy controller enforces checks on request params
|
||||
req = Request.blank(
|
||||
'/v1/a/c?limit=%d' % (CONTAINER_LISTING_LIMIT + 1))
|
||||
self.assertEqual(412, req.get_response(self.app).status_int)
|
||||
req = Request.blank('/v1/a/c?delimiter=%ff')
|
||||
self.assertEqual(400, req.get_response(self.app).status_int)
|
||||
req = Request.blank('/v1/a/c?marker=%ff')
|
||||
self.assertEqual(400, req.get_response(self.app).status_int)
|
||||
req = Request.blank('/v1/a/c?end_marker=%ff')
|
||||
self.assertEqual(400, req.get_response(self.app).status_int)
|
||||
req = Request.blank('/v1/a/c?prefix=%ff')
|
||||
self.assertEqual(400, req.get_response(self.app).status_int)
|
||||
req = Request.blank('/v1/a/c?format=%ff')
|
||||
self.assertEqual(400, req.get_response(self.app).status_int)
|
||||
req = Request.blank('/v1/a/c?path=%ff')
|
||||
self.assertEqual(400, req.get_response(self.app).status_int)
|
||||
req = Request.blank('/v1/a/c?includes=%ff')
|
||||
self.assertEqual(400, req.get_response(self.app).status_int)
|
||||
req = Request.blank('/v1/a/c?states=%ff')
|
||||
self.assertEqual(400, req.get_response(self.app).status_int)
|
||||
|
||||
|
||||
@patch_policies(
|
||||
[StoragePolicy(0, 'zero', True, object_ring=FakeRing(replicas=4))])
|
||||
|
@ -622,6 +622,23 @@ class TestProxyServerConfiguration(unittest.TestCase):
|
||||
set(app.cors_expose_headers))
|
||||
self.assertFalse(app.strict_cors_mode)
|
||||
|
||||
def test_memcache_recheck_options(self):
|
||||
# check default options
|
||||
app = self._make_app({})
|
||||
self.assertEqual(app.recheck_account_existence, 60)
|
||||
self.assertEqual(app.recheck_container_existence, 60)
|
||||
self.assertEqual(app.recheck_updating_shard_ranges, 3600)
|
||||
self.assertEqual(app.recheck_listing_shard_ranges, 600)
|
||||
# check custom options
|
||||
app = self._make_app({'recheck_account_existence': '30',
|
||||
'recheck_container_existence': '40',
|
||||
'recheck_updating_shard_ranges': '1800',
|
||||
'recheck_listing_shard_ranges': ' 900'})
|
||||
self.assertEqual(app.recheck_account_existence, 30)
|
||||
self.assertEqual(app.recheck_container_existence, 40)
|
||||
self.assertEqual(app.recheck_updating_shard_ranges, 1800)
|
||||
self.assertEqual(app.recheck_listing_shard_ranges, 900)
|
||||
|
||||
|
||||
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
|
||||
class TestProxyServer(unittest.TestCase):
|
||||
|
Loading…
Reference in New Issue
Block a user