proxy: Add a chance to skip memcache when looking for shard ranges
By having some small portion of calls skip cache and go straight to disk, we
can ensure the cache is always kept fresh and never expires (at least, for
active containers). Previously, when shard ranges fell out of cache there
would frequently be a thundering herd that could overwhelm the container
server, leading to 503s served to clients or an increase in async pendings.

Include metrics for hit/miss/skip rates.

Change-Id: I6d74719fb41665f787375a08184c1969c86ce2cf
Related-Bug: #1883324
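The change amounts to wrapping each memcache lookup in a coin flip. A minimal
standalone sketch of the pattern (hypothetical names like FakeCache and
lookup_shard_ranges; this is not Swift's actual API):

    import random

    class FakeCache(dict):
        """Dict-backed stand-in for memcache, for this sketch only."""
        def set(self, key, value):
            self[key] = value

    def lookup_shard_ranges(cache, key, fetch_backend, skip_chance=0.001):
        """Return shard ranges, occasionally bypassing cache to refresh it.

        skip_chance is a probability in [0, 1]; 0.001 means roughly one
        request in a thousand skips memcache and goes straight to disk.
        """
        cached = None
        if skip_chance and random.random() < skip_chance:
            pass  # deliberate skip: treat as a miss so we re-fetch below
        else:
            cached = cache.get(key)
        if cached is not None:
            return cached            # cache hit
        fresh = fetch_backend()      # miss or skip: ask the container server
        cache.set(key, fresh)        # write-back resets the entry's age
        return fresh

    cache = FakeCache()
    print(lookup_shard_ranges(cache, 'shard-updating/a/c',
                              lambda: ['range-1', 'range-2']))

Because the refreshes arrive at random, an active container's cache entry is
continually re-written before its TTL can lapse, so the herd never forms.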
parent 8ef530d795
commit 8c6ccb5fd4
@@ -144,6 +144,15 @@ use = egg:swift#proxy
 # so this value should be set less than recheck_updating_shard_ranges.
 # recheck_listing_shard_ranges = 600
 #
+# For particularly active containers, having information age out of cache can
+# be quite painful: suddenly thousands of requests per second all miss and
+# have to go to disk. By (rarely) going direct to disk regardless of whether
+# data is present in memcache, we can periodically refresh the data in memcache
+# without causing a thundering herd. Values around 0.0 - 0.1 (i.e., one in
+# every thousand requests skips cache, or fewer) are recommended.
+# container_updating_shard_ranges_skip_cache_pct = 0.0
+# container_listing_shard_ranges_skip_cache_pct = 0.0
+#
 # object_chunk_size = 65536
 # client_chunk_size = 65536
 #
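The _pct suffix is literal: operators supply a percentage, which the proxy
divides by 100 to get a probability (see the Application changes and
test_memcache_skip_options below). A quick standalone check of that
conversion:

    def pct_to_probability(pct):
        # mirrors config_percent_value's percent -> [0, 1] conversion
        return float(pct) / 100.0

    assert pct_to_probability('0.1') == 0.001    # one request in a thousand
    assert pct_to_probability('0.01') == 0.0001  # one in ten thousand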
@@ -33,6 +33,7 @@ import functools
 import inspect
 import itertools
 import operator
+import random
 from copy import deepcopy
 from sys import exc_info
@@ -2384,9 +2385,14 @@ class Controller(object):
 
         cached_ranges = infocache.get(cache_key)
         if cached_ranges is None and memcache:
-            cached_ranges = memcache.get(cache_key)
-            self.app.logger.increment('shard_updating.cache.%s'
-                                      % ('hit' if cached_ranges else 'miss'))
+            skip_chance = \
+                self.app.container_updating_shard_ranges_skip_cache
+            if skip_chance and random.random() < skip_chance:
+                self.app.logger.increment('shard_updating.cache.skip')
+            else:
+                cached_ranges = memcache.get(cache_key)
+                self.app.logger.increment('shard_updating.cache.%s' % (
+                    'hit' if cached_ranges else 'miss'))
 
         if cached_ranges:
             shard_ranges = [
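With the skip in place, refreshes trickle out continuously instead of
bunching up when an entry expires. Back-of-envelope arithmetic with
hypothetical traffic numbers:

    requests_per_sec = 10000  # a very active container
    skip_chance = 0.001       # i.e. ..._skip_cache_pct = 0.1
    print(requests_per_sec * skip_chance)
    # ~10 backend GETs/s, spread at random, versus thousands arriving
    # together whenever a cache entry used to age out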
@@ -15,6 +15,7 @@
 
 import json
 import math
+import random
 
 import six
 from six.moves.urllib.parse import unquote
@@ -150,7 +151,15 @@ class ContainerController(Controller):
                              shard='listing')
             cached_ranges = infocache.get(cache_key)
             if cached_ranges is None and memcache:
-                cached_ranges = memcache.get(cache_key)
+                skip_chance = \
+                    self.app.container_listing_shard_ranges_skip_cache
+                if skip_chance and random.random() < skip_chance:
+                    self.app.logger.increment('shard_listing.cache.skip')
+                else:
+                    cached_ranges = memcache.get(cache_key)
+                    self.app.logger.increment('shard_listing.cache.%s' % (
+                        'hit' if cached_ranges else 'miss'))
 
             if cached_ranges is not None:
                 infocache[cache_key] = tuple(cached_ranges)
                 # shard ranges can be returned from cache
@@ -35,7 +35,7 @@ from swift.common.utils import Watchdog, get_logger, \
     get_remote_client, split_path, config_true_value, generate_trans_id, \
     affinity_key_function, affinity_locality_predicate, list_from_csv, \
     register_swift_info, parse_prefixed_conf, config_auto_int_value, \
-    config_request_node_count_value
+    config_request_node_count_value, config_percent_value
 from swift.common.constraints import check_utf8, valid_api_version
 from swift.proxy.controllers import AccountController, ContainerController, \
     ObjectControllerRouter, InfoController
@@ -227,6 +227,12 @@ class Application(object):
         self.recheck_account_existence = \
             int(conf.get('recheck_account_existence',
                          DEFAULT_RECHECK_ACCOUNT_EXISTENCE))
+        self.container_updating_shard_ranges_skip_cache = \
+            config_percent_value(conf.get(
+                'container_updating_shard_ranges_skip_cache_pct', 0))
+        self.container_listing_shard_ranges_skip_cache = \
+            config_percent_value(conf.get(
+                'container_listing_shard_ranges_skip_cache_pct', 0))
         self.allow_account_management = \
             config_true_value(conf.get('allow_account_management', 'no'))
         self.container_ring = container_ring or Ring(swift_dir,
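To enable the behavior, an operator sets the new options in the proxy
config. A hypothetical fragment (the option names come from this commit; the
values shown are illustrative percentages, not probabilities):

    [app:proxy-server]
    use = egg:swift#proxy
    # skip cache on roughly 0.1% of shard-range lookups
    container_updating_shard_ranges_skip_cache_pct = 0.1
    container_listing_shard_ranges_skip_cache_pct = 0.1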
@@ -2090,10 +2090,11 @@ class TestContainerController(TestRingBase):
        # this test gets shard ranges into cache and then reads from cache
        sharding_state = 'sharded'
        self.memcache.delete_all()
        self.memcache.clear_calls()

        # container is sharded but proxy does not have that state cached;
        # expect a backend request and expect shard ranges to be cached
        self.memcache.clear_calls()
        self.logger.logger.log_dict['increment'][:] = []
        req = self._build_request({'X-Backend-Record-Type': record_type},
                                  {'states': 'listing'}, {})
        backend_req, resp = self._capture_backend_request(
@@ -2121,10 +2122,51 @@
        self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
        self.assertEqual(tuple(self.sr_dicts),
                         req.environ['swift.infocache']['shard-listing/a/c'])
        self.assertEqual(
            [x[0][0] for x in self.logger.logger.log_dict['increment']],
            [])

        # container is sharded and proxy has that state cached, but
        # no shard ranges cached; expect a cache miss and write-back
        self.memcache.delete('shard-listing/a/c')
        self.memcache.clear_calls()
        self.logger.logger.log_dict['increment'][:] = []
        req = self._build_request({'X-Backend-Record-Type': record_type},
                                  {'states': 'listing'}, {})
        backend_req, resp = self._capture_backend_request(
            req, 200, self._stub_shards_dump,
            {'X-Backend-Record-Type': 'shard',
             'X-Backend-Sharding-State': sharding_state,
             'X-Backend-Override-Shard-Name-Filter': 'true'})
        self._check_backend_req(
            req, backend_req,
            extra_hdrs={'X-Backend-Record-Type': record_type,
                        'X-Backend-Override-Shard-Name-Filter': 'sharded'})
        self._check_response(resp, self.sr_dicts, {
            'X-Backend-Recheck-Container-Existence': '60',
            'X-Backend-Record-Type': 'shard',
            'X-Backend-Sharding-State': sharding_state})
        self.assertEqual(
            [mock.call.get('container/a/c'),
             mock.call.get('shard-listing/a/c'),
             mock.call.set('shard-listing/a/c', self.sr_dicts,
                           time=exp_recheck_listing),
             # Since there was a backend request, we go ahead and cache
             # container info, too
             mock.call.set('container/a/c', mock.ANY, time=60)],
            self.memcache.calls)
        self.assertIn('swift.infocache', req.environ)
        self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
        self.assertEqual(tuple(self.sr_dicts),
                         req.environ['swift.infocache']['shard-listing/a/c'])
        self.assertEqual(
            [x[0][0] for x in self.logger.logger.log_dict['increment']],
            ['shard_listing.cache.miss'])

        # container is sharded and proxy does have that state cached and
        # also has shard ranges cached; expect a read from cache
        self.memcache.clear_calls()
        self.logger.logger.log_dict['increment'][:] = []
        req = self._build_request({'X-Backend-Record-Type': record_type},
                                  {'states': 'listing'}, {})
        resp = req.get_response(self.app)
@@ -2140,6 +2182,70 @@
        self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
        self.assertEqual(tuple(self.sr_dicts),
                         req.environ['swift.infocache']['shard-listing/a/c'])
        self.assertEqual(
            [x[0][0] for x in self.logger.logger.log_dict['increment']],
            ['shard_listing.cache.hit'])

        # if there's a chance to skip cache, maybe we go to disk again...
        self.memcache.clear_calls()
        self.logger.logger.log_dict['increment'][:] = []
        self.app.container_listing_shard_ranges_skip_cache = 0.10
        req = self._build_request({'X-Backend-Record-Type': record_type},
                                  {'states': 'listing'}, {})
        with mock.patch('random.random', return_value=0.05):
            backend_req, resp = self._capture_backend_request(
                req, 200, self._stub_shards_dump,
                {'X-Backend-Record-Type': 'shard',
                 'X-Backend-Sharding-State': sharding_state,
                 'X-Backend-Override-Shard-Name-Filter': 'true'})
        self._check_backend_req(
            req, backend_req,
            extra_hdrs={'X-Backend-Record-Type': record_type,
                        'X-Backend-Override-Shard-Name-Filter': 'sharded'})
        self._check_response(resp, self.sr_dicts, {
            'X-Backend-Recheck-Container-Existence': '60',
            'X-Backend-Record-Type': 'shard',
            'X-Backend-Sharding-State': sharding_state})
        self.assertEqual(
            [mock.call.get('container/a/c'),
             mock.call.set('shard-listing/a/c', self.sr_dicts,
                           time=exp_recheck_listing),
             # Since there was a backend request, we go ahead and cache
             # container info, too
             mock.call.set('container/a/c', mock.ANY, time=60)],
            self.memcache.calls)
        self.assertIn('swift.infocache', req.environ)
        self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
        self.assertEqual(tuple(self.sr_dicts),
                         req.environ['swift.infocache']['shard-listing/a/c'])
        self.assertEqual(
            [x[0][0] for x in self.logger.logger.log_dict['increment']],
            ['shard_listing.cache.skip'])

        # ... or maybe we serve from cache
        self.memcache.clear_calls()
        self.logger.logger.log_dict['increment'][:] = []
        req = self._build_request({'X-Backend-Record-Type': record_type},
                                  {'states': 'listing'}, {})
        with mock.patch('random.random', return_value=0.11):
            resp = req.get_response(self.app)
        self._check_response(resp, self.sr_dicts, {
            'X-Backend-Cached-Results': 'true',
            'X-Backend-Record-Type': 'shard',
            'X-Backend-Sharding-State': sharding_state})
        self.assertEqual(
            [mock.call.get('container/a/c'),
             mock.call.get('shard-listing/a/c')],
            self.memcache.calls)
        self.assertIn('swift.infocache', req.environ)
        self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
        self.assertEqual(tuple(self.sr_dicts),
                         req.environ['swift.infocache']['shard-listing/a/c'])
        self.assertEqual(
            [x[0][0] for x in self.logger.logger.log_dict['increment']],
            ['shard_listing.cache.hit'])
        # put this back the way we found it for later subtests
        self.app.container_listing_shard_ranges_skip_cache = 0.0

        # delete the container; check that shard ranges are evicted from cache
        self.memcache.clear_calls()
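Taken together, the hit/miss/skip counters also make the knob observable in
production: in steady state the skip fraction should track pct / 100. A
hypothetical sanity check, not part of the commit:

    def skip_fraction(hit, miss, skip):
        return skip / float(hit + miss + skip)

    # with container_listing_shard_ranges_skip_cache_pct = 0.1 we expect
    # a fraction near 0.001
    assert abs(skip_fraction(hit=99900, miss=0, skip=100) - 0.001) < 1e-9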
@@ -639,6 +639,18 @@ class TestProxyServerConfiguration(unittest.TestCase):
        self.assertEqual(app.recheck_updating_shard_ranges, 1800)
        self.assertEqual(app.recheck_listing_shard_ranges, 900)

    def test_memcache_skip_options(self):
        # check default options
        app = self._make_app({})
        self.assertEqual(app.container_listing_shard_ranges_skip_cache, 0)
        self.assertEqual(app.container_updating_shard_ranges_skip_cache, 0)
        # check custom options
        app = self._make_app({
            'container_listing_shard_ranges_skip_cache_pct': '0.01',
            'container_updating_shard_ranges_skip_cache_pct': '0.1'})
        self.assertEqual(app.container_listing_shard_ranges_skip_cache, 0.0001)
        self.assertEqual(app.container_updating_shard_ranges_skip_cache, 0.001)


@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestProxyServer(unittest.TestCase):
@@ -4198,6 +4210,140 @@ class TestReplicatedObjectController(
        do_test('PUT', 'sharding')
        do_test('PUT', 'sharded')

    @patch_policies([
        StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
        StoragePolicy(1, 'one', object_ring=FakeRing()),
    ])
    def test_backend_headers_update_shard_container_can_skip_cache(self):
        # verify that when container is sharded the backend container update
        # is directed to the shard container
        # reset the router post patch_policies
        self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
        self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
        self.app.recheck_updating_shard_ranges = 3600
        self.app.container_updating_shard_ranges_skip_cache = 0.001

        def do_test(method, sharding_state):
            self.app.logger = debug_logger('proxy-ut')  # clean capture state
            cached_shard_ranges = [
                utils.ShardRange(
                    '.shards_a/c_nope', utils.Timestamp.now(), '', 'l'),
                utils.ShardRange(
                    '.shards_a/c_uhn_uh', utils.Timestamp.now(), 'l', 'u'),
                utils.ShardRange(
                    '.shards_a/c_no_way', utils.Timestamp.now(), 'u', ''),
            ]
            cache = FakeMemcache()
            cache.set('shard-updating/a/c', tuple(
                dict(shard_range) for shard_range in cached_shard_ranges))

            # sanity check: we can get the old shard from cache
            req = Request.blank(
                '/v1/a/c/o', {'swift.cache': cache},
                method=method, body='', headers={'Content-Type': 'text/plain'})
            # acc HEAD, cont HEAD, obj POSTs
            # we want the container_info response to say policy index of 1 and
            # sharding state
            status_codes = (200, 200, 202, 202, 202)
            resp_headers = {'X-Backend-Storage-Policy-Index': 1,
                            'x-backend-sharding-state': sharding_state,
                            'X-Backend-Record-Type': 'shard'}
            with mock.patch('random.random', return_value=1), \
                    mocked_http_conn(*status_codes, headers=resp_headers):
                resp = req.get_response(self.app)

            self.assertEqual(resp.status_int, 202)
            stats = self.app.logger.get_increment_counts()
            self.assertEqual({'shard_updating.cache.hit': 1}, stats)

            # cached shard ranges are still there
            cache_key = 'shard-updating/a/c'
            self.assertIn(cache_key, req.environ['swift.cache'].store)
            self.assertEqual(req.environ['swift.cache'].store[cache_key],
                             [dict(sr) for sr in cached_shard_ranges])
            self.assertIn(cache_key, req.environ.get('swift.infocache'))
            self.assertEqual(req.environ['swift.infocache'][cache_key],
                             tuple(dict(sr) for sr in cached_shard_ranges))

            # ...but we have some chance to skip cache
            req = Request.blank(
                '/v1/a/c/o', {'swift.cache': cache},
                method=method, body='', headers={'Content-Type': 'text/plain'})
            # cont shard GET, obj POSTs
            status_codes = (200, 202, 202, 202)
            resp_headers = {'X-Backend-Storage-Policy-Index': 1,
                            'x-backend-sharding-state': sharding_state,
                            'X-Backend-Record-Type': 'shard'}
            shard_ranges = [
                utils.ShardRange(
                    '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'),
                utils.ShardRange(
                    '.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'),
                utils.ShardRange(
                    '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''),
            ]
            body = json.dumps([
                dict(shard_range)
                for shard_range in shard_ranges]).encode('ascii')
            with mock.patch('random.random', return_value=0), \
                    mocked_http_conn(*status_codes, headers=resp_headers,
                                     body=body) as fake_conn:
                resp = req.get_response(self.app)

            self.assertEqual(resp.status_int, 202)
            stats = self.app.logger.get_increment_counts()
            self.assertEqual({'shard_updating.cache.skip': 1,
                              'shard_updating.cache.hit': 1,
                              'shard_updating.backend.200': 1}, stats)

            backend_requests = fake_conn.requests
            container_request_shard = backend_requests[0]
            self._check_request(
                container_request_shard, method='GET', path='/sda/0/a/c',
                params={'states': 'updating'},
                headers={'X-Backend-Record-Type': 'shard'})

            # and skipping cache will refresh it
            cache_key = 'shard-updating/a/c'
            self.assertIn(cache_key, req.environ['swift.cache'].store)
            self.assertEqual(req.environ['swift.cache'].store[cache_key],
                             [dict(sr) for sr in shard_ranges])
            self.assertIn(cache_key, req.environ.get('swift.infocache'))
            self.assertEqual(req.environ['swift.infocache'][cache_key],
                             tuple(dict(sr) for sr in shard_ranges))

            # make sure backend requests included expected container headers
            container_headers = {}

            for request in backend_requests[1:]:
                req_headers = request['headers']
                device = req_headers['x-container-device']
                container_headers[device] = req_headers['x-container-host']
                expectations = {
                    'method': method,
                    'path': '/0/a/c/o',
                    'headers': {
                        'X-Container-Partition': '0',
                        'Host': 'localhost:80',
                        'Referer': '%s http://localhost/v1/a/c/o' % method,
                        'X-Backend-Storage-Policy-Index': '1',
                        'X-Backend-Quoted-Container-Path': shard_ranges[1].name
                    },
                }
                self._check_request(request, **expectations)

            expected = {}
            for i, device in enumerate(['sda', 'sdb', 'sdc']):
                expected[device] = '10.0.0.%d:100%d' % (i, i)
            self.assertEqual(container_headers, expected)

        do_test('POST', 'sharding')
        do_test('POST', 'sharded')
        do_test('DELETE', 'sharding')
        do_test('DELETE', 'sharded')
        do_test('PUT', 'sharding')
        do_test('PUT', 'sharded')

    @patch_policies([
        StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
        StoragePolicy(1, 'one', object_ring=FakeRing()),