Merge "proxy: Add a chance to skip memcache when looking for shard ranges"
commit 4d48004483
@@ -144,6 +144,15 @@ use = egg:swift#proxy
 # so this value should be set less than recheck_updating_shard_ranges.
 # recheck_listing_shard_ranges = 600
 #
+# For particularly active containers, having information age out of cache can
+# be quite painful: suddenly thousands of requests per second all miss and
+# have to go to disk. By (rarely) going direct to disk regardless of whether
+# data is present in memcache, we can periodically refresh the data in memcache
+# without causing a thundering herd. Values around 0.0 - 0.1 (i.e., one in
+# every thousand requests skips cache, or fewer) are recommended.
+# container_updating_shard_ranges_skip_cache_pct = 0.0
+# container_listing_shard_ranges_skip_cache_pct = 0.0
+#
 # object_chunk_size = 65536
 # client_chunk_size = 65536
 #
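The *_skip_cache_pct values are percentages, and the proxy converts them into a probability (see config_percent_value further down), so the recommended upper bound of 0.1 means one request in a thousand bypasses memcache. A quick illustrative calculation, with a made-up request rate:

    # Illustrative only; requests_per_sec is a hypothetical load figure.
    skip_pct = 0.1                    # value as written in the conf file
    skip_chance = skip_pct / 100.0    # 0.001, the parsed probability
    requests_per_sec = 10000          # hypothetical hot-container load
    refreshes = requests_per_sec * skip_chance
    print(refreshes)                  # ~10 direct-to-backend refreshes/sec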
@@ -33,6 +33,7 @@ import functools
 import inspect
 import itertools
 import operator
+import random
 from copy import deepcopy
 from sys import exc_info
 
@@ -2384,9 +2385,14 @@ class Controller(object):
 
         cached_ranges = infocache.get(cache_key)
         if cached_ranges is None and memcache:
-            cached_ranges = memcache.get(cache_key)
-            self.app.logger.increment('shard_updating.cache.%s'
-                                      % ('hit' if cached_ranges else 'miss'))
+            skip_chance = \
+                self.app.container_updating_shard_ranges_skip_cache
+            if skip_chance and random.random() < skip_chance:
+                self.app.logger.increment('shard_updating.cache.skip')
+            else:
+                cached_ranges = memcache.get(cache_key)
+                self.app.logger.increment('shard_updating.cache.%s' % (
+                    'hit' if cached_ranges else 'miss'))
 
         if cached_ranges:
             shard_ranges = [
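Stripped of the Swift specifics, the change above is a randomized cache bypass: with probability skip_chance the proxy pretends memcache is empty, falls through to the backend, and rewrites the cached value. A minimal standalone sketch of that pattern (names hypothetical, not Swift APIs):

    import random

    def lookup_with_skip(cache, key, skip_chance, fetch_from_backend):
        value = None
        if skip_chance and random.random() < skip_chance:
            pass  # deliberate miss, counted as 'cache.skip'
        else:
            value = cache.get(key)  # counted as 'cache.hit' or 'cache.miss'
        if value is None:
            # either a real miss or a deliberate skip: fetch and refresh
            value = fetch_from_backend()
            cache.set(key, value)
        return value

Because the refresh happens on a random subset of requests rather than at TTL expiry, the cached entry never goes cold for the whole fleet at once.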
@@ -15,6 +15,7 @@
 
 import json
 import math
+import random
 
 import six
 from six.moves.urllib.parse import unquote
@@ -150,7 +151,15 @@ class ContainerController(Controller):
                                        shard='listing')
         cached_ranges = infocache.get(cache_key)
         if cached_ranges is None and memcache:
-            cached_ranges = memcache.get(cache_key)
+            skip_chance = \
+                self.app.container_listing_shard_ranges_skip_cache
+            if skip_chance and random.random() < skip_chance:
+                self.app.logger.increment('shard_listing.cache.skip')
+            else:
+                cached_ranges = memcache.get(cache_key)
+                self.app.logger.increment('shard_listing.cache.%s' % (
+                    'hit' if cached_ranges else 'miss'))
+
         if cached_ranges is not None:
             infocache[cache_key] = tuple(cached_ranges)
             # shard ranges can be returned from cache
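Note that both call sites consult two tiers: swift.infocache (a per-request dict) first, then memcache. The random skip applies only to the shared memcache tier, and whatever is found is memoized back into infocache, so a single request never pays for the same lookup twice. A condensed sketch of that flow, assuming infocache is a plain dict:

    cached_ranges = infocache.get(cache_key)      # request-local, never skipped
    if cached_ranges is None and memcache:
        cached_ranges = memcache.get(cache_key)   # only this tier can be skipped
    if cached_ranges is not None:
        infocache[cache_key] = tuple(cached_ranges)  # memoize for this request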
@@ -35,7 +35,7 @@ from swift.common.utils import Watchdog, get_logger, \
     get_remote_client, split_path, config_true_value, generate_trans_id, \
     affinity_key_function, affinity_locality_predicate, list_from_csv, \
     register_swift_info, parse_prefixed_conf, config_auto_int_value, \
-    config_request_node_count_value
+    config_request_node_count_value, config_percent_value
 from swift.common.constraints import check_utf8, valid_api_version
 from swift.proxy.controllers import AccountController, ContainerController, \
     ObjectControllerRouter, InfoController
@@ -227,6 +227,12 @@ class Application(object):
         self.recheck_account_existence = \
             int(conf.get('recheck_account_existence',
                          DEFAULT_RECHECK_ACCOUNT_EXISTENCE))
+        self.container_updating_shard_ranges_skip_cache = \
+            config_percent_value(conf.get(
+                'container_updating_shard_ranges_skip_cache_pct', 0))
+        self.container_listing_shard_ranges_skip_cache = \
+            config_percent_value(conf.get(
+                'container_listing_shard_ranges_skip_cache_pct', 0))
         self.allow_account_management = \
             config_true_value(conf.get('allow_account_management', 'no'))
         self.container_ring = container_ring or Ring(swift_dir,
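config_percent_value is what turns the conf percentage into the internal probability. Judging by the test further down ('0.01' parses to 0.0001), it divides a validated float by 100; a minimal stand-in under that assumption (the real helper lives in swift.common.utils):

    def config_percent_value(value):
        # Assumed behavior: accept a percentage in [0, 100] and return
        # the equivalent fraction, e.g. '0.1' -> 0.001.
        value = float(value)
        if not 0 <= value <= 100:
            raise ValueError('%s is not a valid percentage' % value)
        return value / 100.0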
@@ -2090,10 +2090,11 @@ class TestContainerController(TestRingBase):
         # this test gets shard ranges into cache and then reads from cache
         sharding_state = 'sharded'
         self.memcache.delete_all()
-        self.memcache.clear_calls()
 
         # container is sharded but proxy does not have that state cached;
         # expect a backend request and expect shard ranges to be cached
+        self.memcache.clear_calls()
+        self.logger.logger.log_dict['increment'][:] = []
         req = self._build_request({'X-Backend-Record-Type': record_type},
                                   {'states': 'listing'}, {})
         backend_req, resp = self._capture_backend_request(
@@ -2121,10 +2122,51 @@ class TestContainerController(TestRingBase):
         self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
         self.assertEqual(tuple(self.sr_dicts),
                          req.environ['swift.infocache']['shard-listing/a/c'])
+        self.assertEqual(
+            [x[0][0] for x in self.logger.logger.log_dict['increment']],
+            [])
+
+        # container is sharded and proxy has that state cached, but
+        # no shard ranges cached; expect a cache miss and write-back
+        self.memcache.delete('shard-listing/a/c')
+        self.memcache.clear_calls()
+        self.logger.logger.log_dict['increment'][:] = []
+        req = self._build_request({'X-Backend-Record-Type': record_type},
+                                  {'states': 'listing'}, {})
+        backend_req, resp = self._capture_backend_request(
+            req, 200, self._stub_shards_dump,
+            {'X-Backend-Record-Type': 'shard',
+             'X-Backend-Sharding-State': sharding_state,
+             'X-Backend-Override-Shard-Name-Filter': 'true'})
+        self._check_backend_req(
+            req, backend_req,
+            extra_hdrs={'X-Backend-Record-Type': record_type,
+                        'X-Backend-Override-Shard-Name-Filter': 'sharded'})
+        self._check_response(resp, self.sr_dicts, {
+            'X-Backend-Recheck-Container-Existence': '60',
+            'X-Backend-Record-Type': 'shard',
+            'X-Backend-Sharding-State': sharding_state})
+        self.assertEqual(
+            [mock.call.get('container/a/c'),
+             mock.call.get('shard-listing/a/c'),
+             mock.call.set('shard-listing/a/c', self.sr_dicts,
+                           time=exp_recheck_listing),
+             # Since there was a backend request, we go ahead and cache
+             # container info, too
+             mock.call.set('container/a/c', mock.ANY, time=60)],
+            self.memcache.calls)
+        self.assertIn('swift.infocache', req.environ)
+        self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
+        self.assertEqual(tuple(self.sr_dicts),
+                         req.environ['swift.infocache']['shard-listing/a/c'])
+        self.assertEqual(
+            [x[0][0] for x in self.logger.logger.log_dict['increment']],
+            ['shard_listing.cache.miss'])
 
         # container is sharded and proxy does have that state cached and
         # also has shard ranges cached; expect a read from cache
         self.memcache.clear_calls()
+        self.logger.logger.log_dict['increment'][:] = []
         req = self._build_request({'X-Backend-Record-Type': record_type},
                                   {'states': 'listing'}, {})
         resp = req.get_response(self.app)
@@ -2140,6 +2182,70 @@ class TestContainerController(TestRingBase):
         self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
         self.assertEqual(tuple(self.sr_dicts),
                          req.environ['swift.infocache']['shard-listing/a/c'])
+        self.assertEqual(
+            [x[0][0] for x in self.logger.logger.log_dict['increment']],
+            ['shard_listing.cache.hit'])
+
+        # if there's a chance to skip cache, maybe we go to disk again...
+        self.memcache.clear_calls()
+        self.logger.logger.log_dict['increment'][:] = []
+        self.app.container_listing_shard_ranges_skip_cache = 0.10
+        req = self._build_request({'X-Backend-Record-Type': record_type},
+                                  {'states': 'listing'}, {})
+        with mock.patch('random.random', return_value=0.05):
+            backend_req, resp = self._capture_backend_request(
+                req, 200, self._stub_shards_dump,
+                {'X-Backend-Record-Type': 'shard',
+                 'X-Backend-Sharding-State': sharding_state,
+                 'X-Backend-Override-Shard-Name-Filter': 'true'})
+        self._check_backend_req(
+            req, backend_req,
+            extra_hdrs={'X-Backend-Record-Type': record_type,
+                        'X-Backend-Override-Shard-Name-Filter': 'sharded'})
+        self._check_response(resp, self.sr_dicts, {
+            'X-Backend-Recheck-Container-Existence': '60',
+            'X-Backend-Record-Type': 'shard',
+            'X-Backend-Sharding-State': sharding_state})
+        self.assertEqual(
+            [mock.call.get('container/a/c'),
+             mock.call.set('shard-listing/a/c', self.sr_dicts,
+                           time=exp_recheck_listing),
+             # Since there was a backend request, we go ahead and cache
+             # container info, too
+             mock.call.set('container/a/c', mock.ANY, time=60)],
+            self.memcache.calls)
+        self.assertIn('swift.infocache', req.environ)
+        self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
+        self.assertEqual(tuple(self.sr_dicts),
+                         req.environ['swift.infocache']['shard-listing/a/c'])
+        self.assertEqual(
+            [x[0][0] for x in self.logger.logger.log_dict['increment']],
+            ['shard_listing.cache.skip'])
+
+        # ... or maybe we serve from cache
+        self.memcache.clear_calls()
+        self.logger.logger.log_dict['increment'][:] = []
+        req = self._build_request({'X-Backend-Record-Type': record_type},
+                                  {'states': 'listing'}, {})
+        with mock.patch('random.random', return_value=0.11):
+            resp = req.get_response(self.app)
+        self._check_response(resp, self.sr_dicts, {
+            'X-Backend-Cached-Results': 'true',
+            'X-Backend-Record-Type': 'shard',
+            'X-Backend-Sharding-State': sharding_state})
+        self.assertEqual(
+            [mock.call.get('container/a/c'),
+             mock.call.get('shard-listing/a/c')],
+            self.memcache.calls)
+        self.assertIn('swift.infocache', req.environ)
+        self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
+        self.assertEqual(tuple(self.sr_dicts),
+                         req.environ['swift.infocache']['shard-listing/a/c'])
+        self.assertEqual(
+            [x[0][0] for x in self.logger.logger.log_dict['increment']],
+            ['shard_listing.cache.hit'])
+        # put this back the way we found it for later subtests
+        self.app.container_listing_shard_ranges_skip_cache = 0.0
 
         # delete the container; check that shard ranges are evicted from cache
         self.memcache.clear_calls()
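The subtests above steer the probabilistic branch by pinning random.random. Since the production check is a strict less-than against skip_chance, any pinned value below 0.10 forces the skip path and any value at or above it forces the cache read:

    import random
    from unittest import mock

    skip_chance = 0.10
    with mock.patch('random.random', return_value=0.05):
        assert random.random() < skip_chance        # skip branch taken
    with mock.patch('random.random', return_value=0.11):
        assert not random.random() < skip_chance    # cache branch taken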
@@ -639,6 +639,18 @@ class TestProxyServerConfiguration(unittest.TestCase):
         self.assertEqual(app.recheck_updating_shard_ranges, 1800)
         self.assertEqual(app.recheck_listing_shard_ranges, 900)
 
+    def test_memcache_skip_options(self):
+        # check default options
+        app = self._make_app({})
+        self.assertEqual(app.container_listing_shard_ranges_skip_cache, 0)
+        self.assertEqual(app.container_updating_shard_ranges_skip_cache, 0)
+        # check custom options
+        app = self._make_app({
+            'container_listing_shard_ranges_skip_cache_pct': '0.01',
+            'container_updating_shard_ranges_skip_cache_pct': '0.1'})
+        self.assertEqual(app.container_listing_shard_ranges_skip_cache, 0.0001)
+        self.assertEqual(app.container_updating_shard_ranges_skip_cache, 0.001)
+
 
 @patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
 class TestProxyServer(unittest.TestCase):
@@ -4247,6 +4259,140 @@ class TestReplicatedObjectController(
         do_test('PUT', 'sharding')
         do_test('PUT', 'sharded')
 
+    @patch_policies([
+        StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
+        StoragePolicy(1, 'one', object_ring=FakeRing()),
+    ])
+    def test_backend_headers_update_shard_container_can_skip_cache(self):
+        # verify that when container is sharded the backend container update is
+        # directed to the shard container
+        # reset the router post patch_policies
+        self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
+        self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
+        self.app.recheck_updating_shard_ranges = 3600
+        self.app.container_updating_shard_ranges_skip_cache = 0.001
+
+        def do_test(method, sharding_state):
+            self.app.logger = debug_logger('proxy-ut')  # clean capture state
+            cached_shard_ranges = [
+                utils.ShardRange(
+                    '.shards_a/c_nope', utils.Timestamp.now(), '', 'l'),
+                utils.ShardRange(
+                    '.shards_a/c_uhn_uh', utils.Timestamp.now(), 'l', 'u'),
+                utils.ShardRange(
+                    '.shards_a/c_no_way', utils.Timestamp.now(), 'u', ''),
+            ]
+            cache = FakeMemcache()
+            cache.set('shard-updating/a/c', tuple(
+                dict(shard_range) for shard_range in cached_shard_ranges))
+
+            # sanity check: we can get the old shard from cache
+            req = Request.blank(
+                '/v1/a/c/o', {'swift.cache': cache},
+                method=method, body='', headers={'Content-Type': 'text/plain'})
+            # acc HEAD, cont HEAD, obj POSTs
+            # we want the container_info response to say policy index of 1 and
+            # sharding state
+            status_codes = (200, 200, 202, 202, 202)
+            resp_headers = {'X-Backend-Storage-Policy-Index': 1,
+                            'x-backend-sharding-state': sharding_state,
+                            'X-Backend-Record-Type': 'shard'}
+            with mock.patch('random.random', return_value=1), \
+                    mocked_http_conn(*status_codes, headers=resp_headers):
+                resp = req.get_response(self.app)
+
+            self.assertEqual(resp.status_int, 202)
+            stats = self.app.logger.get_increment_counts()
+            self.assertEqual({'shard_updating.cache.hit': 1}, stats)
+
+            # cached shard ranges are still there
+            cache_key = 'shard-updating/a/c'
+            self.assertIn(cache_key, req.environ['swift.cache'].store)
+            self.assertEqual(req.environ['swift.cache'].store[cache_key],
+                             [dict(sr) for sr in cached_shard_ranges])
+            self.assertIn(cache_key, req.environ.get('swift.infocache'))
+            self.assertEqual(req.environ['swift.infocache'][cache_key],
+                             tuple(dict(sr) for sr in cached_shard_ranges))
+
+            # ...but we have some chance to skip cache
+            req = Request.blank(
+                '/v1/a/c/o', {'swift.cache': cache},
+                method=method, body='', headers={'Content-Type': 'text/plain'})
+            # cont shard GET, obj POSTs
+            status_codes = (200, 202, 202, 202)
+            resp_headers = {'X-Backend-Storage-Policy-Index': 1,
+                            'x-backend-sharding-state': sharding_state,
+                            'X-Backend-Record-Type': 'shard'}
+            shard_ranges = [
+                utils.ShardRange(
+                    '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'),
+                utils.ShardRange(
+                    '.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'),
+                utils.ShardRange(
+                    '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''),
+            ]
+            body = json.dumps([
+                dict(shard_range)
+                for shard_range in shard_ranges]).encode('ascii')
+            with mock.patch('random.random', return_value=0), \
+                    mocked_http_conn(*status_codes, headers=resp_headers,
+                                     body=body) as fake_conn:
+                resp = req.get_response(self.app)
+
+            self.assertEqual(resp.status_int, 202)
+            stats = self.app.logger.get_increment_counts()
+            self.assertEqual({'shard_updating.cache.skip': 1,
+                              'shard_updating.cache.hit': 1,
+                              'shard_updating.backend.200': 1}, stats)
+
+            backend_requests = fake_conn.requests
+            container_request_shard = backend_requests[0]
+            self._check_request(
+                container_request_shard, method='GET', path='/sda/0/a/c',
+                params={'states': 'updating'},
+                headers={'X-Backend-Record-Type': 'shard'})
+
+            # and skipping cache will refresh it
+            cache_key = 'shard-updating/a/c'
+            self.assertIn(cache_key, req.environ['swift.cache'].store)
+            self.assertEqual(req.environ['swift.cache'].store[cache_key],
+                             [dict(sr) for sr in shard_ranges])
+            self.assertIn(cache_key, req.environ.get('swift.infocache'))
+            self.assertEqual(req.environ['swift.infocache'][cache_key],
+                             tuple(dict(sr) for sr in shard_ranges))
+
+            # make sure backend requests included expected container headers
+            container_headers = {}
+
+            for request in backend_requests[1:]:
+                req_headers = request['headers']
+                device = req_headers['x-container-device']
+                container_headers[device] = req_headers['x-container-host']
+                expectations = {
+                    'method': method,
+                    'path': '/0/a/c/o',
+                    'headers': {
+                        'X-Container-Partition': '0',
+                        'Host': 'localhost:80',
+                        'Referer': '%s http://localhost/v1/a/c/o' % method,
+                        'X-Backend-Storage-Policy-Index': '1',
+                        'X-Backend-Quoted-Container-Path': shard_ranges[1].name
+                    },
+                }
+                self._check_request(request, **expectations)
+
+            expected = {}
+            for i, device in enumerate(['sda', 'sdb', 'sdc']):
+                expected[device] = '10.0.0.%d:100%d' % (i, i)
+            self.assertEqual(container_headers, expected)
+
+        do_test('POST', 'sharding')
+        do_test('POST', 'sharded')
+        do_test('DELETE', 'sharding')
+        do_test('DELETE', 'sharded')
+        do_test('PUT', 'sharding')
+        do_test('PUT', 'sharded')
 
     @patch_policies([
         StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
         StoragePolicy(1, 'one', object_ring=FakeRing()),
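The expectation that updates are redirected to shard_ranges[1] ('.shards_a/c_shard') follows from the shard bounds: a shard with bounds (lower, upper) owns names where lower < name <= upper, and the object name 'o' sorts between 'l' and 'u'. A hedged sketch of that selection rule (the real lookup goes through swift's ShardRange machinery):

    def pick_shard(shard_bounds, obj_name):
        # shard_bounds: (name, lower, upper); '' means unbounded.
        for name, lower, upper in shard_bounds:
            if (lower == '' or obj_name > lower) and \
                    (upper == '' or obj_name <= upper):
                return name

    bounds = [('.shards_a/c_not_used', '', 'l'),
              ('.shards_a/c_shard', 'l', 'u'),
              ('.shards_a/c_nope', 'u', '')]
    print(pick_shard(bounds, 'o'))  # -> .shards_a/c_shard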