sharding: Cache shard ranges for object writes

Previously, we issued a GET to the root container for every object PUT,
POST, and DELETE. This puts load on the container server, potentially
leading to timeouts, error limiting, and erroneous 404s (!).

Now, cache the complete set of 'updating' shards, and find the shard for
this particular update in the proxy. Add a new config option,
recheck_updating_shard_ranges, to control the cache time; it defaults to
one hour. Set to 0 to fall back to previous behavior.

Note that we should be able to tolerate stale shard data just fine; we
already have to worry about async pendings that got written down with
one shard but may not get processed until that shard has itself sharded
or shrunk into another shard.

Also note that memcache has a default value limit of 1MiB, which may be
exceeded if a container has thousands of shards. In that case, set()
will act like a delete(), causing increased memcache churn but otherwise
preserving existing behavior. In the future, we may want to add support
for gzipping the cached shard ranges as they should compress well.

Change-Id: Ic7a732146ea19a47669114ad5dbee0bacbe66919
Closes-Bug: 1781291
This commit is contained in:
Tim Burke 2019-06-24 12:25:33 -07:00
parent e62f07d988
commit a1af3811a7
6 changed files with 319 additions and 15 deletions

View File

@ -129,6 +129,13 @@ use = egg:swift#proxy
# log_handoffs = true # log_handoffs = true
# recheck_account_existence = 60 # recheck_account_existence = 60
# recheck_container_existence = 60 # recheck_container_existence = 60
#
# How long the proxy should cache a set of shard ranges for a container.
# Note that stale shard range info should be fine; updates will still
# eventually make their way to the correct shard. As a result, you can
# usually set this much higher than the existence checks above.
# recheck_updating_shard_ranges = 3600
#
# object_chunk_size = 65536 # object_chunk_size = 65536
# client_chunk_size = 65536 # client_chunk_size = 65536
# #

View File

@ -45,7 +45,7 @@ from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
from swift.common.utils import Timestamp, config_true_value, \ from swift.common.utils import Timestamp, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \
GreenAsyncPile, quorum_size, parse_content_type, \ GreenAsyncPile, quorum_size, parse_content_type, \
document_iters_to_http_response_body, ShardRange document_iters_to_http_response_body, ShardRange, find_shard_range
from swift.common.bufferedhttp import http_connect from swift.common.bufferedhttp import http_connect
from swift.common import constraints from swift.common import constraints
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \ from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
@ -67,6 +67,7 @@ from swift.common.storage_policy import POLICIES
DEFAULT_RECHECK_ACCOUNT_EXISTENCE = 60 # seconds DEFAULT_RECHECK_ACCOUNT_EXISTENCE = 60 # seconds
DEFAULT_RECHECK_CONTAINER_EXISTENCE = 60 # seconds DEFAULT_RECHECK_CONTAINER_EXISTENCE = 60 # seconds
DEFAULT_RECHECK_UPDATING_SHARD_RANGES = 3600 # seconds
def update_headers(response, headers): def update_headers(response, headers):
@ -443,7 +444,7 @@ def get_account_info(env, app, swift_source=None):
return info return info
def get_cache_key(account, container=None, obj=None): def get_cache_key(account, container=None, obj=None, shard=None):
""" """
Get the keys for both memcache and env['swift.infocache'] (cache_key) Get the keys for both memcache and env['swift.infocache'] (cache_key)
where info about accounts, containers, and objects is cached where info about accounts, containers, and objects is cached
@ -451,6 +452,9 @@ def get_cache_key(account, container=None, obj=None):
:param account: The name of the account :param account: The name of the account
:param container: The name of the container (or None if account) :param container: The name of the container (or None if account)
:param obj: The name of the object (or None if account or container) :param obj: The name of the object (or None if account or container)
:param shard: Sharding state for the container query; typically 'updating'
or 'listing' (Requires account and container; cannot use
with obj)
:returns: a (native) string cache_key :returns: a (native) string cache_key
""" """
if six.PY2: if six.PY2:
@ -468,7 +472,13 @@ def get_cache_key(account, container=None, obj=None):
container = to_native(container) container = to_native(container)
obj = to_native(obj) obj = to_native(obj)
if shard:
if not (account and container):
raise ValueError('Shard cache key requires account and container')
if obj: if obj:
raise ValueError('Shard cache key cannot have obj')
cache_key = 'shard-%s/%s/%s' % (shard, account, container)
elif obj:
if not (account and container): if not (account and container):
raise ValueError('Object cache key requires account and container') raise ValueError('Object cache key requires account and container')
cache_key = 'object/%s/%s/%s' % (account, container, obj) cache_key = 'object/%s/%s/%s' % (account, container, obj)
@ -2191,3 +2201,55 @@ class Controller(object):
"Failed to get shard ranges from %s: invalid data: %r", "Failed to get shard ranges from %s: invalid data: %r",
req.path_qs, err) req.path_qs, err)
return None return None
def _get_update_shard(self, req, account, container, obj):
"""
Find the appropriate shard range for an object update.
Note that this fetches and caches (in both the per-request infocache
and memcache, if available) all shard ranges for the given root
container so we won't have to contact the container DB for every write.
:param req: original Request instance.
:param account: account from which shard ranges should be fetched.
:param container: container from which shard ranges should be fetched.
:param obj: object getting updated.
:return: an instance of :class:`swift.common.utils.ShardRange`,
or None if the update should go back to the root
"""
if not self.app.recheck_updating_shard_ranges:
# caching is disabled; fall back to old behavior
shard_ranges = self._get_shard_ranges(
req, account, container, states='updating', includes=obj)
if not shard_ranges:
return None
return shard_ranges[0]
cache_key = get_cache_key(account, container, shard='updating')
infocache = req.environ.setdefault('swift.infocache', {})
memcache = getattr(self.app, 'memcache', None) or req.environ.get(
'swift.cache')
cached_ranges = infocache.get(cache_key)
if cached_ranges is None and memcache:
cached_ranges = memcache.get(cache_key)
if cached_ranges:
shard_ranges = [
ShardRange.from_dict(shard_range)
for shard_range in cached_ranges]
else:
shard_ranges = self._get_shard_ranges(
req, account, container, states='updating')
if shard_ranges:
cached_ranges = [dict(sr) for sr in shard_ranges]
# went to disk; cache it
if memcache:
memcache.set(cache_key, cached_ranges,
time=self.app.recheck_updating_shard_ranges)
if not shard_ranges:
return None
infocache[cache_key] = tuple(cached_ranges)
return find_shard_range(obj, shard_ranges)

View File

@ -271,13 +271,12 @@ class BaseObjectController(Controller):
# find the sharded container to which we'll send the update # find the sharded container to which we'll send the update
db_state = container_info.get('sharding_state', 'unsharded') db_state = container_info.get('sharding_state', 'unsharded')
if db_state in ('sharded', 'sharding'): if db_state in ('sharded', 'sharding'):
shard_ranges = self._get_shard_ranges( shard_range = self._get_update_shard(
req, self.account_name, self.container_name, req, self.account_name, self.container_name, self.object_name)
includes=self.object_name, states='updating') if shard_range:
if shard_ranges:
partition, nodes = self.app.container_ring.get_nodes( partition, nodes = self.app.container_ring.get_nodes(
shard_ranges[0].account, shard_ranges[0].container) shard_range.account, shard_range.container)
return partition, nodes, shard_ranges[0].name return partition, nodes, shard_range.name
return container_info['partition'], container_info['nodes'], None return container_info['partition'], container_info['nodes'], None

View File

@ -40,7 +40,8 @@ from swift.common.constraints import check_utf8, valid_api_version
from swift.proxy.controllers import AccountController, ContainerController, \ from swift.proxy.controllers import AccountController, ContainerController, \
ObjectControllerRouter, InfoController ObjectControllerRouter, InfoController
from swift.proxy.controllers.base import get_container_info, NodeIter, \ from swift.proxy.controllers.base import get_container_info, NodeIter, \
DEFAULT_RECHECK_CONTAINER_EXISTENCE, DEFAULT_RECHECK_ACCOUNT_EXISTENCE DEFAULT_RECHECK_CONTAINER_EXISTENCE, DEFAULT_RECHECK_ACCOUNT_EXISTENCE, \
DEFAULT_RECHECK_UPDATING_SHARD_RANGES
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \ from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \ HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
HTTPServerError, HTTPException, Request, HTTPServiceUnavailable, \ HTTPServerError, HTTPException, Request, HTTPServiceUnavailable, \
@ -202,6 +203,9 @@ class Application(object):
self.recheck_container_existence = \ self.recheck_container_existence = \
int(conf.get('recheck_container_existence', int(conf.get('recheck_container_existence',
DEFAULT_RECHECK_CONTAINER_EXISTENCE)) DEFAULT_RECHECK_CONTAINER_EXISTENCE))
self.recheck_updating_shard_ranges = \
int(conf.get('recheck_updating_shard_ranges',
DEFAULT_RECHECK_UPDATING_SHARD_RANGES))
self.recheck_account_existence = \ self.recheck_account_existence = \
int(conf.get('recheck_account_existence', int(conf.get('recheck_account_existence',
DEFAULT_RECHECK_ACCOUNT_EXISTENCE)) DEFAULT_RECHECK_ACCOUNT_EXISTENCE))

View File

@ -20,15 +20,16 @@ import uuid
from nose import SkipTest from nose import SkipTest
from swift.common import direct_client from swift.common import direct_client, utils
from swift.common.manager import Manager
from swift.common.memcached import MemcacheRing
from swift.common.direct_client import DirectClientException from swift.common.direct_client import DirectClientException
from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \ from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
quorum_size, config_true_value, Timestamp quorum_size, config_true_value, Timestamp
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING
from swift.common import utils
from swift.common.manager import Manager
from swiftclient import client, get_auth, ClientException from swiftclient import client, get_auth, ClientException
from swift.proxy.controllers.base import get_cache_key
from swift.proxy.controllers.obj import num_container_updates from swift.proxy.controllers.obj import num_container_updates
from test import annotate_failure from test import annotate_failure
from test.probe.brain import BrainSplitter from test.probe.brain import BrainSplitter
@ -116,6 +117,7 @@ class BaseTestContainerSharding(ReplProbeTest):
self.brain.put_container(policy_index=int(self.policy)) self.brain.put_container(policy_index=int(self.policy))
self.sharders = Manager(['container-sharder']) self.sharders = Manager(['container-sharder'])
self.internal_client = self.make_internal_client() self.internal_client = self.make_internal_client()
self.memcache = MemcacheRing(['127.0.0.1:11211'])
def stop_container_servers(self, node_numbers=None): def stop_container_servers(self, node_numbers=None):
if node_numbers: if node_numbers:
@ -835,6 +837,9 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assert_container_listing(more_obj_names + obj_names) self.assert_container_listing(more_obj_names + obj_names)
self.assert_container_object_count(len(more_obj_names + obj_names)) self.assert_container_object_count(len(more_obj_names + obj_names))
# Before writing, kill the cache
self.memcache.delete(get_cache_key(
self.account, self.container_name, shard='updating'))
# add another object that lands in the first of the new sub-shards # add another object that lands in the first of the new sub-shards
self.put_objects(['alpha']) self.put_objects(['alpha'])
@ -1217,6 +1222,10 @@ class TestContainerSharding(BaseTestContainerSharding):
# now look up the shard target for subsequent updates # now look up the shard target for subsequent updates
self.assert_container_listing(obj_names) self.assert_container_listing(obj_names)
# Before writing, kill the cache
self.memcache.delete(get_cache_key(
self.account, self.container_name, shard='updating'))
# delete objects from first shard range # delete objects from first shard range
first_shard_objects = [obj_name for obj_name in obj_names first_shard_objects = [obj_name for obj_name in obj_names
if obj_name <= orig_shard_ranges[0].upper] if obj_name <= orig_shard_ranges[0].upper]
@ -1243,6 +1252,11 @@ class TestContainerSharding(BaseTestContainerSharding):
# to a GET for a redirect target, the object update will default to # to a GET for a redirect target, the object update will default to
# being targeted at the root container # being targeted at the root container
self.stop_container_servers() self.stop_container_servers()
# Before writing, kill the cache
self.memcache.delete(get_cache_key(
self.account, self.container_name, shard='updating'))
self.put_objects([beta]) self.put_objects([beta])
self.brain.servers.start() self.brain.servers.start()
async_pendings = self.gather_async_pendings( async_pendings = self.gather_async_pendings(
@ -1746,6 +1760,9 @@ class TestContainerSharding(BaseTestContainerSharding):
shard_part, shard_nodes = self.get_part_and_node_numbers( shard_part, shard_nodes = self.get_part_and_node_numbers(
shard_ranges[1]) shard_ranges[1])
self.brain.servers.stop(number=shard_nodes[2]) self.brain.servers.stop(number=shard_nodes[2])
# Before writing, kill the cache
self.memcache.delete(get_cache_key(
self.account, self.container_name, shard='updating'))
self.delete_objects(['alpha']) self.delete_objects(['alpha'])
self.put_objects(['beta']) self.put_objects(['beta'])
self.assert_container_listing(['beta']) self.assert_container_listing(['beta'])

View File

@ -3631,12 +3631,13 @@ class TestReplicatedObjectController(
StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()), StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
StoragePolicy(1, 'one', object_ring=FakeRing()), StoragePolicy(1, 'one', object_ring=FakeRing()),
]) ])
def test_backend_headers_update_shard_container(self): def test_backend_headers_update_shard_container_no_cache(self):
# verify that when container is sharded the backend container update is # verify that when container is sharded the backend container update is
# directed to the shard container # directed to the shard container
# reset the router post patch_policies # reset the router post patch_policies
self.app.obj_controller_router = proxy_server.ObjectControllerRouter() self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
self.app.recheck_updating_shard_ranges = 0
def do_test(method, sharding_state): def do_test(method, sharding_state):
self.app.memcache.store = {} self.app.memcache.store = {}
@ -3686,7 +3687,8 @@ class TestReplicatedObjectController(
container_request_shard = backend_requests[2] container_request_shard = backend_requests[2]
check_request( check_request(
container_request_shard, method='GET', path='/sda/0/a/c', container_request_shard, method='GET', path='/sda/0/a/c',
params={'includes': 'o'}) params={'includes': 'o', 'states': 'updating'},
headers={'X-Backend-Record-Type': 'shard'})
# make sure backend requests included expected container headers # make sure backend requests included expected container headers
container_headers = {} container_headers = {}
@ -3720,6 +3722,219 @@ class TestReplicatedObjectController(
do_test('PUT', 'sharding') do_test('PUT', 'sharding')
do_test('PUT', 'sharded') do_test('PUT', 'sharded')
@patch_policies([
StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
StoragePolicy(1, 'one', object_ring=FakeRing()),
])
def test_backend_headers_update_shard_container_with_empty_cache(self):
# verify that when container is sharded the backend container update is
# directed to the shard container
# reset the router post patch_policies
self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
self.app.recheck_updating_shard_ranges = 3600
def do_test(method, sharding_state):
self.app.memcache.store = {}
req = Request.blank('/v1/a/c/o', {}, method=method, body='',
headers={'Content-Type': 'text/plain'})
# we want the container_info response to say policy index of 1 and
# sharding state
# acc HEAD, cont HEAD, cont shard GET, obj POSTs
status_codes = (200, 200, 200, 202, 202, 202)
resp_headers = {'X-Backend-Storage-Policy-Index': 1,
'x-backend-sharding-state': sharding_state,
'X-Backend-Record-Type': 'shard'}
shard_ranges = [
utils.ShardRange(
'.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'),
utils.ShardRange(
'.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'),
utils.ShardRange(
'.shards_a/c_nope', utils.Timestamp.now(), 'u', ''),
]
body = json.dumps([
dict(shard_range)
for shard_range in shard_ranges]).encode('ascii')
with mocked_http_conn(*status_codes, headers=resp_headers,
body=body) as fake_conn:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
backend_requests = fake_conn.requests
def check_request(req, method, path, headers=None, params=None):
self.assertEqual(method, req['method'])
# caller can ignore leading path parts
self.assertTrue(req['path'].endswith(path),
'expected path to end with %s, it was %s' % (
path, req['path']))
headers = headers or {}
# caller can ignore some headers
for k, v in headers.items():
self.assertEqual(req['headers'][k], v,
'Expected %s but got %s for key %s' %
(v, req['headers'][k], k))
params = params or {}
req_params = dict(parse_qsl(req['qs'])) if req['qs'] else {}
for k, v in params.items():
self.assertEqual(req_params[k], v,
'Expected %s but got %s for key %s' %
(v, req_params[k], k))
account_request = backend_requests[0]
check_request(account_request, method='HEAD', path='/sda/0/a')
container_request = backend_requests[1]
check_request(container_request, method='HEAD', path='/sda/0/a/c')
container_request_shard = backend_requests[2]
check_request(
container_request_shard, method='GET', path='/sda/0/a/c',
params={'states': 'updating'},
headers={'X-Backend-Record-Type': 'shard'})
cache_key = 'shard-updating/a/c'
self.assertIn(cache_key, self.app.memcache.store)
self.assertEqual(self.app.memcache.store[cache_key],
[dict(sr) for sr in shard_ranges])
self.assertIn(cache_key, req.environ.get('swift.infocache'))
self.assertEqual(req.environ['swift.infocache'][cache_key],
tuple(dict(sr) for sr in shard_ranges))
# make sure backend requests included expected container headers
container_headers = {}
for request in backend_requests[3:]:
req_headers = request['headers']
device = req_headers['x-container-device']
container_headers[device] = req_headers['x-container-host']
expectations = {
'method': method,
'path': '/0/a/c/o',
'headers': {
'X-Container-Partition': '0',
'Host': 'localhost:80',
'Referer': '%s http://localhost/v1/a/c/o' % method,
'X-Backend-Storage-Policy-Index': '1',
'X-Backend-Container-Path': shard_ranges[1].name
},
}
check_request(request, **expectations)
expected = {}
for i, device in enumerate(['sda', 'sdb', 'sdc']):
expected[device] = '10.0.0.%d:100%d' % (i, i)
self.assertEqual(container_headers, expected)
do_test('POST', 'sharding')
do_test('POST', 'sharded')
do_test('DELETE', 'sharding')
do_test('DELETE', 'sharded')
do_test('PUT', 'sharding')
do_test('PUT', 'sharded')
@patch_policies([
StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
StoragePolicy(1, 'one', object_ring=FakeRing()),
])
def test_backend_headers_update_shard_container_with_live_cache(self):
# verify that when container is sharded the backend container update is
# directed to the shard container
# reset the router post patch_policies
self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
self.app.recheck_updating_shard_ranges = 3600
def do_test(method, sharding_state):
shard_ranges = [
utils.ShardRange(
'.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'),
utils.ShardRange(
'.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'),
utils.ShardRange(
'.shards_a/c_nope', utils.Timestamp.now(), 'u', ''),
]
self.app.memcache.store = {'shard-updating/a/c': tuple(
dict(shard_range) for shard_range in shard_ranges)}
req = Request.blank('/v1/a/c/o', {}, method=method, body='',
headers={'Content-Type': 'text/plain'})
# we want the container_info response to say policy index of 1 and
# sharding state
# acc HEAD, cont HEAD, obj POSTs
status_codes = (200, 200, 202, 202, 202)
resp_headers = {'X-Backend-Storage-Policy-Index': 1,
'x-backend-sharding-state': sharding_state,
'X-Backend-Record-Type': 'shard'}
with mocked_http_conn(*status_codes,
headers=resp_headers) as fake_conn:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
backend_requests = fake_conn.requests
def check_request(req, method, path, headers=None, params=None):
self.assertEqual(method, req['method'])
# caller can ignore leading path parts
self.assertTrue(req['path'].endswith(path),
'expected path to end with %s, it was %s' % (
path, req['path']))
headers = headers or {}
# caller can ignore some headers
for k, v in headers.items():
self.assertEqual(req['headers'][k], v,
'Expected %s but got %s for key %s' %
(v, req['headers'][k], k))
params = params or {}
req_params = dict(parse_qsl(req['qs'])) if req['qs'] else {}
for k, v in params.items():
self.assertEqual(req_params[k], v,
'Expected %s but got %s for key %s' %
(v, req_params[k], k))
account_request = backend_requests[0]
check_request(account_request, method='HEAD', path='/sda/0/a')
container_request = backend_requests[1]
check_request(container_request, method='HEAD', path='/sda/0/a/c')
# infocache gets populated from memcache
cache_key = 'shard-updating/a/c'
self.assertIn(cache_key, req.environ.get('swift.infocache'))
self.assertEqual(req.environ['swift.infocache'][cache_key],
tuple(dict(sr) for sr in shard_ranges))
# make sure backend requests included expected container headers
container_headers = {}
for request in backend_requests[2:]:
req_headers = request['headers']
device = req_headers['x-container-device']
container_headers[device] = req_headers['x-container-host']
expectations = {
'method': method,
'path': '/0/a/c/o',
'headers': {
'X-Container-Partition': '0',
'Host': 'localhost:80',
'Referer': '%s http://localhost/v1/a/c/o' % method,
'X-Backend-Storage-Policy-Index': '1',
'X-Backend-Container-Path': shard_ranges[1].name
},
}
check_request(request, **expectations)
expected = {}
for i, device in enumerate(['sda', 'sdb', 'sdc']):
expected[device] = '10.0.0.%d:100%d' % (i, i)
self.assertEqual(container_headers, expected)
do_test('POST', 'sharding')
do_test('POST', 'sharded')
do_test('DELETE', 'sharding')
do_test('DELETE', 'sharded')
do_test('PUT', 'sharding')
do_test('PUT', 'sharded')
def test_DELETE(self): def test_DELETE(self):
with save_globals(): with save_globals():
def test_status_map(statuses, expected): def test_status_map(statuses, expected):