diff --git a/swift/common/memcached.py b/swift/common/memcached.py index 1a371e58b6..37719bf79d 100644 --- a/swift/common/memcached.py +++ b/swift/common/memcached.py @@ -258,6 +258,7 @@ class MemcacheRing(object): """ pos = bisect(self._sorted, key) served = [] + any_yielded = False while len(served) < self._tries: pos = (pos + 1) % len(self._sorted) server = self._ring[self._sorted[pos]] @@ -270,6 +271,7 @@ class MemcacheRing(object): try: with MemcachePoolTimeout(self._pool_timeout): fp, sock = self._client_cache[server].get() + any_yielded = True yield server, fp, sock except MemcachePoolTimeout as e: self._exception_occurred( @@ -281,13 +283,15 @@ class MemcacheRing(object): # object. self._exception_occurred( server, e, action='connecting', sock=sock) + if not any_yielded: + self.logger.error('All memcached servers error-limited') def _return_conn(self, server, fp, sock): """Returns a server connection to the pool.""" self._client_cache[server].put((fp, sock)) def set(self, key, value, serialize=True, time=0, - min_compress_len=0): + min_compress_len=0, raise_on_error=False): """ Set a key/value pair in memcache @@ -301,6 +305,8 @@ class MemcacheRing(object): added to keep the signature compatible with python-memcached interface. This implementation ignores it. + :param raise_on_error: if True, propagate Timeouts and other errors. + By default, errors are ignored. """ key = md5hash(key) timeout = sanitize_timeout(time) @@ -340,14 +346,19 @@ class MemcacheRing(object): return except (Exception, Timeout) as e: self._exception_occurred(server, e, sock=sock, fp=fp) + if raise_on_error: + raise MemcacheConnectionError( + "No memcached connections succeeded.") - def get(self, key): + def get(self, key, raise_on_error=False): """ Gets the object specified by key. It will also unserialize the object before returning if it is serialized in memcache with JSON, or if it is pickled and unpickling is allowed. :param key: key + :param raise_on_error: if True, propagate Timeouts and other errors. + By default, errors are treated as cache misses. :returns: value of the key in memcache """ key = md5hash(key) @@ -378,6 +389,9 @@ class MemcacheRing(object): return value except (Exception, Timeout) as e: self._exception_occurred(server, e, sock=sock, fp=fp) + if raise_on_error: + raise MemcacheConnectionError( + "No memcached connections succeeded.") def incr(self, key, delta=1, time=0): """ diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 825fbb7c2d..e0b95107e2 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -40,6 +40,7 @@ from eventlet import sleep from eventlet.timeout import Timeout import six +from swift.common.memcached import MemcacheConnectionError from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \ @@ -2400,9 +2401,13 @@ class Controller(object): if skip_chance and random.random() < skip_chance: self.logger.increment('shard_updating.cache.skip') else: - cached_ranges = memcache.get(cache_key) - self.logger.increment('shard_updating.cache.%s' % ( - 'hit' if cached_ranges else 'miss')) + try: + cached_ranges = memcache.get( + cache_key, raise_on_error=True) + cache_state = 'hit' if cached_ranges else 'miss' + except MemcacheConnectionError: + cache_state = 'error' + self.logger.increment('shard_updating.cache.%s' % cache_state) if cached_ranges: shard_ranges = [ diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index 4ac00d0106..a21a55d22e 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -20,6 +20,7 @@ import random import six from six.moves.urllib.parse import unquote +from swift.common.memcached import MemcacheConnectionError from swift.common.utils import public, private, csv_append, Timestamp, \ config_true_value, ShardRange, cache_from_env, filter_shard_ranges from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT @@ -152,9 +153,14 @@ class ContainerController(Controller): if skip_chance and random.random() < skip_chance: self.logger.increment('shard_listing.cache.skip') else: - cached_ranges = memcache.get(cache_key) - self.logger.increment('shard_listing.cache.%s' % ( - 'hit' if cached_ranges else 'miss')) + try: + cached_ranges = memcache.get( + cache_key, raise_on_error=True) + cache_state = 'hit' if cached_ranges else 'miss' + except MemcacheConnectionError: + cache_state = 'error' + self.logger.increment( + 'shard_listing.cache.%s' % cache_state) if cached_ranges is not None: infocache[cache_key] = tuple(cached_ranges) diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 40d6069337..2bea8ef467 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -410,17 +410,22 @@ def track(f): class FakeMemcache(object): - def __init__(self): + def __init__(self, error_on_set=None, error_on_get=None): self.store = {} self.calls = [] self.error_on_incr = False + self.error_on_get = error_on_get or [] + self.error_on_set = error_on_set or [] self.init_incr_return_neg = False def clear_calls(self): del self.calls[:] @track - def get(self, key): + def get(self, key, raise_on_error=False): + if self.error_on_get and self.error_on_get.pop(0): + if raise_on_error: + raise MemcacheConnectionError() return self.store.get(key) @property @@ -428,7 +433,10 @@ class FakeMemcache(object): return self.store.keys @track - def set(self, key, value, serialize=True, time=0): + def set(self, key, value, serialize=True, time=0, raise_on_error=False): + if self.error_on_set and self.error_on_set.pop(0): + if raise_on_error: + raise MemcacheConnectionError() if serialize: value = json.loads(json.dumps(value)) else: diff --git a/test/unit/common/test_memcached.py b/test/unit/common/test_memcached.py index 06e567d868..f53381760e 100644 --- a/test/unit/common/test_memcached.py +++ b/test/unit/common/test_memcached.py @@ -32,6 +32,7 @@ from eventlet import GreenPool, sleep, Queue from eventlet.pools import Pool from swift.common import memcached +from swift.common.memcached import MemcacheConnectionError from swift.common.utils import md5, human_readable from mock import patch, MagicMock from test.debug_logger import debug_logger @@ -581,19 +582,27 @@ class TestMemcached(unittest.TestCase): self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.4:11211: ' '[Errno 32] Broken pipe', - ] * 11 + [ - 'Error limiting server 1.2.3.4:11211' + ] * 10 + [ + 'Error talking to memcached: 1.2.3.4:11211: ' + '[Errno 32] Broken pipe', + 'Error limiting server 1.2.3.4:11211', + 'All memcached servers error-limited', ]) self.logger.clear() # continued requests just keep bypassing memcache for _ in range(12): memcache_client.set('some_key', [1, 2, 3]) - self.assertEqual(self.logger.get_lines_for_level('error'), []) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'All memcached servers error-limited', + ] * 12) + self.logger.clear() # and get()s are all a "cache miss" self.assertIsNone(memcache_client.get('some_key')) - self.assertEqual(self.logger.get_lines_for_level('error'), []) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'All memcached servers error-limited', + ]) def test_error_disabled(self): memcache_client = memcached.MemcacheRing( @@ -611,6 +620,44 @@ class TestMemcached(unittest.TestCase): '[Errno 32] Broken pipe', ] * 20) + def test_error_raising(self): + memcache_client = memcached.MemcacheRing( + ['1.2.3.4:11211'], logger=self.logger, error_limit_time=0) + mock1 = ExplodingMockMemcached() + memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool( + [(mock1, mock1)] * 20) + + # expect exception when requested... + with self.assertRaises(MemcacheConnectionError): + memcache_client.set('some_key', [1, 2, 3], raise_on_error=True) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.4:11211: ' + '[Errno 32] Broken pipe', + ]) + self.logger.clear() + + with self.assertRaises(MemcacheConnectionError): + memcache_client.get('some_key', raise_on_error=True) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.4:11211: ' + '[Errno 32] Broken pipe', + ]) + self.logger.clear() + + # ...but default is no exception + memcache_client.set('some_key', [1, 2, 3]) + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.4:11211: ' + '[Errno 32] Broken pipe', + ]) + self.logger.clear() + + memcache_client.get('some_key') + self.assertEqual(self.logger.get_lines_for_level('error'), [ + 'Error talking to memcached: 1.2.3.4:11211: ' + '[Errno 32] Broken pipe', + ]) + def test_error_limiting_custom_config(self): def do_calls(time_step, num_calls, **memcache_kwargs): self.logger.clear() @@ -632,8 +679,11 @@ class TestMemcached(unittest.TestCase): self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' '[Errno 32] Broken pipe', - ] * 11 + [ - 'Error limiting server 1.2.3.5:11211' + ] * 10 + [ + 'Error talking to memcached: 1.2.3.5:11211: ' + '[Errno 32] Broken pipe', + 'Error limiting server 1.2.3.5:11211', + 'All memcached servers error-limited', ]) # with default error_limit_time of 60, one call per 6 secs, error limit @@ -650,8 +700,11 @@ class TestMemcached(unittest.TestCase): self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' '[Errno 32] Broken pipe', - ] * 11 + [ - 'Error limiting server 1.2.3.5:11211' + ] * 10 + [ + 'Error talking to memcached: 1.2.3.5:11211: ' + '[Errno 32] Broken pipe', + 'Error limiting server 1.2.3.5:11211', + 'All memcached servers error-limited', ]) # with error_limit_time of 70, one call per 6 secs, error_limit_count @@ -660,8 +713,11 @@ class TestMemcached(unittest.TestCase): self.assertEqual(self.logger.get_lines_for_level('error'), [ 'Error talking to memcached: 1.2.3.5:11211: ' '[Errno 32] Broken pipe', - ] * 12 + [ - 'Error limiting server 1.2.3.5:11211' + ] * 11 + [ + 'Error talking to memcached: 1.2.3.5:11211: ' + '[Errno 32] Broken pipe', + 'Error limiting server 1.2.3.5:11211', + 'All memcached servers error-limited', ]) def test_delete(self): diff --git a/test/unit/proxy/controllers/test_container.py b/test/unit/proxy/controllers/test_container.py index 27e888e1f1..ac73465da2 100644 --- a/test/unit/proxy/controllers/test_container.py +++ b/test/unit/proxy/controllers/test_container.py @@ -2148,7 +2148,7 @@ class TestContainerController(TestRingBase): 'X-Backend-Sharding-State': sharding_state}) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c'), + mock.call.get('shard-listing/a/c', raise_on_error=True), mock.call.set('shard-listing/a/c', self.sr_dicts, time=exp_recheck_listing), # Since there was a backend request, we go ahead and cache @@ -2177,7 +2177,7 @@ class TestContainerController(TestRingBase): 'X-Backend-Sharding-State': sharding_state}) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c')], + mock.call.get('shard-listing/a/c', raise_on_error=True)], self.memcache.calls) self.assertIn('swift.infocache', req.environ) self.assertIn('shard-listing/a/c', req.environ['swift.infocache']) @@ -2237,7 +2237,7 @@ class TestContainerController(TestRingBase): 'X-Backend-Sharding-State': sharding_state}) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c')], + mock.call.get('shard-listing/a/c', raise_on_error=True)], self.memcache.calls) self.assertIn('swift.infocache', req.environ) self.assertIn('shard-listing/a/c', req.environ['swift.infocache']) @@ -2390,7 +2390,7 @@ class TestContainerController(TestRingBase): # deleted from cache self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c'), + mock.call.get('shard-listing/a/c', raise_on_error=True), mock.call.set('container/a/c', mock.ANY, time=6.0)], self.memcache.calls) self.assertEqual(404, self.memcache.calls[2][1][1]['status']) @@ -2400,6 +2400,39 @@ class TestContainerController(TestRingBase): 'container.shard_listing.backend.404': 1}, self.logger.get_increment_counts()) + def test_GET_shard_ranges_read_from_cache_error(self): + self._setup_shard_range_stubs() + self.memcache = FakeMemcache() + self.memcache.delete_all() + self.logger.clear() + info = headers_to_container_info(self.root_resp_hdrs) + info['status'] = 200 + info['sharding_state'] = 'sharded' + self.memcache.set('container/a/c', info) + self.memcache.clear_calls() + self.memcache.error_on_get = [False, True] + + req = self._build_request({'X-Backend-Record-Type': 'shard'}, + {'states': 'listing'}, {}) + backend_req, resp = self._capture_backend_request( + req, 404, b'', {}, num_resp=2 * self.CONTAINER_REPLICAS) + self._check_backend_req( + req, backend_req, + extra_hdrs={'X-Backend-Record-Type': 'shard', + 'X-Backend-Override-Shard-Name-Filter': 'sharded'}) + self.assertNotIn('X-Backend-Cached-Results', resp.headers) + self.assertEqual( + [mock.call.get('container/a/c'), + mock.call.get('shard-listing/a/c', raise_on_error=True), + mock.call.set('container/a/c', mock.ANY, time=6.0)], + self.memcache.calls) + self.assertEqual(404, self.memcache.calls[2][1][1]['status']) + self.assertEqual(b'', resp.body) + self.assertEqual(404, resp.status_int) + self.assertEqual({'container.shard_listing.cache.error': 1, + 'container.shard_listing.backend.404': 1}, + self.logger.get_increment_counts()) + def _do_test_GET_shard_ranges_read_from_cache(self, params, record_type): # pre-warm cache with container metadata and shard ranges and verify # that shard range listing are read from cache when appropriate @@ -2417,7 +2450,7 @@ class TestContainerController(TestRingBase): resp = req.get_response(self.app) self.assertEqual( [mock.call.get('container/a/c'), - mock.call.get('shard-listing/a/c')], + mock.call.get('shard-listing/a/c', raise_on_error=True)], self.memcache.calls) self.assertEqual({'container.shard_listing.cache.hit': 1}, self.logger.get_increment_counts()) diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 5158a09cce..a03e8b43e4 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -4413,6 +4413,23 @@ class TestReplicatedObjectController( expected[device] = '10.0.0.%d:100%d' % (i, i) self.assertEqual(container_headers, expected) + # shard lookup in memcache may error... + req = Request.blank( + '/v1/a/c/o', {'swift.cache': cache}, + method=method, body='', headers={'Content-Type': 'text/plain'}) + cache.error_on_get = [False, True] + with mock.patch('random.random', return_value=1.0), \ + mocked_http_conn(*status_codes, headers=resp_headers, + body=body): + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 202) + stats = self.app.logger.get_increment_counts() + self.assertEqual({'object.shard_updating.cache.skip': 1, + 'object.shard_updating.cache.hit': 1, + 'object.shard_updating.cache.error': 1, + 'object.shard_updating.backend.200': 2}, stats) + do_test('POST', 'sharding') do_test('POST', 'sharded') do_test('DELETE', 'sharding')