Merge "memcached: log user provided keys in exception error logging."

This commit is contained in:
Zuul 2023-05-17 23:20:25 +00:00 committed by Gerrit Code Review
commit 667f733cb9
2 changed files with 133 additions and 68 deletions

View File

@ -117,6 +117,13 @@ def set_msg(key, flags, timeout, value):
]) + (b'\r\n' + value + b'\r\n') ]) + (b'\r\n' + value + b'\r\n')
# get the prefix of a user provided memcache key by removing the content after
# the last '/', all current usages within swift are using prefix, such as
# "shard-updating-v2", "nvratelimit" and etc.
def get_key_prefix(key):
return key.rsplit('/', 1)[0]
class MemcacheConnectionError(Exception): class MemcacheConnectionError(Exception):
pass pass
@ -216,18 +223,24 @@ class MemcacheRing(object):
def memcache_servers(self): def memcache_servers(self):
return list(self._client_cache.keys()) return list(self._client_cache.keys())
def _exception_occurred(self, server, e, action='talking', def _exception_occurred(self, server, e, key_prefix, action='talking',
sock=None, fp=None, got_connection=True): sock=None, fp=None, got_connection=True):
if isinstance(e, Timeout): if isinstance(e, Timeout):
self.logger.error("Timeout %(action)s to memcached: %(server)s", self.logger.error(
{'action': action, 'server': server}) "Timeout %(action)s to memcached: %(server)s"
": with key_prefix %(key_prefix)s",
{'action': action, 'server': server, 'key_prefix': key_prefix})
elif isinstance(e, (socket.error, MemcacheConnectionError)): elif isinstance(e, (socket.error, MemcacheConnectionError)):
self.logger.error( self.logger.error(
"Error %(action)s to memcached: %(server)s: %(err)s", "Error %(action)s to memcached: %(server)s: "
{'action': action, 'server': server, 'err': e}) "with key_prefix %(key_prefix)s: %(err)s",
{'action': action, 'server': server, 'err': e,
'key_prefix': key_prefix})
else: else:
self.logger.exception("Error %(action)s to memcached: %(server)s", self.logger.exception("Error %(action)s to memcached: %(server)s"
{'action': action, 'server': server}) ": with key_prefix %(key_prefix)s",
{'action': action, 'server': server,
'key_prefix': key_prefix})
try: try:
if fp: if fp:
fp.close() fp.close()
@ -257,14 +270,17 @@ class MemcacheRing(object):
self._error_limited[server] = now + self._error_limit_duration self._error_limited[server] = now + self._error_limit_duration
self.logger.error('Error limiting server %s', server) self.logger.error('Error limiting server %s', server)
def _get_conns(self, key): def _get_conns(self, key_prefix, hash_key):
""" """
Retrieves a server conn from the pool, or connects a new one. Retrieves a server conn from the pool, or connects a new one.
Chooses the server based on a consistent hash of "key". Chooses the server based on a consistent hash of "key".
:param key_prefix: the prefix of user provided key.
:param hash_key: the consistent hash of user key, or server key for
set_multi and get_multi.
:return: generator to serve memcached connection :return: generator to serve memcached connection
""" """
pos = bisect(self._sorted, key) pos = bisect(self._sorted, hash_key)
served = [] served = []
any_yielded = False any_yielded = False
while len(served) < self._tries: while len(served) < self._tries:
@ -283,14 +299,14 @@ class MemcacheRing(object):
yield server, fp, sock yield server, fp, sock
except MemcachePoolTimeout as e: except MemcachePoolTimeout as e:
self._exception_occurred( self._exception_occurred(
server, e, action='getting a connection', server, e, key_prefix, action='getting a connection',
got_connection=False) got_connection=False)
except (Exception, Timeout) as e: except (Exception, Timeout) as e:
# Typically a Timeout exception caught here is the one raised # Typically a Timeout exception caught here is the one raised
# by the create() method of this server's MemcacheConnPool # by the create() method of this server's MemcacheConnPool
# object. # object.
self._exception_occurred( self._exception_occurred(
server, e, action='connecting', sock=sock) server, e, key_prefix, action='connecting', sock=sock)
if not any_yielded: if not any_yielded:
self.logger.error('All memcached servers error-limited') self.logger.error('All memcached servers error-limited')
@ -318,7 +334,8 @@ class MemcacheRing(object):
:param raise_on_error: if True, propagate Timeouts and other errors. :param raise_on_error: if True, propagate Timeouts and other errors.
By default, errors are ignored. By default, errors are ignored.
""" """
key = md5hash(key) key_prefix = get_key_prefix(key)
hash_key = md5hash(key)
timeout = sanitize_timeout(time) timeout = sanitize_timeout(time)
flags = 0 flags = 0
if serialize: if serialize:
@ -329,10 +346,10 @@ class MemcacheRing(object):
elif not isinstance(value, bytes): elif not isinstance(value, bytes):
value = str(value).encode('utf-8') value = str(value).encode('utf-8')
for (server, fp, sock) in self._get_conns(key): for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
try: try:
with Timeout(self._io_timeout): with Timeout(self._io_timeout):
sock.sendall(set_msg(key, flags, timeout, value)) sock.sendall(set_msg(hash_key, flags, timeout, value))
# Wait for the set to complete # Wait for the set to complete
msg = fp.readline().strip() msg = fp.readline().strip()
if msg != b'STORED': if msg != b'STORED':
@ -352,7 +369,8 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock) self._return_conn(server, fp, sock)
return return
except (Exception, Timeout) as e: except (Exception, Timeout) as e:
self._exception_occurred(server, e, sock=sock, fp=fp) self._exception_occurred(
server, e, key_prefix, sock=sock, fp=fp)
if raise_on_error: if raise_on_error:
raise MemcacheConnectionError( raise MemcacheConnectionError(
"No memcached connections succeeded.") "No memcached connections succeeded.")
@ -368,19 +386,20 @@ class MemcacheRing(object):
By default, errors are treated as cache misses. By default, errors are treated as cache misses.
:returns: value of the key in memcache :returns: value of the key in memcache
""" """
key = md5hash(key) key_prefix = get_key_prefix(key)
hash_key = md5hash(key)
value = None value = None
for (server, fp, sock) in self._get_conns(key): for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
try: try:
with Timeout(self._io_timeout): with Timeout(self._io_timeout):
sock.sendall(b'get ' + key + b'\r\n') sock.sendall(b'get ' + hash_key + b'\r\n')
line = fp.readline().strip().split() line = fp.readline().strip().split()
while True: while True:
if not line: if not line:
raise MemcacheConnectionError('incomplete read') raise MemcacheConnectionError('incomplete read')
if line[0].upper() == b'END': if line[0].upper() == b'END':
break break
if line[0].upper() == b'VALUE' and line[1] == key: if line[0].upper() == b'VALUE' and line[1] == hash_key:
size = int(line[3]) size = int(line[3])
value = fp.read(size) value = fp.read(size)
if int(line[2]) & PICKLE_FLAG: if int(line[2]) & PICKLE_FLAG:
@ -392,7 +411,8 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock) self._return_conn(server, fp, sock)
return value return value
except (Exception, Timeout) as e: except (Exception, Timeout) as e:
self._exception_occurred(server, e, sock=sock, fp=fp) self._exception_occurred(
server, e, key_prefix, sock=sock, fp=fp)
if raise_on_error: if raise_on_error:
raise MemcacheConnectionError( raise MemcacheConnectionError(
"No memcached connections succeeded.") "No memcached connections succeeded.")
@ -415,17 +435,18 @@ class MemcacheRing(object):
:returns: result of incrementing :returns: result of incrementing
:raises MemcacheConnectionError: :raises MemcacheConnectionError:
""" """
key = md5hash(key) key_prefix = get_key_prefix(key)
hash_key = md5hash(key)
command = b'incr' command = b'incr'
if delta < 0: if delta < 0:
command = b'decr' command = b'decr'
delta = str(abs(int(delta))).encode('ascii') delta = str(abs(int(delta))).encode('ascii')
timeout = sanitize_timeout(time) timeout = sanitize_timeout(time)
for (server, fp, sock) in self._get_conns(key): for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
try: try:
with Timeout(self._io_timeout): with Timeout(self._io_timeout):
sock.sendall(b' '.join([ sock.sendall(b' '.join([
command, key, delta]) + b'\r\n') command, hash_key, delta]) + b'\r\n')
line = fp.readline().strip().split() line = fp.readline().strip().split()
if not line: if not line:
raise MemcacheConnectionError('incomplete read') raise MemcacheConnectionError('incomplete read')
@ -433,14 +454,16 @@ class MemcacheRing(object):
add_val = delta add_val = delta
if command == b'decr': if command == b'decr':
add_val = b'0' add_val = b'0'
sock.sendall(b' '.join([ sock.sendall(
b'add', key, b'0', str(timeout).encode('ascii'), b' '.join(
[b'add', hash_key, b'0', str(timeout).encode(
'ascii'),
str(len(add_val)).encode('ascii') str(len(add_val)).encode('ascii')
]) + b'\r\n' + add_val + b'\r\n') ]) + b'\r\n' + add_val + b'\r\n')
line = fp.readline().strip().split() line = fp.readline().strip().split()
if line[0].upper() == b'NOT_STORED': if line[0].upper() == b'NOT_STORED':
sock.sendall(b' '.join([ sock.sendall(b' '.join([
command, key, delta]) + b'\r\n') command, hash_key, delta]) + b'\r\n')
line = fp.readline().strip().split() line = fp.readline().strip().split()
ret = int(line[0].strip()) ret = int(line[0].strip())
else: else:
@ -450,7 +473,8 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock) self._return_conn(server, fp, sock)
return ret return ret
except (Exception, Timeout) as e: except (Exception, Timeout) as e:
self._exception_occurred(server, e, sock=sock, fp=fp) self._exception_occurred(
server, e, key_prefix, sock=sock, fp=fp)
raise MemcacheConnectionError("No Memcached connections succeeded.") raise MemcacheConnectionError("No Memcached connections succeeded.")
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_LOW) @memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_LOW)
@ -478,18 +502,20 @@ class MemcacheRing(object):
:param server_key: key to use in determining which server in the ring :param server_key: key to use in determining which server in the ring
is used is used
""" """
key = md5hash(key) key_prefix = get_key_prefix(key)
server_key = md5hash(server_key) if server_key else key hash_key = md5hash(key)
for (server, fp, sock) in self._get_conns(server_key): server_key = md5hash(server_key) if server_key else hash_key
for (server, fp, sock) in self._get_conns(key_prefix, server_key):
try: try:
with Timeout(self._io_timeout): with Timeout(self._io_timeout):
sock.sendall(b'delete ' + key + b'\r\n') sock.sendall(b'delete ' + hash_key + b'\r\n')
# Wait for the delete to complete # Wait for the delete to complete
fp.readline() fp.readline()
self._return_conn(server, fp, sock) self._return_conn(server, fp, sock)
return return
except (Exception, Timeout) as e: except (Exception, Timeout) as e:
self._exception_occurred(server, e, sock=sock, fp=fp) self._exception_occurred(
server, e, key_prefix, sock=sock, fp=fp)
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH) @memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
def set_multi(self, mapping, server_key, serialize=True, time=0, def set_multi(self, mapping, server_key, serialize=True, time=0,
@ -508,7 +534,8 @@ class MemcacheRing(object):
python-memcached interface. This implementation python-memcached interface. This implementation
ignores it ignores it
""" """
server_key = md5hash(server_key) key_prefix = get_key_prefix(server_key)
hash_key = md5hash(server_key)
timeout = sanitize_timeout(time) timeout = sanitize_timeout(time)
msg = [] msg = []
for key, value in mapping.items(): for key, value in mapping.items():
@ -520,7 +547,7 @@ class MemcacheRing(object):
value = json.dumps(value).encode('ascii') value = json.dumps(value).encode('ascii')
flags |= JSON_FLAG flags |= JSON_FLAG
msg.append(set_msg(key, flags, timeout, value)) msg.append(set_msg(key, flags, timeout, value))
for (server, fp, sock) in self._get_conns(server_key): for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
try: try:
with Timeout(self._io_timeout): with Timeout(self._io_timeout):
sock.sendall(b''.join(msg)) sock.sendall(b''.join(msg))
@ -530,7 +557,8 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock) self._return_conn(server, fp, sock)
return return
except (Exception, Timeout) as e: except (Exception, Timeout) as e:
self._exception_occurred(server, e, sock=sock, fp=fp) self._exception_occurred(
server, e, key_prefix, sock=sock, fp=fp)
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH) @memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
def get_multi(self, keys, server_key): def get_multi(self, keys, server_key):
@ -542,12 +570,13 @@ class MemcacheRing(object):
is used is used
:returns: list of values :returns: list of values
""" """
key_prefix = get_key_prefix(server_key)
server_key = md5hash(server_key) server_key = md5hash(server_key)
keys = [md5hash(key) for key in keys] hash_keys = [md5hash(key) for key in keys]
for (server, fp, sock) in self._get_conns(server_key): for (server, fp, sock) in self._get_conns(key_prefix, server_key):
try: try:
with Timeout(self._io_timeout): with Timeout(self._io_timeout):
sock.sendall(b'get ' + b' '.join(keys) + b'\r\n') sock.sendall(b'get ' + b' '.join(hash_keys) + b'\r\n')
line = fp.readline().strip().split() line = fp.readline().strip().split()
responses = {} responses = {}
while True: while True:
@ -566,7 +595,7 @@ class MemcacheRing(object):
fp.readline() fp.readline()
line = fp.readline().strip().split() line = fp.readline().strip().split()
values = [] values = []
for key in keys: for key in hash_keys:
if key in responses: if key in responses:
values.append(responses[key]) values.append(responses[key])
else: else:
@ -574,7 +603,8 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock) self._return_conn(server, fp, sock)
return values return values
except (Exception, Timeout) as e: except (Exception, Timeout) as e:
self._exception_occurred(server, e, sock=sock, fp=fp) self._exception_occurred(
server, e, key_prefix, sock=sock, fp=fp)
def load_memcache(conf, logger): def load_memcache(conf, logger):

View File

@ -34,7 +34,8 @@ from eventlet.pools import Pool
from eventlet.green import ssl from eventlet.green import ssl
from swift.common import memcached from swift.common import memcached
from swift.common.memcached import MemcacheConnectionError from swift.common.memcached import MemcacheConnectionError, md5hash, \
get_key_prefix
from swift.common.utils import md5, human_readable from swift.common.utils import md5, human_readable
from mock import patch, MagicMock from mock import patch, MagicMock
from test.debug_logger import debug_logger from test.debug_logger import debug_logger
@ -200,6 +201,21 @@ class TestMemcached(unittest.TestCase):
def setUp(self): def setUp(self):
self.logger = debug_logger() self.logger = debug_logger()
def test_get_key_prefix(self):
self.assertEqual(
get_key_prefix("shard-updating-v2/a/c"),
"shard-updating-v2/a")
self.assertEqual(
get_key_prefix("shard-listing-v2/accout/container3"),
"shard-listing-v2/accout")
self.assertEqual(
get_key_prefix("auth_reseller_name/token/X58E34EL2SDFLEY3"),
"auth_reseller_name/token")
self.assertEqual(
get_key_prefix("nvratelimit/v2/wf/2345392374"),
"nvratelimit/v2/wf")
self.assertEqual(get_key_prefix("some_key"), "some_key")
def test_logger_kwarg(self): def test_logger_kwarg(self):
server_socket = '%s:%s' % ('[::1]', 11211) server_socket = '%s:%s' % ('[::1]', 11211)
client = memcached.MemcacheRing([server_socket]) client = memcached.MemcacheRing([server_socket])
@ -219,7 +235,7 @@ class TestMemcached(unittest.TestCase):
self.assertIs(client._client_cache[server]._tls_context, context) self.assertIs(client._client_cache[server]._tls_context, context)
key = uuid4().hex.encode('ascii') key = uuid4().hex.encode('ascii')
list(client._get_conns(key)) list(client._get_conns('test', key))
context.wrap_socket.assert_called_once() context.wrap_socket.assert_called_once()
def test_get_conns(self): def test_get_conns(self):
@ -241,7 +257,7 @@ class TestMemcached(unittest.TestCase):
one = two = True one = two = True
while one or two: # Run until we match hosts one and two while one or two: # Run until we match hosts one and two
key = uuid4().hex.encode('ascii') key = uuid4().hex.encode('ascii')
for conn in memcache_client._get_conns(key): for conn in memcache_client._get_conns('test', key):
if 'b' not in getattr(conn[1], 'mode', ''): if 'b' not in getattr(conn[1], 'mode', ''):
self.assertIsInstance(conn[1], ( self.assertIsInstance(conn[1], (
io.RawIOBase, io.BufferedIOBase)) io.RawIOBase, io.BufferedIOBase))
@ -268,7 +284,7 @@ class TestMemcached(unittest.TestCase):
memcache_client = memcached.MemcacheRing([server_socket], memcache_client = memcached.MemcacheRing([server_socket],
logger=self.logger) logger=self.logger)
key = uuid4().hex.encode('ascii') key = uuid4().hex.encode('ascii')
for conn in memcache_client._get_conns(key): for conn in memcache_client._get_conns('test', key):
peer_sockaddr = conn[2].getpeername() peer_sockaddr = conn[2].getpeername()
peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1]) peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1])
self.assertEqual(peer_socket, server_socket) self.assertEqual(peer_socket, server_socket)
@ -290,7 +306,7 @@ class TestMemcached(unittest.TestCase):
memcache_client = memcached.MemcacheRing([server_host], memcache_client = memcached.MemcacheRing([server_host],
logger=self.logger) logger=self.logger)
key = uuid4().hex.encode('ascii') key = uuid4().hex.encode('ascii')
for conn in memcache_client._get_conns(key): for conn in memcache_client._get_conns('test', key):
peer_sockaddr = conn[2].getpeername() peer_sockaddr = conn[2].getpeername()
peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1]) peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1])
self.assertEqual(peer_socket, server_socket) self.assertEqual(peer_socket, server_socket)
@ -319,7 +335,7 @@ class TestMemcached(unittest.TestCase):
memcache_client = memcached.MemcacheRing([server_socket], memcache_client = memcached.MemcacheRing([server_socket],
logger=self.logger) logger=self.logger)
key = uuid4().hex.encode('ascii') key = uuid4().hex.encode('ascii')
for conn in memcache_client._get_conns(key): for conn in memcache_client._get_conns('test', key):
peer_sockaddr = conn[2].getpeername() peer_sockaddr = conn[2].getpeername()
peer_socket = '%s:%s' % (peer_sockaddr[0], peer_socket = '%s:%s' % (peer_sockaddr[0],
peer_sockaddr[1]) peer_sockaddr[1])
@ -345,7 +361,7 @@ class TestMemcached(unittest.TestCase):
memcache_client = memcached.MemcacheRing([server_socket], memcache_client = memcached.MemcacheRing([server_socket],
logger=self.logger) logger=self.logger)
key = uuid4().hex.encode('ascii') key = uuid4().hex.encode('ascii')
for conn in memcache_client._get_conns(key): for conn in memcache_client._get_conns('test', key):
peer_sockaddr = conn[2].getpeername() peer_sockaddr = conn[2].getpeername()
peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_socket = '[%s]:%s' % (peer_sockaddr[0],
peer_sockaddr[1]) peer_sockaddr[1])
@ -539,7 +555,7 @@ class TestMemcached(unittest.TestCase):
self.assertEqual(mock1.exploded, True) self.assertEqual(mock1.exploded, True)
self.assertEqual(self.logger.get_lines_for_level('error'), [ self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: ' 'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
]) ])
self.logger.clear() self.logger.clear()
@ -548,7 +564,7 @@ class TestMemcached(unittest.TestCase):
self.assertEqual(mock1.exploded, True) self.assertEqual(mock1.exploded, True)
self.assertEqual(self.logger.get_lines_for_level('error'), [ self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: ' 'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
]) ])
# Check that we really did call create() twice # Check that we really did call create() twice
self.assertEqual(memcache_client._client_cache['1.2.3.5:11211'].mocks, self.assertEqual(memcache_client._client_cache['1.2.3.5:11211'].mocks,
@ -571,7 +587,7 @@ class TestMemcached(unittest.TestCase):
# to .4 # to .4
self.assertEqual(self.logger.get_lines_for_level('error'), [ self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: ' 'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
] * 11 + [ ] * 11 + [
'Error limiting server 1.2.3.5:11211' 'Error limiting server 1.2.3.5:11211'
]) ])
@ -583,10 +599,10 @@ class TestMemcached(unittest.TestCase):
# as we keep going, eventually .4 gets error limited, too # as we keep going, eventually .4 gets error limited, too
self.assertEqual(self.logger.get_lines_for_level('error'), [ self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.4:11211: ' 'Error talking to memcached: 1.2.3.4:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
] * 10 + [ ] * 10 + [
'Error talking to memcached: 1.2.3.4:11211: ' 'Error talking to memcached: 1.2.3.4:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
'Error limiting server 1.2.3.4:11211', 'Error limiting server 1.2.3.4:11211',
'All memcached servers error-limited', 'All memcached servers error-limited',
]) ])
@ -619,7 +635,7 @@ class TestMemcached(unittest.TestCase):
# to .4 # to .4
self.assertEqual(self.logger.get_lines_for_level('error'), [ self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.4:11211: ' 'Error talking to memcached: 1.2.3.4:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
] * 20) ] * 20)
def test_error_raising(self): def test_error_raising(self):
@ -634,7 +650,7 @@ class TestMemcached(unittest.TestCase):
memcache_client.set('some_key', [1, 2, 3], raise_on_error=True) memcache_client.set('some_key', [1, 2, 3], raise_on_error=True)
self.assertEqual(self.logger.get_lines_for_level('error'), [ self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.4:11211: ' 'Error talking to memcached: 1.2.3.4:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
]) ])
self.logger.clear() self.logger.clear()
@ -642,7 +658,17 @@ class TestMemcached(unittest.TestCase):
memcache_client.get('some_key', raise_on_error=True) memcache_client.get('some_key', raise_on_error=True)
self.assertEqual(self.logger.get_lines_for_level('error'), [ self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.4:11211: ' 'Error talking to memcached: 1.2.3.4:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
])
self.logger.clear()
with self.assertRaises(MemcacheConnectionError):
memcache_client.set(
'shard-updating-v2/acc/container', [1, 2, 3],
raise_on_error=True)
self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.4:11211: '
'with key_prefix shard-updating-v2/acc: [Errno 32] Broken pipe',
]) ])
self.logger.clear() self.logger.clear()
@ -650,14 +676,21 @@ class TestMemcached(unittest.TestCase):
memcache_client.set('some_key', [1, 2, 3]) memcache_client.set('some_key', [1, 2, 3])
self.assertEqual(self.logger.get_lines_for_level('error'), [ self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.4:11211: ' 'Error talking to memcached: 1.2.3.4:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
]) ])
self.logger.clear() self.logger.clear()
memcache_client.get('some_key') memcache_client.get('some_key')
self.assertEqual(self.logger.get_lines_for_level('error'), [ self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.4:11211: ' 'Error talking to memcached: 1.2.3.4:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
])
self.logger.clear()
memcache_client.set('shard-updating-v2/acc/container', [1, 2, 3])
self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.4:11211: '
'with key_prefix shard-updating-v2/acc: [Errno 32] Broken pipe',
]) ])
def test_error_limiting_custom_config(self): def test_error_limiting_custom_config(self):
@ -680,10 +713,10 @@ class TestMemcached(unittest.TestCase):
do_calls(5, 12) do_calls(5, 12)
self.assertEqual(self.logger.get_lines_for_level('error'), [ self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: ' 'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
] * 10 + [ ] * 10 + [
'Error talking to memcached: 1.2.3.5:11211: ' 'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
'Error limiting server 1.2.3.5:11211', 'Error limiting server 1.2.3.5:11211',
'All memcached servers error-limited', 'All memcached servers error-limited',
]) ])
@ -693,7 +726,7 @@ class TestMemcached(unittest.TestCase):
do_calls(6, 20) do_calls(6, 20)
self.assertEqual(self.logger.get_lines_for_level('error'), [ self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: ' 'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
] * 20) ] * 20)
# with error_limit_time of 66, one call per 6 secs, twelfth one # with error_limit_time of 66, one call per 6 secs, twelfth one
@ -701,10 +734,10 @@ class TestMemcached(unittest.TestCase):
do_calls(6, 12, error_limit_time=66) do_calls(6, 12, error_limit_time=66)
self.assertEqual(self.logger.get_lines_for_level('error'), [ self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: ' 'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
] * 10 + [ ] * 10 + [
'Error talking to memcached: 1.2.3.5:11211: ' 'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
'Error limiting server 1.2.3.5:11211', 'Error limiting server 1.2.3.5:11211',
'All memcached servers error-limited', 'All memcached servers error-limited',
]) ])
@ -714,10 +747,10 @@ class TestMemcached(unittest.TestCase):
do_calls(6, 13, error_limit_time=70, error_limit_count=11) do_calls(6, 13, error_limit_time=70, error_limit_count=11)
self.assertEqual(self.logger.get_lines_for_level('error'), [ self.assertEqual(self.logger.get_lines_for_level('error'), [
'Error talking to memcached: 1.2.3.5:11211: ' 'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
] * 11 + [ ] * 11 + [
'Error talking to memcached: 1.2.3.5:11211: ' 'Error talking to memcached: 1.2.3.5:11211: '
'[Errno 32] Broken pipe', 'with key_prefix some_key: [Errno 32] Broken pipe',
'Error limiting server 1.2.3.5:11211', 'Error limiting server 1.2.3.5:11211',
'All memcached servers error-limited', 'All memcached servers error-limited',
]) ])
@ -953,7 +986,8 @@ class TestMemcached(unittest.TestCase):
self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8) self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8)
self.assertEqual( self.assertEqual(
self.logger.get_lines_for_level('error'), self.logger.get_lines_for_level('error'),
['Timeout getting a connection to memcached: 1.2.3.5:11211'] * 8) ['Timeout getting a connection to memcached: 1.2.3.5:11211'
': with key_prefix key'] * 8)
self.assertEqual(served['1.2.3.5'], 2) self.assertEqual(served['1.2.3.5'], 2)
self.assertEqual(pending['1.2.3.4'], 0) self.assertEqual(pending['1.2.3.4'], 0)
self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0) self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0)
@ -992,7 +1026,8 @@ class TestMemcached(unittest.TestCase):
# try to get connect and no connection found # try to get connect and no connection found
# so it will result in StopIteration # so it will result in StopIteration
conn_generator = memcache_client._get_conns(b'key') conn_generator = memcache_client._get_conns(
'key', md5hash(b'key'))
with self.assertRaises(StopIteration): with self.assertRaises(StopIteration):
next(conn_generator) next(conn_generator)