memcached: Give callers the option to accept errors

Auth middlewares in particular may want to *know* when there's a
communication breakdown as opposed to a cache miss.

Update our shard-range cache stats to acknowledge the distinction.

Drive-by: Log an error if all memcached servers are error-limited.

Change-Id: Ic8d0915235d11124d06ec940c5be9a2edbe85c83
This commit is contained in:
Tim Burke 2022-04-26 16:12:49 -07:00
parent b621a6f932
commit 9bed525bfb
7 changed files with 165 additions and 26 deletions

View File

@ -258,6 +258,7 @@ class MemcacheRing(object):
"""
pos = bisect(self._sorted, key)
served = []
any_yielded = False
while len(served) < self._tries:
pos = (pos + 1) % len(self._sorted)
server = self._ring[self._sorted[pos]]
@ -270,6 +271,7 @@ class MemcacheRing(object):
try:
with MemcachePoolTimeout(self._pool_timeout):
fp, sock = self._client_cache[server].get()
any_yielded = True
yield server, fp, sock
except MemcachePoolTimeout as e:
self._exception_occurred(
@ -281,13 +283,15 @@ class MemcacheRing(object):
# object.
self._exception_occurred(
server, e, action='connecting', sock=sock)
if not any_yielded:
self.logger.error('All memcached servers error-limited')
def _return_conn(self, server, fp, sock):
    """Returns a server connection to the pool."""
    # Hand the (fp, sock) pair back to this server's connection pool so a
    # later _get_conns() call can reuse it instead of reconnecting.
    self._client_cache[server].put((fp, sock))
def set(self, key, value, serialize=True, time=0,
min_compress_len=0):
min_compress_len=0, raise_on_error=False):
"""
Set a key/value pair in memcache
@ -301,6 +305,8 @@ class MemcacheRing(object):
added to keep the signature compatible with
python-memcached interface. This
implementation ignores it.
:param raise_on_error: if True, propagate Timeouts and other errors.
By default, errors are ignored.
"""
key = md5hash(key)
timeout = sanitize_timeout(time)
@ -340,14 +346,19 @@ class MemcacheRing(object):
return
except (Exception, Timeout) as e:
self._exception_occurred(server, e, sock=sock, fp=fp)
if raise_on_error:
raise MemcacheConnectionError(
"No memcached connections succeeded.")
def get(self, key):
def get(self, key, raise_on_error=False):
"""
Gets the object specified by key. It will also unserialize the object
before returning if it is serialized in memcache with JSON, or if it
is pickled and unpickling is allowed.
:param key: key
:param raise_on_error: if True, propagate Timeouts and other errors.
By default, errors are treated as cache misses.
:returns: value of the key in memcache
"""
key = md5hash(key)
@ -378,6 +389,9 @@ class MemcacheRing(object):
return value
except (Exception, Timeout) as e:
self._exception_occurred(server, e, sock=sock, fp=fp)
if raise_on_error:
raise MemcacheConnectionError(
"No memcached connections succeeded.")
def incr(self, key, delta=1, time=0):
"""

View File

@ -40,6 +40,7 @@ from eventlet import sleep
from eventlet.timeout import Timeout
import six
from swift.common.memcached import MemcacheConnectionError
from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \
@ -2400,9 +2401,13 @@ class Controller(object):
if skip_chance and random.random() < skip_chance:
self.logger.increment('shard_updating.cache.skip')
else:
cached_ranges = memcache.get(cache_key)
self.logger.increment('shard_updating.cache.%s' % (
'hit' if cached_ranges else 'miss'))
try:
cached_ranges = memcache.get(
cache_key, raise_on_error=True)
cache_state = 'hit' if cached_ranges else 'miss'
except MemcacheConnectionError:
cache_state = 'error'
self.logger.increment('shard_updating.cache.%s' % cache_state)
if cached_ranges:
shard_ranges = [

View File

@ -20,6 +20,7 @@ import random
import six
from six.moves.urllib.parse import unquote
from swift.common.memcached import MemcacheConnectionError
from swift.common.utils import public, private, csv_append, Timestamp, \
config_true_value, ShardRange, cache_from_env, filter_shard_ranges
from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT
@ -152,9 +153,14 @@ class ContainerController(Controller):
if skip_chance and random.random() < skip_chance:
self.logger.increment('shard_listing.cache.skip')
else:
cached_ranges = memcache.get(cache_key)
self.logger.increment('shard_listing.cache.%s' % (
'hit' if cached_ranges else 'miss'))
try:
cached_ranges = memcache.get(
cache_key, raise_on_error=True)
cache_state = 'hit' if cached_ranges else 'miss'
except MemcacheConnectionError:
cache_state = 'error'
self.logger.increment(
'shard_listing.cache.%s' % cache_state)
if cached_ranges is not None:
infocache[cache_key] = tuple(cached_ranges)

View File

@ -410,17 +410,22 @@ def track(f):
class FakeMemcache(object):
def __init__(self):
def __init__(self, error_on_set=None, error_on_get=None):
self.store = {}
self.calls = []
self.error_on_incr = False
self.error_on_get = error_on_get or []
self.error_on_set = error_on_set or []
self.init_incr_return_neg = False
def clear_calls(self):
del self.calls[:]
@track
def get(self, key):
def get(self, key, raise_on_error=False):
if self.error_on_get and self.error_on_get.pop(0):
if raise_on_error:
raise MemcacheConnectionError()
return self.store.get(key)
@property
@ -428,7 +433,10 @@ class FakeMemcache(object):
return self.store.keys
@track
def set(self, key, value, serialize=True, time=0):
def set(self, key, value, serialize=True, time=0, raise_on_error=False):
if self.error_on_set and self.error_on_set.pop(0):
if raise_on_error:
raise MemcacheConnectionError()
if serialize:
value = json.loads(json.dumps(value))
else:

View File

@ -32,6 +32,7 @@ from eventlet import GreenPool, sleep, Queue
from eventlet.pools import Pool
from swift.common import memcached
from swift.common.memcached import MemcacheConnectionError
from swift.common.utils import md5, human_readable
from mock import patch, MagicMock
from test.debug_logger import debug_logger
@ -581,19 +582,27 @@ class TestMemcached(unittest.TestCase):
self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.4:11211: '
'[Errno 32] Broken pipe',
] * 11 + [
'Error limiting server 1.2.3.4:11211'
] * 10 + [
'Error talking to memcached: 1.2.3.4:11211: '
'[Errno 32] Broken pipe',
'Error limiting server 1.2.3.4:11211',
'All memcached servers error-limited',
])
self.logger.clear()
# continued requests just keep bypassing memcache
for _ in range(12):
memcache_client.set('some_key', [1, 2, 3])
self.assertEqual(self.logger.get_lines_for_level('error'), [])
self.assertEqual(self.logger.get_lines_for_level('error'), [
'All memcached servers error-limited',
] * 12)
self.logger.clear()
# and get()s are all a "cache miss"
self.assertIsNone(memcache_client.get('some_key'))
self.assertEqual(self.logger.get_lines_for_level('error'), [])
self.assertEqual(self.logger.get_lines_for_level('error'), [
'All memcached servers error-limited',
])
def test_error_disabled(self):
memcache_client = memcached.MemcacheRing(
@ -611,6 +620,44 @@ class TestMemcached(unittest.TestCase):
'[Errno 32] Broken pipe',
] * 20)
def test_error_raising(self):
    # Verify the new raise_on_error option: when True, set()/get() raise
    # MemcacheConnectionError on failure; when omitted (the default),
    # failures are only logged and swallowed.
    memcache_client = memcached.MemcacheRing(
        ['1.2.3.4:11211'], logger=self.logger, error_limit_time=0)
    mock1 = ExplodingMockMemcached()
    # Pool always yields a connection that raises Broken pipe on use.
    memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
        [(mock1, mock1)] * 20)

    # expect exception when requested...
    with self.assertRaises(MemcacheConnectionError):
        memcache_client.set('some_key', [1, 2, 3], raise_on_error=True)
    self.assertEqual(self.logger.get_lines_for_level('error'), [
        'Error talking to memcached: 1.2.3.4:11211: '
        '[Errno 32] Broken pipe',
    ])
    self.logger.clear()

    with self.assertRaises(MemcacheConnectionError):
        memcache_client.get('some_key', raise_on_error=True)
    self.assertEqual(self.logger.get_lines_for_level('error'), [
        'Error talking to memcached: 1.2.3.4:11211: '
        '[Errno 32] Broken pipe',
    ])
    self.logger.clear()

    # ...but default is no exception
    memcache_client.set('some_key', [1, 2, 3])
    self.assertEqual(self.logger.get_lines_for_level('error'), [
        'Error talking to memcached: 1.2.3.4:11211: '
        '[Errno 32] Broken pipe',
    ])
    self.logger.clear()

    memcache_client.get('some_key')
    self.assertEqual(self.logger.get_lines_for_level('error'), [
        'Error talking to memcached: 1.2.3.4:11211: '
        '[Errno 32] Broken pipe',
    ])
def test_error_limiting_custom_config(self):
def do_calls(time_step, num_calls, **memcache_kwargs):
self.logger.clear()
@ -632,8 +679,11 @@ class TestMemcached(unittest.TestCase):
self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe',
] * 11 + [
'Error limiting server 1.2.3.5:11211'
] * 10 + [
'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe',
'Error limiting server 1.2.3.5:11211',
'All memcached servers error-limited',
])
# with default error_limit_time of 60, one call per 6 secs, error limit
@ -650,8 +700,11 @@ class TestMemcached(unittest.TestCase):
self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe',
] * 11 + [
'Error limiting server 1.2.3.5:11211'
] * 10 + [
'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe',
'Error limiting server 1.2.3.5:11211',
'All memcached servers error-limited',
])
# with error_limit_time of 70, one call per 6 secs, error_limit_count
@ -660,8 +713,11 @@ class TestMemcached(unittest.TestCase):
self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe',
] * 12 + [
'Error limiting server 1.2.3.5:11211'
] * 11 + [
'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe',
'Error limiting server 1.2.3.5:11211',
'All memcached servers error-limited',
])
def test_delete(self):

View File

@ -2148,7 +2148,7 @@ class TestContainerController(TestRingBase):
'X-Backend-Sharding-State': sharding_state})
self.assertEqual(
[mock.call.get('container/a/c'),
mock.call.get('shard-listing/a/c'),
mock.call.get('shard-listing/a/c', raise_on_error=True),
mock.call.set('shard-listing/a/c', self.sr_dicts,
time=exp_recheck_listing),
# Since there was a backend request, we go ahead and cache
@ -2177,7 +2177,7 @@ class TestContainerController(TestRingBase):
'X-Backend-Sharding-State': sharding_state})
self.assertEqual(
[mock.call.get('container/a/c'),
mock.call.get('shard-listing/a/c')],
mock.call.get('shard-listing/a/c', raise_on_error=True)],
self.memcache.calls)
self.assertIn('swift.infocache', req.environ)
self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
@ -2237,7 +2237,7 @@ class TestContainerController(TestRingBase):
'X-Backend-Sharding-State': sharding_state})
self.assertEqual(
[mock.call.get('container/a/c'),
mock.call.get('shard-listing/a/c')],
mock.call.get('shard-listing/a/c', raise_on_error=True)],
self.memcache.calls)
self.assertIn('swift.infocache', req.environ)
self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
@ -2390,7 +2390,7 @@ class TestContainerController(TestRingBase):
# deleted from cache
self.assertEqual(
[mock.call.get('container/a/c'),
mock.call.get('shard-listing/a/c'),
mock.call.get('shard-listing/a/c', raise_on_error=True),
mock.call.set('container/a/c', mock.ANY, time=6.0)],
self.memcache.calls)
self.assertEqual(404, self.memcache.calls[2][1][1]['status'])
@ -2400,6 +2400,39 @@ class TestContainerController(TestRingBase):
'container.shard_listing.backend.404': 1},
self.logger.get_increment_counts())
def test_GET_shard_ranges_read_from_cache_error(self):
    # Verify that a memcache error while reading the cached shard-range
    # listing is counted as 'cache.error' (rather than a miss) and that
    # the request falls back to the backend.
    self._setup_shard_range_stubs()
    self.memcache = FakeMemcache()
    self.memcache.delete_all()
    self.logger.clear()
    info = headers_to_container_info(self.root_resp_hdrs)
    info['status'] = 200
    info['sharding_state'] = 'sharded'
    # pre-warm only the container-info cache entry
    self.memcache.set('container/a/c', info)
    self.memcache.clear_calls()
    # first get (container info) succeeds; second (shard listing) errors
    self.memcache.error_on_get = [False, True]

    req = self._build_request({'X-Backend-Record-Type': 'shard'},
                              {'states': 'listing'}, {})
    backend_req, resp = self._capture_backend_request(
        req, 404, b'', {}, num_resp=2 * self.CONTAINER_REPLICAS)
    self._check_backend_req(
        req, backend_req,
        extra_hdrs={'X-Backend-Record-Type': 'shard',
                    'X-Backend-Override-Shard-Name-Filter': 'sharded'})
    self.assertNotIn('X-Backend-Cached-Results', resp.headers)
    # shard-listing lookup was attempted with raise_on_error=True, then
    # the backend 404 was cached as container info
    self.assertEqual(
        [mock.call.get('container/a/c'),
         mock.call.get('shard-listing/a/c', raise_on_error=True),
         mock.call.set('container/a/c', mock.ANY, time=6.0)],
        self.memcache.calls)
    self.assertEqual(404, self.memcache.calls[2][1][1]['status'])
    self.assertEqual(b'', resp.body)
    self.assertEqual(404, resp.status_int)
    # the failed cache read is recorded as an 'error', not a 'miss'
    self.assertEqual({'container.shard_listing.cache.error': 1,
                      'container.shard_listing.backend.404': 1},
                     self.logger.get_increment_counts())
def _do_test_GET_shard_ranges_read_from_cache(self, params, record_type):
# pre-warm cache with container metadata and shard ranges and verify
# that shard range listing are read from cache when appropriate
@ -2417,7 +2450,7 @@ class TestContainerController(TestRingBase):
resp = req.get_response(self.app)
self.assertEqual(
[mock.call.get('container/a/c'),
mock.call.get('shard-listing/a/c')],
mock.call.get('shard-listing/a/c', raise_on_error=True)],
self.memcache.calls)
self.assertEqual({'container.shard_listing.cache.hit': 1},
self.logger.get_increment_counts())

View File

@ -4413,6 +4413,23 @@ class TestReplicatedObjectController(
expected[device] = '10.0.0.%d:100%d' % (i, i)
self.assertEqual(container_headers, expected)
# shard lookup in memcache may error...
req = Request.blank(
'/v1/a/c/o', {'swift.cache': cache},
method=method, body='', headers={'Content-Type': 'text/plain'})
cache.error_on_get = [False, True]
with mock.patch('random.random', return_value=1.0), \
mocked_http_conn(*status_codes, headers=resp_headers,
body=body):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
stats = self.app.logger.get_increment_counts()
self.assertEqual({'object.shard_updating.cache.skip': 1,
'object.shard_updating.cache.hit': 1,
'object.shard_updating.cache.error': 1,
'object.shard_updating.backend.200': 2}, stats)
do_test('POST', 'sharding')
do_test('POST', 'sharded')
do_test('DELETE', 'sharding')