Merge "memcached: Give callers the option to accept errors"
This commit is contained in:
commit
eeb5533457
@ -254,6 +254,7 @@ class MemcacheRing(object):
|
||||
"""
|
||||
pos = bisect(self._sorted, key)
|
||||
served = []
|
||||
any_yielded = False
|
||||
while len(served) < self._tries:
|
||||
pos = (pos + 1) % len(self._sorted)
|
||||
server = self._ring[self._sorted[pos]]
|
||||
@ -266,6 +267,7 @@ class MemcacheRing(object):
|
||||
try:
|
||||
with MemcachePoolTimeout(self._pool_timeout):
|
||||
fp, sock = self._client_cache[server].get()
|
||||
any_yielded = True
|
||||
yield server, fp, sock
|
||||
except MemcachePoolTimeout as e:
|
||||
self._exception_occurred(
|
||||
@ -277,13 +279,15 @@ class MemcacheRing(object):
|
||||
# object.
|
||||
self._exception_occurred(
|
||||
server, e, action='connecting', sock=sock)
|
||||
if not any_yielded:
|
||||
self.logger.error('All memcached servers error-limited')
|
||||
|
||||
def _return_conn(self, server, fp, sock):
|
||||
"""Returns a server connection to the pool."""
|
||||
self._client_cache[server].put((fp, sock))
|
||||
|
||||
def set(self, key, value, serialize=True, time=0,
|
||||
min_compress_len=0):
|
||||
min_compress_len=0, raise_on_error=False):
|
||||
"""
|
||||
Set a key/value pair in memcache
|
||||
|
||||
@ -296,6 +300,8 @@ class MemcacheRing(object):
|
||||
added to keep the signature compatible with
|
||||
python-memcached interface. This
|
||||
implementation ignores it.
|
||||
:param raise_on_error: if True, propagate Timeouts and other errors.
|
||||
By default, errors are ignored.
|
||||
"""
|
||||
key = md5hash(key)
|
||||
timeout = sanitize_timeout(time)
|
||||
@ -332,13 +338,18 @@ class MemcacheRing(object):
|
||||
return
|
||||
except (Exception, Timeout) as e:
|
||||
self._exception_occurred(server, e, sock=sock, fp=fp)
|
||||
if raise_on_error:
|
||||
raise MemcacheConnectionError(
|
||||
"No memcached connections succeeded.")
|
||||
|
||||
def get(self, key):
|
||||
def get(self, key, raise_on_error=False):
|
||||
"""
|
||||
Gets the object specified by key. It will also unserialize the object
|
||||
before returning if it is serialized in memcache with JSON.
|
||||
|
||||
:param key: key
|
||||
:param raise_on_error: if True, propagate Timeouts and other errors.
|
||||
By default, errors are treated as cache misses.
|
||||
:returns: value of the key in memcache
|
||||
"""
|
||||
key = md5hash(key)
|
||||
@ -366,6 +377,9 @@ class MemcacheRing(object):
|
||||
return value
|
||||
except (Exception, Timeout) as e:
|
||||
self._exception_occurred(server, e, sock=sock, fp=fp)
|
||||
if raise_on_error:
|
||||
raise MemcacheConnectionError(
|
||||
"No memcached connections succeeded.")
|
||||
|
||||
def incr(self, key, delta=1, time=0):
|
||||
"""
|
||||
|
@ -40,6 +40,7 @@ from eventlet import sleep
|
||||
from eventlet.timeout import Timeout
|
||||
import six
|
||||
|
||||
from swift.common.memcached import MemcacheConnectionError
|
||||
from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
|
||||
from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \
|
||||
public, split_path, list_from_csv, GreenthreadSafeIterator, \
|
||||
@ -2400,9 +2401,13 @@ class Controller(object):
|
||||
if skip_chance and random.random() < skip_chance:
|
||||
self.logger.increment('shard_updating.cache.skip')
|
||||
else:
|
||||
cached_ranges = memcache.get(cache_key)
|
||||
self.logger.increment('shard_updating.cache.%s' % (
|
||||
'hit' if cached_ranges else 'miss'))
|
||||
try:
|
||||
cached_ranges = memcache.get(
|
||||
cache_key, raise_on_error=True)
|
||||
cache_state = 'hit' if cached_ranges else 'miss'
|
||||
except MemcacheConnectionError:
|
||||
cache_state = 'error'
|
||||
self.logger.increment('shard_updating.cache.%s' % cache_state)
|
||||
|
||||
if cached_ranges:
|
||||
shard_ranges = [
|
||||
|
@ -19,6 +19,7 @@ import random
|
||||
import six
|
||||
from six.moves.urllib.parse import unquote
|
||||
|
||||
from swift.common.memcached import MemcacheConnectionError
|
||||
from swift.common.utils import public, private, csv_append, Timestamp, \
|
||||
config_true_value, ShardRange, cache_from_env, filter_shard_ranges
|
||||
from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT
|
||||
@ -151,9 +152,14 @@ class ContainerController(Controller):
|
||||
if skip_chance and random.random() < skip_chance:
|
||||
self.logger.increment('shard_listing.cache.skip')
|
||||
else:
|
||||
cached_ranges = memcache.get(cache_key)
|
||||
self.logger.increment('shard_listing.cache.%s' % (
|
||||
'hit' if cached_ranges else 'miss'))
|
||||
try:
|
||||
cached_ranges = memcache.get(
|
||||
cache_key, raise_on_error=True)
|
||||
cache_state = 'hit' if cached_ranges else 'miss'
|
||||
except MemcacheConnectionError:
|
||||
cache_state = 'error'
|
||||
self.logger.increment(
|
||||
'shard_listing.cache.%s' % cache_state)
|
||||
|
||||
if cached_ranges is not None:
|
||||
infocache[cache_key] = tuple(cached_ranges)
|
||||
|
@ -410,17 +410,22 @@ def track(f):
|
||||
|
||||
class FakeMemcache(object):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, error_on_set=None, error_on_get=None):
|
||||
self.store = {}
|
||||
self.calls = []
|
||||
self.error_on_incr = False
|
||||
self.error_on_get = error_on_get or []
|
||||
self.error_on_set = error_on_set or []
|
||||
self.init_incr_return_neg = False
|
||||
|
||||
def clear_calls(self):
|
||||
del self.calls[:]
|
||||
|
||||
@track
|
||||
def get(self, key):
|
||||
def get(self, key, raise_on_error=False):
|
||||
if self.error_on_get and self.error_on_get.pop(0):
|
||||
if raise_on_error:
|
||||
raise MemcacheConnectionError()
|
||||
return self.store.get(key)
|
||||
|
||||
@property
|
||||
@ -428,7 +433,10 @@ class FakeMemcache(object):
|
||||
return self.store.keys
|
||||
|
||||
@track
|
||||
def set(self, key, value, serialize=True, time=0):
|
||||
def set(self, key, value, serialize=True, time=0, raise_on_error=False):
|
||||
if self.error_on_set and self.error_on_set.pop(0):
|
||||
if raise_on_error:
|
||||
raise MemcacheConnectionError()
|
||||
if serialize:
|
||||
value = json.loads(json.dumps(value))
|
||||
else:
|
||||
|
@ -32,6 +32,7 @@ from eventlet import GreenPool, sleep, Queue
|
||||
from eventlet.pools import Pool
|
||||
|
||||
from swift.common import memcached
|
||||
from swift.common.memcached import MemcacheConnectionError
|
||||
from swift.common.utils import md5, human_readable
|
||||
from mock import patch, MagicMock
|
||||
from test.debug_logger import debug_logger
|
||||
@ -581,19 +582,27 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
] * 11 + [
|
||||
'Error limiting server 1.2.3.4:11211'
|
||||
] * 10 + [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
'Error limiting server 1.2.3.4:11211',
|
||||
'All memcached servers error-limited',
|
||||
])
|
||||
self.logger.clear()
|
||||
|
||||
# continued requests just keep bypassing memcache
|
||||
for _ in range(12):
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [])
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'All memcached servers error-limited',
|
||||
] * 12)
|
||||
self.logger.clear()
|
||||
|
||||
# and get()s are all a "cache miss"
|
||||
self.assertIsNone(memcache_client.get('some_key'))
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [])
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'All memcached servers error-limited',
|
||||
])
|
||||
|
||||
def test_error_disabled(self):
|
||||
memcache_client = memcached.MemcacheRing(
|
||||
@ -611,6 +620,44 @@ class TestMemcached(unittest.TestCase):
|
||||
'[Errno 32] Broken pipe',
|
||||
] * 20)
|
||||
|
||||
def test_error_raising(self):
|
||||
memcache_client = memcached.MemcacheRing(
|
||||
['1.2.3.4:11211'], logger=self.logger, error_limit_time=0)
|
||||
mock1 = ExplodingMockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock1, mock1)] * 20)
|
||||
|
||||
# expect exception when requested...
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.set('some_key', [1, 2, 3], raise_on_error=True)
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
])
|
||||
self.logger.clear()
|
||||
|
||||
with self.assertRaises(MemcacheConnectionError):
|
||||
memcache_client.get('some_key', raise_on_error=True)
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
])
|
||||
self.logger.clear()
|
||||
|
||||
# ...but default is no exception
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
])
|
||||
self.logger.clear()
|
||||
|
||||
memcache_client.get('some_key')
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.4:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
])
|
||||
|
||||
def test_error_limiting_custom_config(self):
|
||||
def do_calls(time_step, num_calls, **memcache_kwargs):
|
||||
self.logger.clear()
|
||||
@ -632,8 +679,11 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
] * 11 + [
|
||||
'Error limiting server 1.2.3.5:11211'
|
||||
] * 10 + [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
'Error limiting server 1.2.3.5:11211',
|
||||
'All memcached servers error-limited',
|
||||
])
|
||||
|
||||
# with default error_limit_time of 60, one call per 6 secs, error limit
|
||||
@ -650,8 +700,11 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
] * 11 + [
|
||||
'Error limiting server 1.2.3.5:11211'
|
||||
] * 10 + [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
'Error limiting server 1.2.3.5:11211',
|
||||
'All memcached servers error-limited',
|
||||
])
|
||||
|
||||
# with error_limit_time of 70, one call per 6 secs, error_limit_count
|
||||
@ -660,8 +713,11 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
] * 12 + [
|
||||
'Error limiting server 1.2.3.5:11211'
|
||||
] * 11 + [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
'Error limiting server 1.2.3.5:11211',
|
||||
'All memcached servers error-limited',
|
||||
])
|
||||
|
||||
def test_delete(self):
|
||||
|
@ -2148,7 +2148,7 @@ class TestContainerController(TestRingBase):
|
||||
'X-Backend-Sharding-State': sharding_state})
|
||||
self.assertEqual(
|
||||
[mock.call.get('container/a/c'),
|
||||
mock.call.get('shard-listing/a/c'),
|
||||
mock.call.get('shard-listing/a/c', raise_on_error=True),
|
||||
mock.call.set('shard-listing/a/c', self.sr_dicts,
|
||||
time=exp_recheck_listing),
|
||||
# Since there was a backend request, we go ahead and cache
|
||||
@ -2177,7 +2177,7 @@ class TestContainerController(TestRingBase):
|
||||
'X-Backend-Sharding-State': sharding_state})
|
||||
self.assertEqual(
|
||||
[mock.call.get('container/a/c'),
|
||||
mock.call.get('shard-listing/a/c')],
|
||||
mock.call.get('shard-listing/a/c', raise_on_error=True)],
|
||||
self.memcache.calls)
|
||||
self.assertIn('swift.infocache', req.environ)
|
||||
self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
|
||||
@ -2237,7 +2237,7 @@ class TestContainerController(TestRingBase):
|
||||
'X-Backend-Sharding-State': sharding_state})
|
||||
self.assertEqual(
|
||||
[mock.call.get('container/a/c'),
|
||||
mock.call.get('shard-listing/a/c')],
|
||||
mock.call.get('shard-listing/a/c', raise_on_error=True)],
|
||||
self.memcache.calls)
|
||||
self.assertIn('swift.infocache', req.environ)
|
||||
self.assertIn('shard-listing/a/c', req.environ['swift.infocache'])
|
||||
@ -2390,7 +2390,7 @@ class TestContainerController(TestRingBase):
|
||||
# deleted from cache
|
||||
self.assertEqual(
|
||||
[mock.call.get('container/a/c'),
|
||||
mock.call.get('shard-listing/a/c'),
|
||||
mock.call.get('shard-listing/a/c', raise_on_error=True),
|
||||
mock.call.set('container/a/c', mock.ANY, time=6.0)],
|
||||
self.memcache.calls)
|
||||
self.assertEqual(404, self.memcache.calls[2][1][1]['status'])
|
||||
@ -2400,6 +2400,39 @@ class TestContainerController(TestRingBase):
|
||||
'container.shard_listing.backend.404': 1},
|
||||
self.logger.get_increment_counts())
|
||||
|
||||
def test_GET_shard_ranges_read_from_cache_error(self):
|
||||
self._setup_shard_range_stubs()
|
||||
self.memcache = FakeMemcache()
|
||||
self.memcache.delete_all()
|
||||
self.logger.clear()
|
||||
info = headers_to_container_info(self.root_resp_hdrs)
|
||||
info['status'] = 200
|
||||
info['sharding_state'] = 'sharded'
|
||||
self.memcache.set('container/a/c', info)
|
||||
self.memcache.clear_calls()
|
||||
self.memcache.error_on_get = [False, True]
|
||||
|
||||
req = self._build_request({'X-Backend-Record-Type': 'shard'},
|
||||
{'states': 'listing'}, {})
|
||||
backend_req, resp = self._capture_backend_request(
|
||||
req, 404, b'', {}, num_resp=2 * self.CONTAINER_REPLICAS)
|
||||
self._check_backend_req(
|
||||
req, backend_req,
|
||||
extra_hdrs={'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Shard-Name-Filter': 'sharded'})
|
||||
self.assertNotIn('X-Backend-Cached-Results', resp.headers)
|
||||
self.assertEqual(
|
||||
[mock.call.get('container/a/c'),
|
||||
mock.call.get('shard-listing/a/c', raise_on_error=True),
|
||||
mock.call.set('container/a/c', mock.ANY, time=6.0)],
|
||||
self.memcache.calls)
|
||||
self.assertEqual(404, self.memcache.calls[2][1][1]['status'])
|
||||
self.assertEqual(b'', resp.body)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
self.assertEqual({'container.shard_listing.cache.error': 1,
|
||||
'container.shard_listing.backend.404': 1},
|
||||
self.logger.get_increment_counts())
|
||||
|
||||
def _do_test_GET_shard_ranges_read_from_cache(self, params, record_type):
|
||||
# pre-warm cache with container metadata and shard ranges and verify
|
||||
# that shard range listing are read from cache when appropriate
|
||||
@ -2417,7 +2450,7 @@ class TestContainerController(TestRingBase):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(
|
||||
[mock.call.get('container/a/c'),
|
||||
mock.call.get('shard-listing/a/c')],
|
||||
mock.call.get('shard-listing/a/c', raise_on_error=True)],
|
||||
self.memcache.calls)
|
||||
self.assertEqual({'container.shard_listing.cache.hit': 1},
|
||||
self.logger.get_increment_counts())
|
||||
|
@ -4413,6 +4413,23 @@ class TestReplicatedObjectController(
|
||||
expected[device] = '10.0.0.%d:100%d' % (i, i)
|
||||
self.assertEqual(container_headers, expected)
|
||||
|
||||
# shard lookup in memcache may error...
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o', {'swift.cache': cache},
|
||||
method=method, body='', headers={'Content-Type': 'text/plain'})
|
||||
cache.error_on_get = [False, True]
|
||||
with mock.patch('random.random', return_value=1.0), \
|
||||
mocked_http_conn(*status_codes, headers=resp_headers,
|
||||
body=body):
|
||||
resp = req.get_response(self.app)
|
||||
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
stats = self.app.logger.get_increment_counts()
|
||||
self.assertEqual({'object.shard_updating.cache.skip': 1,
|
||||
'object.shard_updating.cache.hit': 1,
|
||||
'object.shard_updating.cache.error': 1,
|
||||
'object.shard_updating.backend.200': 2}, stats)
|
||||
|
||||
do_test('POST', 'sharding')
|
||||
do_test('POST', 'sharded')
|
||||
do_test('DELETE', 'sharding')
|
||||
|
Loading…
Reference in New Issue
Block a user