Merge "swift_proxy: add memcache skip success/error stats for shard range."
This commit is contained in:
commit
645f05f836
@ -735,6 +735,34 @@ def _get_info_from_infocache(env, account, container=None):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def record_cache_op_metrics(
|
||||||
|
logger, op_type, cache_state, resp=None):
|
||||||
|
"""
|
||||||
|
Record a single cache operation into its corresponding metrics.
|
||||||
|
|
||||||
|
:param logger: the metrics logger
|
||||||
|
:param op_type: the name of the operation type, includes 'shard_listing',
|
||||||
|
'shard_updating', and etc.
|
||||||
|
:param cache_state: the state of this cache operation. When it's
|
||||||
|
'infocache_hit' or memcache 'hit', expect it succeeded and 'resp'
|
||||||
|
will be None; for all other cases like memcache 'miss' or 'skip'
|
||||||
|
which will make to backend, expect a valid 'resp'.
|
||||||
|
:param resp: the response from backend for all cases except cache hits.
|
||||||
|
"""
|
||||||
|
if cache_state == 'infocache_hit':
|
||||||
|
logger.increment('%s.infocache.hit' % op_type)
|
||||||
|
elif cache_state == 'hit':
|
||||||
|
# memcache hits.
|
||||||
|
logger.increment('%s.cache.hit' % op_type)
|
||||||
|
else:
|
||||||
|
# the cases of cache_state is memcache miss, error, skip, force_skip
|
||||||
|
# or disabled.
|
||||||
|
if resp is not None:
|
||||||
|
# Note: currently there is no case that 'resp' will be None.
|
||||||
|
logger.increment(
|
||||||
|
'%s.cache.%s.%d' % (op_type, cache_state, resp.status_int))
|
||||||
|
|
||||||
|
|
||||||
def _get_info_from_memcache(app, env, account, container=None):
|
def _get_info_from_memcache(app, env, account, container=None):
|
||||||
"""
|
"""
|
||||||
Get cached account or container information from memcache
|
Get cached account or container information from memcache
|
||||||
@ -2344,8 +2372,8 @@ class Controller(object):
|
|||||||
req.path_qs, err)
|
req.path_qs, err)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _get_shard_ranges(self, req, account, container, includes=None,
|
def _get_shard_ranges(
|
||||||
states=None):
|
self, req, account, container, includes=None, states=None):
|
||||||
"""
|
"""
|
||||||
Fetch shard ranges from given `account/container`. If `includes` is
|
Fetch shard ranges from given `account/container`. If `includes` is
|
||||||
given then the shard range for that object name is requested, otherwise
|
given then the shard range for that object name is requested, otherwise
|
||||||
@ -2372,6 +2400,38 @@ class Controller(object):
|
|||||||
req, account, container, headers=headers, params=params)
|
req, account, container, headers=headers, params=params)
|
||||||
return self._parse_shard_ranges(req, listing, response), response
|
return self._parse_shard_ranges(req, listing, response), response
|
||||||
|
|
||||||
|
def _get_cached_updating_shard_ranges(
|
||||||
|
self, infocache, memcache, cache_key):
|
||||||
|
"""
|
||||||
|
Fetch cached shard ranges from infocache and memcache.
|
||||||
|
|
||||||
|
:param infocache: the infocache instance.
|
||||||
|
:param memcache: an instance of a memcache client,
|
||||||
|
:class:`swift.common.memcached.MemcacheRing`.
|
||||||
|
:param cache_key: the cache key for both infocache and memcache.
|
||||||
|
:return: a tuple of (list of shard ranges in dict format, cache state)
|
||||||
|
"""
|
||||||
|
cached_ranges = infocache.get(cache_key)
|
||||||
|
if cached_ranges:
|
||||||
|
cache_state = 'infocache_hit'
|
||||||
|
else:
|
||||||
|
if memcache:
|
||||||
|
skip_chance = \
|
||||||
|
self.app.container_updating_shard_ranges_skip_cache
|
||||||
|
if skip_chance and random.random() < skip_chance:
|
||||||
|
cache_state = 'skip'
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
cached_ranges = memcache.get(
|
||||||
|
cache_key, raise_on_error=True)
|
||||||
|
cache_state = 'hit' if cached_ranges else 'miss'
|
||||||
|
except MemcacheConnectionError:
|
||||||
|
cache_state = 'error'
|
||||||
|
else:
|
||||||
|
cache_state = 'disabled'
|
||||||
|
cached_ranges = cached_ranges or []
|
||||||
|
return cached_ranges, cache_state
|
||||||
|
|
||||||
def _get_update_shard(self, req, account, container, obj):
|
def _get_update_shard(self, req, account, container, obj):
|
||||||
"""
|
"""
|
||||||
Find the appropriate shard range for an object update.
|
Find the appropriate shard range for an object update.
|
||||||
@ -2388,52 +2448,37 @@ class Controller(object):
|
|||||||
or None if the update should go back to the root
|
or None if the update should go back to the root
|
||||||
"""
|
"""
|
||||||
if not self.app.recheck_updating_shard_ranges:
|
if not self.app.recheck_updating_shard_ranges:
|
||||||
# caching is disabled; fall back to old behavior
|
# caching is disabled
|
||||||
|
cache_state = 'disabled'
|
||||||
|
# legacy behavior requests container server for includes=obj
|
||||||
shard_ranges, response = self._get_shard_ranges(
|
shard_ranges, response = self._get_shard_ranges(
|
||||||
req, account, container, states='updating', includes=obj)
|
req, account, container, states='updating', includes=obj)
|
||||||
self.logger.increment(
|
else:
|
||||||
'shard_updating.backend.%s' % response.status_int)
|
# try to get from cache
|
||||||
if not shard_ranges:
|
response = None
|
||||||
return None
|
|
||||||
return shard_ranges[0]
|
|
||||||
|
|
||||||
cache_key = get_cache_key(account, container, shard='updating')
|
cache_key = get_cache_key(account, container, shard='updating')
|
||||||
infocache = req.environ.setdefault('swift.infocache', {})
|
infocache = req.environ.setdefault('swift.infocache', {})
|
||||||
memcache = cache_from_env(req.environ, True)
|
memcache = cache_from_env(req.environ, True)
|
||||||
|
(cached_ranges, cache_state
|
||||||
cached_ranges = infocache.get(cache_key)
|
) = self._get_cached_updating_shard_ranges(
|
||||||
if cached_ranges is None and memcache:
|
infocache, memcache, cache_key)
|
||||||
skip_chance = \
|
|
||||||
self.app.container_updating_shard_ranges_skip_cache
|
|
||||||
if skip_chance and random.random() < skip_chance:
|
|
||||||
self.logger.increment('shard_updating.cache.skip')
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
cached_ranges = memcache.get(
|
|
||||||
cache_key, raise_on_error=True)
|
|
||||||
cache_state = 'hit' if cached_ranges else 'miss'
|
|
||||||
except MemcacheConnectionError:
|
|
||||||
cache_state = 'error'
|
|
||||||
self.logger.increment('shard_updating.cache.%s' % cache_state)
|
|
||||||
|
|
||||||
if cached_ranges:
|
if cached_ranges:
|
||||||
shard_ranges = [
|
# found cached shard ranges in either infocache or memcache
|
||||||
ShardRange.from_dict(shard_range)
|
infocache[cache_key] = tuple(cached_ranges)
|
||||||
|
shard_ranges = [ShardRange.from_dict(shard_range)
|
||||||
for shard_range in cached_ranges]
|
for shard_range in cached_ranges]
|
||||||
else:
|
else:
|
||||||
|
# pull full set of updating shards from backend
|
||||||
shard_ranges, response = self._get_shard_ranges(
|
shard_ranges, response = self._get_shard_ranges(
|
||||||
req, account, container, states='updating')
|
req, account, container, states='updating')
|
||||||
self.logger.increment(
|
|
||||||
'shard_updating.backend.%s' % response.status_int)
|
|
||||||
if shard_ranges:
|
if shard_ranges:
|
||||||
cached_ranges = [dict(sr) for sr in shard_ranges]
|
cached_ranges = [dict(sr) for sr in shard_ranges]
|
||||||
# went to disk; cache it
|
infocache[cache_key] = tuple(cached_ranges)
|
||||||
if memcache:
|
if memcache:
|
||||||
memcache.set(cache_key, cached_ranges,
|
memcache.set(
|
||||||
|
cache_key, cached_ranges,
|
||||||
time=self.app.recheck_updating_shard_ranges)
|
time=self.app.recheck_updating_shard_ranges)
|
||||||
|
|
||||||
if not shard_ranges:
|
record_cache_op_metrics(
|
||||||
return None
|
self.logger, 'shard_updating', cache_state, response)
|
||||||
|
return find_shard_range(obj, shard_ranges or [])
|
||||||
infocache[cache_key] = tuple(cached_ranges)
|
|
||||||
return find_shard_range(obj, shard_ranges)
|
|
||||||
|
@ -28,7 +28,8 @@ from swift.common.request_helpers import get_sys_meta_prefix, get_param, \
|
|||||||
constrain_req_limit, validate_container_params
|
constrain_req_limit, validate_container_params
|
||||||
from swift.proxy.controllers.base import Controller, delay_denial, \
|
from swift.proxy.controllers.base import Controller, delay_denial, \
|
||||||
cors_validation, set_info_cache, clear_info_cache, _get_info_from_caches, \
|
cors_validation, set_info_cache, clear_info_cache, _get_info_from_caches, \
|
||||||
get_cache_key, headers_from_container_info, update_headers
|
record_cache_op_metrics, get_cache_key, headers_from_container_info, \
|
||||||
|
update_headers
|
||||||
from swift.common.storage_policy import POLICIES
|
from swift.common.storage_policy import POLICIES
|
||||||
from swift.common.swob import HTTPBadRequest, HTTPForbidden, HTTPNotFound, \
|
from swift.common.swob import HTTPBadRequest, HTTPForbidden, HTTPNotFound, \
|
||||||
HTTPServiceUnavailable, str_to_wsgi, wsgi_to_str, Response
|
HTTPServiceUnavailable, str_to_wsgi, wsgi_to_str, Response
|
||||||
@ -126,12 +127,7 @@ class ContainerController(Controller):
|
|||||||
shard_ranges.reverse()
|
shard_ranges.reverse()
|
||||||
return json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii')
|
return json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii')
|
||||||
|
|
||||||
def _get_shard_ranges_from_cache(self, req, info):
|
def _get_shard_ranges_from_cache(self, req, headers):
|
||||||
headers = headers_from_container_info(info)
|
|
||||||
if not headers:
|
|
||||||
# only use cached values if all required headers available
|
|
||||||
return None
|
|
||||||
|
|
||||||
infocache = req.environ.setdefault('swift.infocache', {})
|
infocache = req.environ.setdefault('swift.infocache', {})
|
||||||
memcache = cache_from_env(req.environ, True)
|
memcache = cache_from_env(req.environ, True)
|
||||||
cache_key = get_cache_key(self.account_name,
|
cache_key = get_cache_key(self.account_name,
|
||||||
@ -141,13 +137,14 @@ class ContainerController(Controller):
|
|||||||
resp_body = None
|
resp_body = None
|
||||||
cached_range_dicts = infocache.get(cache_key)
|
cached_range_dicts = infocache.get(cache_key)
|
||||||
if cached_range_dicts:
|
if cached_range_dicts:
|
||||||
|
cache_state = 'infocache_hit'
|
||||||
resp_body = self._make_shard_ranges_response_body(
|
resp_body = self._make_shard_ranges_response_body(
|
||||||
req, cached_range_dicts)
|
req, cached_range_dicts)
|
||||||
elif memcache:
|
elif memcache:
|
||||||
skip_chance = \
|
skip_chance = \
|
||||||
self.app.container_listing_shard_ranges_skip_cache
|
self.app.container_listing_shard_ranges_skip_cache
|
||||||
if skip_chance and random.random() < skip_chance:
|
if skip_chance and random.random() < skip_chance:
|
||||||
self.logger.increment('shard_listing.cache.skip')
|
cache_state = 'skip'
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
cached_range_dicts = memcache.get(
|
cached_range_dicts = memcache.get(
|
||||||
@ -160,8 +157,6 @@ class ContainerController(Controller):
|
|||||||
cache_state = 'miss'
|
cache_state = 'miss'
|
||||||
except MemcacheConnectionError:
|
except MemcacheConnectionError:
|
||||||
cache_state = 'error'
|
cache_state = 'error'
|
||||||
self.logger.increment(
|
|
||||||
'shard_listing.cache.%s' % cache_state)
|
|
||||||
|
|
||||||
if resp_body is None:
|
if resp_body is None:
|
||||||
resp = None
|
resp = None
|
||||||
@ -182,7 +177,7 @@ class ContainerController(Controller):
|
|||||||
resp.accept_ranges = 'bytes'
|
resp.accept_ranges = 'bytes'
|
||||||
resp.content_type = 'application/json'
|
resp.content_type = 'application/json'
|
||||||
|
|
||||||
return resp
|
return resp, cache_state
|
||||||
|
|
||||||
def _store_shard_ranges_in_cache(self, req, resp):
|
def _store_shard_ranges_in_cache(self, req, resp):
|
||||||
# parse shard ranges returned from backend, store them in infocache and
|
# parse shard ranges returned from backend, store them in infocache and
|
||||||
@ -243,23 +238,71 @@ class ContainerController(Controller):
|
|||||||
req, cached_range_dicts)
|
req, cached_range_dicts)
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
|
def _record_shard_listing_cache_metrics(
|
||||||
|
self, cache_state, resp, resp_record_type, info):
|
||||||
|
"""
|
||||||
|
Record a single cache operation by shard listing into its
|
||||||
|
corresponding metrics.
|
||||||
|
|
||||||
|
:param cache_state: the state of this cache operation, includes
|
||||||
|
infocache_hit, memcache hit, miss, error, skip, force_skip
|
||||||
|
and disabled.
|
||||||
|
:param resp: the response from either backend or cache hit.
|
||||||
|
:param resp_record_type: indicates the type of response record, e.g.
|
||||||
|
'shard' for shard range listing, 'object' for object listing.
|
||||||
|
:param info: the cached container info.
|
||||||
|
"""
|
||||||
|
should_record = False
|
||||||
|
if is_success(resp.status_int):
|
||||||
|
if resp_record_type == 'shard':
|
||||||
|
# Here we either got shard ranges by hitting the cache, or we
|
||||||
|
# got shard ranges from backend successfully for cache_state
|
||||||
|
# other than cache hit. Note: it's possible that later we find
|
||||||
|
# that shard ranges can't be parsed.
|
||||||
|
should_record = True
|
||||||
|
elif (info and is_success(info['status'])
|
||||||
|
and info.get('sharding_state') == 'sharded'):
|
||||||
|
# The shard listing request failed when getting shard ranges from
|
||||||
|
# backend.
|
||||||
|
# Note: In the absence of 'info' we cannot assume the container is
|
||||||
|
# sharded, so we don't increment the metric if 'info' is None. Even
|
||||||
|
# when we have valid info, we can't be sure that the container is
|
||||||
|
# sharded, but we assume info was correct and increment the failure
|
||||||
|
# metrics.
|
||||||
|
should_record = True
|
||||||
|
# else:
|
||||||
|
# The request failed, but in the absence of info we cannot assume
|
||||||
|
# the container is sharded, so we don't increment the metric.
|
||||||
|
|
||||||
|
if should_record:
|
||||||
|
record_cache_op_metrics(
|
||||||
|
self.logger, 'shard_listing', cache_state, resp)
|
||||||
|
|
||||||
def _GET_using_cache(self, req, info):
|
def _GET_using_cache(self, req, info):
|
||||||
# It may be possible to fulfil the request from cache: we only reach
|
# It may be possible to fulfil the request from cache: we only reach
|
||||||
# here if request record_type is 'shard' or 'auto', so if the container
|
# here if request record_type is 'shard' or 'auto', so if the container
|
||||||
# state is 'sharded' then look for cached shard ranges. However, if
|
# state is 'sharded' then look for cached shard ranges. However, if
|
||||||
# X-Newest is true then we always fetch from the backend servers.
|
# X-Newest is true then we always fetch from the backend servers.
|
||||||
get_newest = config_true_value(req.headers.get('x-newest', False))
|
headers = headers_from_container_info(info)
|
||||||
if get_newest:
|
if config_true_value(req.headers.get('x-newest', False)):
|
||||||
|
cache_state = 'force_skip'
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
'Skipping shard cache lookup (x-newest) for %s', req.path_qs)
|
'Skipping shard cache lookup (x-newest) for %s', req.path_qs)
|
||||||
elif (info and is_success(info['status']) and
|
elif (headers and info and is_success(info['status']) and
|
||||||
info.get('sharding_state') == 'sharded'):
|
info.get('sharding_state') == 'sharded'):
|
||||||
# container is sharded so we may have the shard ranges cached
|
# container is sharded so we may have the shard ranges cached; only
|
||||||
resp = self._get_shard_ranges_from_cache(req, info)
|
# use cached values if all required backend headers available.
|
||||||
|
resp, cache_state = self._get_shard_ranges_from_cache(req, headers)
|
||||||
if resp:
|
if resp:
|
||||||
return resp
|
return resp, cache_state
|
||||||
# The request was not fulfilled from cache so send to backend server
|
else:
|
||||||
return self._get_shard_ranges_from_backend(req)
|
# container metadata didn't support a cache lookup, this could be
|
||||||
|
# the case that container metadata was not in cache and we don't
|
||||||
|
# know if the container was sharded, or the case that the sharding
|
||||||
|
# state in metadata indicates the container was unsharded.
|
||||||
|
cache_state = 'bypass'
|
||||||
|
# The request was not fulfilled from cache so send to backend server.
|
||||||
|
return self._get_shard_ranges_from_backend(req), cache_state
|
||||||
|
|
||||||
def GETorHEAD(self, req):
|
def GETorHEAD(self, req):
|
||||||
"""Handler for HTTP GET/HEAD requests."""
|
"""Handler for HTTP GET/HEAD requests."""
|
||||||
@ -303,6 +346,7 @@ class ContainerController(Controller):
|
|||||||
info = None
|
info = None
|
||||||
may_get_listing_shards = False
|
may_get_listing_shards = False
|
||||||
|
|
||||||
|
sr_cache_state = None
|
||||||
if (may_get_listing_shards and
|
if (may_get_listing_shards and
|
||||||
self.app.recheck_listing_shard_ranges > 0
|
self.app.recheck_listing_shard_ranges > 0
|
||||||
and memcache
|
and memcache
|
||||||
@ -313,34 +357,17 @@ class ContainerController(Controller):
|
|||||||
# to the proxy (it is used from sharder to container servers) but
|
# to the proxy (it is used from sharder to container servers) but
|
||||||
# it is included in the conditions just in case because we don't
|
# it is included in the conditions just in case because we don't
|
||||||
# cache deleted shard ranges.
|
# cache deleted shard ranges.
|
||||||
resp = self._GET_using_cache(req, info)
|
resp, sr_cache_state = self._GET_using_cache(req, info)
|
||||||
else:
|
else:
|
||||||
resp = self._GETorHEAD_from_backend(req)
|
resp = self._GETorHEAD_from_backend(req)
|
||||||
|
if may_get_listing_shards and (
|
||||||
|
not self.app.recheck_listing_shard_ranges or not memcache):
|
||||||
|
sr_cache_state = 'disabled'
|
||||||
|
|
||||||
resp_record_type = resp.headers.get('X-Backend-Record-Type', '')
|
resp_record_type = resp.headers.get('X-Backend-Record-Type', '')
|
||||||
cached_results = config_true_value(
|
if sr_cache_state:
|
||||||
resp.headers.get('x-backend-cached-results'))
|
self._record_shard_listing_cache_metrics(
|
||||||
|
sr_cache_state, resp, resp_record_type, info)
|
||||||
if may_get_listing_shards and not cached_results:
|
|
||||||
if is_success(resp.status_int):
|
|
||||||
if resp_record_type == 'shard':
|
|
||||||
# We got shard ranges from backend so increment the success
|
|
||||||
# metric. Note: it's possible that later we find that shard
|
|
||||||
# ranges can't be parsed
|
|
||||||
self.logger.increment(
|
|
||||||
'shard_listing.backend.%s' % resp.status_int)
|
|
||||||
elif info:
|
|
||||||
if (is_success(info['status'])
|
|
||||||
and info.get('sharding_state') == 'sharded'):
|
|
||||||
# We expected to get shard ranges from backend, but the
|
|
||||||
# request failed. We can't be sure that the container is
|
|
||||||
# sharded but we assume info was correct and increment the
|
|
||||||
# failure metric
|
|
||||||
self.logger.increment(
|
|
||||||
'shard_listing.backend.%s' % resp.status_int)
|
|
||||||
# else:
|
|
||||||
# The request failed, but in the absence of info we cannot assume
|
|
||||||
# the container is sharded, so we don't increment the metric
|
|
||||||
|
|
||||||
if all((req.method == "GET", record_type == 'auto',
|
if all((req.method == "GET", record_type == 'auto',
|
||||||
resp_record_type.lower() == 'shard')):
|
resp_record_type.lower() == 'shard')):
|
||||||
|
@ -27,7 +27,8 @@ from swift.proxy.controllers.base import headers_to_container_info, \
|
|||||||
headers_to_account_info, headers_to_object_info, get_container_info, \
|
headers_to_account_info, headers_to_object_info, get_container_info, \
|
||||||
get_cache_key, get_account_info, get_info, get_object_info, \
|
get_cache_key, get_account_info, get_info, get_object_info, \
|
||||||
Controller, GetOrHeadHandler, bytes_to_skip, clear_info_cache, \
|
Controller, GetOrHeadHandler, bytes_to_skip, clear_info_cache, \
|
||||||
set_info_cache, NodeIter, headers_from_container_info
|
set_info_cache, NodeIter, headers_from_container_info, \
|
||||||
|
record_cache_op_metrics
|
||||||
from swift.common.swob import Request, HTTPException, RESPONSE_REASONS, \
|
from swift.common.swob import Request, HTTPException, RESPONSE_REASONS, \
|
||||||
bytes_to_wsgi
|
bytes_to_wsgi
|
||||||
from swift.common import exceptions
|
from swift.common import exceptions
|
||||||
@ -530,6 +531,40 @@ class TestFuncs(BaseTest):
|
|||||||
shard='listing')
|
shard='listing')
|
||||||
check_not_in_cache(req, shard_cache_key)
|
check_not_in_cache(req, shard_cache_key)
|
||||||
|
|
||||||
|
def test_record_cache_op_metrics(self):
|
||||||
|
record_cache_op_metrics(
|
||||||
|
self.logger, 'shard_listing', 'infocache_hit')
|
||||||
|
self.assertEqual(
|
||||||
|
self.logger.get_increment_counts().get(
|
||||||
|
'shard_listing.infocache.hit'),
|
||||||
|
1)
|
||||||
|
record_cache_op_metrics(
|
||||||
|
self.logger, 'shard_listing', 'hit')
|
||||||
|
self.assertEqual(
|
||||||
|
self.logger.get_increment_counts().get(
|
||||||
|
'shard_listing.cache.hit'),
|
||||||
|
1)
|
||||||
|
resp = FakeResponse(status_int=200)
|
||||||
|
record_cache_op_metrics(
|
||||||
|
self.logger, 'shard_updating', 'skip', resp)
|
||||||
|
self.assertEqual(
|
||||||
|
self.logger.get_increment_counts().get(
|
||||||
|
'shard_updating.cache.skip.200'),
|
||||||
|
1)
|
||||||
|
resp = FakeResponse(status_int=503)
|
||||||
|
record_cache_op_metrics(
|
||||||
|
self.logger, 'shard_updating', 'disabled', resp)
|
||||||
|
self.assertEqual(
|
||||||
|
self.logger.get_increment_counts().get(
|
||||||
|
'shard_updating.cache.disabled.503'),
|
||||||
|
1)
|
||||||
|
|
||||||
|
# test a cache miss call without response, expect no metric recorded.
|
||||||
|
self.app.logger = mock.Mock()
|
||||||
|
record_cache_op_metrics(
|
||||||
|
self.logger, 'shard_updating', 'miss')
|
||||||
|
self.app.logger.increment.assert_not_called()
|
||||||
|
|
||||||
def test_get_account_info_swift_source(self):
|
def test_get_account_info_swift_source(self):
|
||||||
app = FakeApp()
|
app = FakeApp()
|
||||||
req = Request.blank("/v1/a", environ={'swift.cache': FakeCache()})
|
req = Request.blank("/v1/a", environ={'swift.cache': FakeCache()})
|
||||||
|
@ -2585,7 +2585,6 @@ class TestContainerController(TestRingBase):
|
|||||||
# this test gets shard ranges into cache and then reads from cache
|
# this test gets shard ranges into cache and then reads from cache
|
||||||
sharding_state = 'sharded'
|
sharding_state = 'sharded'
|
||||||
self.memcache.delete_all()
|
self.memcache.delete_all()
|
||||||
|
|
||||||
# container is sharded but proxy does not have that state cached;
|
# container is sharded but proxy does not have that state cached;
|
||||||
# expect a backend request and expect shard ranges to be cached
|
# expect a backend request and expect shard ranges to be cached
|
||||||
self.memcache.clear_calls()
|
self.memcache.clear_calls()
|
||||||
@ -2620,7 +2619,7 @@ class TestContainerController(TestRingBase):
|
|||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
[x[0][0] for x in self.logger.logger.log_dict['increment']],
|
[x[0][0] for x in self.logger.logger.log_dict['increment']],
|
||||||
['container.info.cache.miss',
|
['container.info.cache.miss',
|
||||||
'container.shard_listing.backend.200'])
|
'container.shard_listing.cache.bypass.200'])
|
||||||
|
|
||||||
# container is sharded and proxy has that state cached, but
|
# container is sharded and proxy has that state cached, but
|
||||||
# no shard ranges cached; expect a cache miss and write-back
|
# no shard ranges cached; expect a cache miss and write-back
|
||||||
@ -2658,8 +2657,7 @@ class TestContainerController(TestRingBase):
|
|||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
[x[0][0] for x in self.logger.logger.log_dict['increment']],
|
[x[0][0] for x in self.logger.logger.log_dict['increment']],
|
||||||
['container.info.cache.hit',
|
['container.info.cache.hit',
|
||||||
'container.shard_listing.cache.miss',
|
'container.shard_listing.cache.miss.200'])
|
||||||
'container.shard_listing.backend.200'])
|
|
||||||
|
|
||||||
# container is sharded and proxy does have that state cached and
|
# container is sharded and proxy does have that state cached and
|
||||||
# also has shard ranges cached; expect a read from cache
|
# also has shard ranges cached; expect a read from cache
|
||||||
@ -2720,8 +2718,7 @@ class TestContainerController(TestRingBase):
|
|||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
[x[0][0] for x in self.logger.logger.log_dict['increment']],
|
[x[0][0] for x in self.logger.logger.log_dict['increment']],
|
||||||
['container.info.cache.hit',
|
['container.info.cache.hit',
|
||||||
'container.shard_listing.cache.skip',
|
'container.shard_listing.cache.skip.200'])
|
||||||
'container.shard_listing.backend.200'])
|
|
||||||
|
|
||||||
# ... or maybe we serve from cache
|
# ... or maybe we serve from cache
|
||||||
self.memcache.clear_calls()
|
self.memcache.clear_calls()
|
||||||
@ -2746,6 +2743,29 @@ class TestContainerController(TestRingBase):
|
|||||||
[x[0][0] for x in self.logger.logger.log_dict['increment']],
|
[x[0][0] for x in self.logger.logger.log_dict['increment']],
|
||||||
['container.info.cache.hit',
|
['container.info.cache.hit',
|
||||||
'container.shard_listing.cache.hit'])
|
'container.shard_listing.cache.hit'])
|
||||||
|
|
||||||
|
# test request to hit infocache.
|
||||||
|
self.memcache.clear_calls()
|
||||||
|
self.logger.clear()
|
||||||
|
req = self._build_request(
|
||||||
|
{'X-Backend-Record-Type': record_type},
|
||||||
|
{'states': 'listing'},
|
||||||
|
infocache=req.environ['swift.infocache'])
|
||||||
|
with mock.patch('random.random', return_value=0.11):
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
self._check_response(resp, self.sr_dicts, {
|
||||||
|
'X-Backend-Cached-Results': 'true',
|
||||||
|
'X-Backend-Record-Type': 'shard',
|
||||||
|
'X-Backend-Sharding-State': sharding_state})
|
||||||
|
self.assertEqual([], self.memcache.calls)
|
||||||
|
self.assertIn('swift.infocache', req.environ)
|
||||||
|
self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
|
||||||
|
self.assertEqual(tuple(self.sr_dicts),
|
||||||
|
req.environ['swift.infocache']['shard-listing/a/c'])
|
||||||
|
self.assertEqual(
|
||||||
|
[x[0][0] for x in self.logger.logger.log_dict['increment']],
|
||||||
|
['container.shard_listing.infocache.hit'])
|
||||||
|
|
||||||
# put this back the way we found it for later subtests
|
# put this back the way we found it for later subtests
|
||||||
self.app.container_listing_shard_ranges_skip_cache = 0.0
|
self.app.container_listing_shard_ranges_skip_cache = 0.0
|
||||||
|
|
||||||
@ -2896,8 +2916,7 @@ class TestContainerController(TestRingBase):
|
|||||||
self.assertEqual(b'', resp.body)
|
self.assertEqual(b'', resp.body)
|
||||||
self.assertEqual(404, resp.status_int)
|
self.assertEqual(404, resp.status_int)
|
||||||
self.assertEqual({'container.info.cache.hit': 1,
|
self.assertEqual({'container.info.cache.hit': 1,
|
||||||
'container.shard_listing.cache.miss': 1,
|
'container.shard_listing.cache.miss.404': 1},
|
||||||
'container.shard_listing.backend.404': 1},
|
|
||||||
self.logger.get_increment_counts())
|
self.logger.get_increment_counts())
|
||||||
|
|
||||||
def test_GET_shard_ranges_read_from_cache_error(self):
|
def test_GET_shard_ranges_read_from_cache_error(self):
|
||||||
@ -2930,8 +2949,7 @@ class TestContainerController(TestRingBase):
|
|||||||
self.assertEqual(b'', resp.body)
|
self.assertEqual(b'', resp.body)
|
||||||
self.assertEqual(404, resp.status_int)
|
self.assertEqual(404, resp.status_int)
|
||||||
self.assertEqual({'container.info.cache.hit': 1,
|
self.assertEqual({'container.info.cache.hit': 1,
|
||||||
'container.shard_listing.cache.error': 1,
|
'container.shard_listing.cache.error.404': 1},
|
||||||
'container.shard_listing.backend.404': 1},
|
|
||||||
self.logger.get_increment_counts())
|
self.logger.get_increment_counts())
|
||||||
|
|
||||||
def _do_test_GET_shard_ranges_read_from_cache(self, params, record_type):
|
def _do_test_GET_shard_ranges_read_from_cache(self, params, record_type):
|
||||||
@ -3045,7 +3063,7 @@ class TestContainerController(TestRingBase):
|
|||||||
self.assertEqual('sharded',
|
self.assertEqual('sharded',
|
||||||
self.memcache.calls[2][1][1]['sharding_state'])
|
self.memcache.calls[2][1][1]['sharding_state'])
|
||||||
self.assertEqual({'container.info.cache.miss': 1,
|
self.assertEqual({'container.info.cache.miss': 1,
|
||||||
'container.shard_listing.backend.200': 1},
|
'container.shard_listing.cache.bypass.200': 1},
|
||||||
self.logger.get_increment_counts())
|
self.logger.get_increment_counts())
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
@ -3139,7 +3157,7 @@ class TestContainerController(TestRingBase):
|
|||||||
self.assertEqual('sharded',
|
self.assertEqual('sharded',
|
||||||
self.memcache.calls[2][1][1]['sharding_state'])
|
self.memcache.calls[2][1][1]['sharding_state'])
|
||||||
self.assertEqual({'container.info.cache.miss': 1,
|
self.assertEqual({'container.info.cache.miss': 1,
|
||||||
'container.shard_listing.backend.200': 1},
|
'container.shard_listing.cache.force_skip.200': 1},
|
||||||
self.logger.get_increment_counts())
|
self.logger.get_increment_counts())
|
||||||
|
|
||||||
def _do_test_GET_shard_ranges_no_cache_write(self, resp_hdrs):
|
def _do_test_GET_shard_ranges_no_cache_write(self, resp_hdrs):
|
||||||
@ -3312,7 +3330,7 @@ class TestContainerController(TestRingBase):
|
|||||||
self.assertEqual(resp.headers.get('X-Backend-Sharding-State'),
|
self.assertEqual(resp.headers.get('X-Backend-Sharding-State'),
|
||||||
self.memcache.calls[1][1][1]['sharding_state'])
|
self.memcache.calls[1][1][1]['sharding_state'])
|
||||||
self.assertEqual({'container.info.cache.miss': 1,
|
self.assertEqual({'container.info.cache.miss': 1,
|
||||||
'container.shard_listing.backend.200': 1},
|
'container.shard_listing.cache.bypass.200': 1},
|
||||||
self.logger.get_increment_counts())
|
self.logger.get_increment_counts())
|
||||||
self.memcache.delete_all()
|
self.memcache.delete_all()
|
||||||
|
|
||||||
|
@ -4206,7 +4206,9 @@ class TestReplicatedObjectController(
|
|||||||
|
|
||||||
self.assertEqual(resp.status_int, 202)
|
self.assertEqual(resp.status_int, 202)
|
||||||
stats = self.app.logger.get_increment_counts()
|
stats = self.app.logger.get_increment_counts()
|
||||||
self.assertEqual({'object.shard_updating.backend.200': 1}, stats)
|
self.assertEqual(
|
||||||
|
{'object.shard_updating.cache.disabled.200': 1},
|
||||||
|
stats)
|
||||||
backend_requests = fake_conn.requests
|
backend_requests = fake_conn.requests
|
||||||
# verify statsd prefix is not mutated
|
# verify statsd prefix is not mutated
|
||||||
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
||||||
@ -4299,8 +4301,8 @@ class TestReplicatedObjectController(
|
|||||||
stats = self.app.logger.get_increment_counts()
|
stats = self.app.logger.get_increment_counts()
|
||||||
self.assertEqual({'account.info.cache.miss': 1,
|
self.assertEqual({'account.info.cache.miss': 1,
|
||||||
'container.info.cache.miss': 1,
|
'container.info.cache.miss': 1,
|
||||||
'object.shard_updating.cache.miss': 1,
|
'object.shard_updating.cache.miss.200': 1},
|
||||||
'object.shard_updating.backend.200': 1}, stats)
|
stats)
|
||||||
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
||||||
|
|
||||||
backend_requests = fake_conn.requests
|
backend_requests = fake_conn.requests
|
||||||
@ -4450,6 +4452,99 @@ class TestReplicatedObjectController(
|
|||||||
do_test('PUT', 'sharding')
|
do_test('PUT', 'sharding')
|
||||||
do_test('PUT', 'sharded')
|
do_test('PUT', 'sharded')
|
||||||
|
|
||||||
|
@patch_policies([
|
||||||
|
StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
|
||||||
|
StoragePolicy(1, 'one', object_ring=FakeRing()),
|
||||||
|
])
|
||||||
|
def test_backend_headers_update_shard_container_with_live_infocache(self):
|
||||||
|
# verify that when container is sharded the backend container update is
|
||||||
|
# directed to the shard container
|
||||||
|
# reset the router post patch_policies
|
||||||
|
self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
|
||||||
|
self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
|
||||||
|
self.app.recheck_updating_shard_ranges = 3600
|
||||||
|
|
||||||
|
def do_test(method, sharding_state):
|
||||||
|
self.app.logger.clear() # clean capture state
|
||||||
|
shard_ranges = [
|
||||||
|
utils.ShardRange(
|
||||||
|
'.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'),
|
||||||
|
utils.ShardRange(
|
||||||
|
'.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'),
|
||||||
|
utils.ShardRange(
|
||||||
|
'.shards_a/c_nope', utils.Timestamp.now(), 'u', ''),
|
||||||
|
]
|
||||||
|
infocache = {
|
||||||
|
'shard-updating/a/c':
|
||||||
|
tuple(dict(shard_range) for shard_range in shard_ranges)}
|
||||||
|
req = Request.blank('/v1/a/c/o', {'swift.infocache': infocache},
|
||||||
|
method=method, body='',
|
||||||
|
headers={'Content-Type': 'text/plain'})
|
||||||
|
|
||||||
|
# we want the container_info response to say policy index of 1 and
|
||||||
|
# sharding state
|
||||||
|
# acc HEAD, cont HEAD, obj POSTs
|
||||||
|
status_codes = (200, 200, 202, 202, 202)
|
||||||
|
resp_headers = {'X-Backend-Storage-Policy-Index': 1,
|
||||||
|
'x-backend-sharding-state': sharding_state,
|
||||||
|
'X-Backend-Record-Type': 'shard'}
|
||||||
|
with mocked_http_conn(*status_codes,
|
||||||
|
headers=resp_headers) as fake_conn:
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
|
||||||
|
# verify request hitted infocache.
|
||||||
|
self.assertEqual(resp.status_int, 202)
|
||||||
|
stats = self.app.logger.get_increment_counts()
|
||||||
|
self.assertEqual({'object.shard_updating.infocache.hit': 1}, stats)
|
||||||
|
# verify statsd prefix is not mutated
|
||||||
|
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
||||||
|
|
||||||
|
backend_requests = fake_conn.requests
|
||||||
|
account_request = backend_requests[0]
|
||||||
|
self._check_request(
|
||||||
|
account_request, method='HEAD', path='/sda/0/a')
|
||||||
|
container_request = backend_requests[1]
|
||||||
|
self._check_request(
|
||||||
|
container_request, method='HEAD', path='/sda/0/a/c')
|
||||||
|
|
||||||
|
# verify content in infocache.
|
||||||
|
cache_key = 'shard-updating/a/c'
|
||||||
|
self.assertIn(cache_key, req.environ.get('swift.infocache'))
|
||||||
|
self.assertEqual(req.environ['swift.infocache'][cache_key],
|
||||||
|
tuple(dict(sr) for sr in shard_ranges))
|
||||||
|
|
||||||
|
# make sure backend requests included expected container headers
|
||||||
|
container_headers = {}
|
||||||
|
|
||||||
|
for request in backend_requests[2:]:
|
||||||
|
req_headers = request['headers']
|
||||||
|
device = req_headers['x-container-device']
|
||||||
|
container_headers[device] = req_headers['x-container-host']
|
||||||
|
expectations = {
|
||||||
|
'method': method,
|
||||||
|
'path': '/0/a/c/o',
|
||||||
|
'headers': {
|
||||||
|
'X-Container-Partition': '0',
|
||||||
|
'Host': 'localhost:80',
|
||||||
|
'Referer': '%s http://localhost/v1/a/c/o' % method,
|
||||||
|
'X-Backend-Storage-Policy-Index': '1',
|
||||||
|
'X-Backend-Quoted-Container-Path': shard_ranges[1].name
|
||||||
|
},
|
||||||
|
}
|
||||||
|
self._check_request(request, **expectations)
|
||||||
|
|
||||||
|
expected = {}
|
||||||
|
for i, device in enumerate(['sda', 'sdb', 'sdc']):
|
||||||
|
expected[device] = '10.0.0.%d:100%d' % (i, i)
|
||||||
|
self.assertEqual(container_headers, expected)
|
||||||
|
|
||||||
|
do_test('POST', 'sharding')
|
||||||
|
do_test('POST', 'sharded')
|
||||||
|
do_test('DELETE', 'sharding')
|
||||||
|
do_test('DELETE', 'sharded')
|
||||||
|
do_test('PUT', 'sharding')
|
||||||
|
do_test('PUT', 'sharded')
|
||||||
|
|
||||||
@patch_policies([
|
@patch_policies([
|
||||||
StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
|
StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
|
||||||
StoragePolicy(1, 'one', object_ring=FakeRing()),
|
StoragePolicy(1, 'one', object_ring=FakeRing()),
|
||||||
@ -4538,9 +4633,8 @@ class TestReplicatedObjectController(
|
|||||||
'account.info.cache.hit': 1,
|
'account.info.cache.hit': 1,
|
||||||
'container.info.cache.miss': 1,
|
'container.info.cache.miss': 1,
|
||||||
'container.info.cache.hit': 1,
|
'container.info.cache.hit': 1,
|
||||||
'object.shard_updating.cache.skip': 1,
|
'object.shard_updating.cache.skip.200': 1,
|
||||||
'object.shard_updating.cache.hit': 1,
|
'object.shard_updating.cache.hit': 1}, stats)
|
||||||
'object.shard_updating.backend.200': 1}, stats)
|
|
||||||
# verify statsd prefix is not mutated
|
# verify statsd prefix is not mutated
|
||||||
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix'])
|
||||||
|
|
||||||
@ -4602,10 +4696,9 @@ class TestReplicatedObjectController(
|
|||||||
'account.info.cache.miss': 1,
|
'account.info.cache.miss': 1,
|
||||||
'container.info.cache.hit': 2,
|
'container.info.cache.hit': 2,
|
||||||
'container.info.cache.miss': 1,
|
'container.info.cache.miss': 1,
|
||||||
'object.shard_updating.cache.skip': 1,
|
'object.shard_updating.cache.skip.200': 1,
|
||||||
'object.shard_updating.cache.hit': 1,
|
'object.shard_updating.cache.hit': 1,
|
||||||
'object.shard_updating.cache.error': 1,
|
'object.shard_updating.cache.error.200': 1})
|
||||||
'object.shard_updating.backend.200': 2})
|
|
||||||
|
|
||||||
do_test('POST', 'sharding')
|
do_test('POST', 'sharding')
|
||||||
do_test('POST', 'sharded')
|
do_test('POST', 'sharded')
|
||||||
@ -4642,7 +4735,9 @@ class TestReplicatedObjectController(
|
|||||||
|
|
||||||
self.assertEqual(resp.status_int, 202)
|
self.assertEqual(resp.status_int, 202)
|
||||||
stats = self.app.logger.get_increment_counts()
|
stats = self.app.logger.get_increment_counts()
|
||||||
self.assertEqual({'object.shard_updating.backend.404': 1}, stats)
|
self.assertEqual(
|
||||||
|
{'object.shard_updating.cache.disabled.404': 1},
|
||||||
|
stats)
|
||||||
|
|
||||||
backend_requests = fake_conn.requests
|
backend_requests = fake_conn.requests
|
||||||
account_request = backend_requests[0]
|
account_request = backend_requests[0]
|
||||||
|
Loading…
Reference in New Issue
Block a user