memcached: log user provided keys in exception error logging.
User provided keys are need to debug those tracebacks/timeouts when clients talking to memcached, in order to associate those failures with specific memcache usages within swift services. Change-Id: I07491bb4ebc3baa13cf09f64a04a61011d561409
This commit is contained in:
parent
f99a6e5762
commit
9fb860880d
@ -117,6 +117,13 @@ def set_msg(key, flags, timeout, value):
|
|||||||
]) + (b'\r\n' + value + b'\r\n')
|
]) + (b'\r\n' + value + b'\r\n')
|
||||||
|
|
||||||
|
|
||||||
|
# get the prefix of a user provided memcache key by removing the content after
|
||||||
|
# the last '/', all current usages within swift are using prefix, such as
|
||||||
|
# "shard-updating-v2", "nvratelimit" and etc.
|
||||||
|
def get_key_prefix(key):
|
||||||
|
return key.rsplit('/', 1)[0]
|
||||||
|
|
||||||
|
|
||||||
class MemcacheConnectionError(Exception):
|
class MemcacheConnectionError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -216,18 +223,24 @@ class MemcacheRing(object):
|
|||||||
def memcache_servers(self):
|
def memcache_servers(self):
|
||||||
return list(self._client_cache.keys())
|
return list(self._client_cache.keys())
|
||||||
|
|
||||||
def _exception_occurred(self, server, e, action='talking',
|
def _exception_occurred(self, server, e, key_prefix, action='talking',
|
||||||
sock=None, fp=None, got_connection=True):
|
sock=None, fp=None, got_connection=True):
|
||||||
if isinstance(e, Timeout):
|
if isinstance(e, Timeout):
|
||||||
self.logger.error("Timeout %(action)s to memcached: %(server)s",
|
self.logger.error(
|
||||||
{'action': action, 'server': server})
|
"Timeout %(action)s to memcached: %(server)s"
|
||||||
|
": with key_prefix %(key_prefix)s",
|
||||||
|
{'action': action, 'server': server, 'key_prefix': key_prefix})
|
||||||
elif isinstance(e, (socket.error, MemcacheConnectionError)):
|
elif isinstance(e, (socket.error, MemcacheConnectionError)):
|
||||||
self.logger.error(
|
self.logger.error(
|
||||||
"Error %(action)s to memcached: %(server)s: %(err)s",
|
"Error %(action)s to memcached: %(server)s: "
|
||||||
{'action': action, 'server': server, 'err': e})
|
"with key_prefix %(key_prefix)s: %(err)s",
|
||||||
|
{'action': action, 'server': server, 'err': e,
|
||||||
|
'key_prefix': key_prefix})
|
||||||
else:
|
else:
|
||||||
self.logger.exception("Error %(action)s to memcached: %(server)s",
|
self.logger.exception("Error %(action)s to memcached: %(server)s"
|
||||||
{'action': action, 'server': server})
|
": with key_prefix %(key_prefix)s",
|
||||||
|
{'action': action, 'server': server,
|
||||||
|
'key_prefix': key_prefix})
|
||||||
try:
|
try:
|
||||||
if fp:
|
if fp:
|
||||||
fp.close()
|
fp.close()
|
||||||
@ -257,14 +270,17 @@ class MemcacheRing(object):
|
|||||||
self._error_limited[server] = now + self._error_limit_duration
|
self._error_limited[server] = now + self._error_limit_duration
|
||||||
self.logger.error('Error limiting server %s', server)
|
self.logger.error('Error limiting server %s', server)
|
||||||
|
|
||||||
def _get_conns(self, key):
|
def _get_conns(self, key_prefix, hash_key):
|
||||||
"""
|
"""
|
||||||
Retrieves a server conn from the pool, or connects a new one.
|
Retrieves a server conn from the pool, or connects a new one.
|
||||||
Chooses the server based on a consistent hash of "key".
|
Chooses the server based on a consistent hash of "key".
|
||||||
|
|
||||||
|
:param key_prefix: the prefix of user provided key.
|
||||||
|
:param hash_key: the consistent hash of user key, or server key for
|
||||||
|
set_multi and get_multi.
|
||||||
:return: generator to serve memcached connection
|
:return: generator to serve memcached connection
|
||||||
"""
|
"""
|
||||||
pos = bisect(self._sorted, key)
|
pos = bisect(self._sorted, hash_key)
|
||||||
served = []
|
served = []
|
||||||
any_yielded = False
|
any_yielded = False
|
||||||
while len(served) < self._tries:
|
while len(served) < self._tries:
|
||||||
@ -283,14 +299,14 @@ class MemcacheRing(object):
|
|||||||
yield server, fp, sock
|
yield server, fp, sock
|
||||||
except MemcachePoolTimeout as e:
|
except MemcachePoolTimeout as e:
|
||||||
self._exception_occurred(
|
self._exception_occurred(
|
||||||
server, e, action='getting a connection',
|
server, e, key_prefix, action='getting a connection',
|
||||||
got_connection=False)
|
got_connection=False)
|
||||||
except (Exception, Timeout) as e:
|
except (Exception, Timeout) as e:
|
||||||
# Typically a Timeout exception caught here is the one raised
|
# Typically a Timeout exception caught here is the one raised
|
||||||
# by the create() method of this server's MemcacheConnPool
|
# by the create() method of this server's MemcacheConnPool
|
||||||
# object.
|
# object.
|
||||||
self._exception_occurred(
|
self._exception_occurred(
|
||||||
server, e, action='connecting', sock=sock)
|
server, e, key_prefix, action='connecting', sock=sock)
|
||||||
if not any_yielded:
|
if not any_yielded:
|
||||||
self.logger.error('All memcached servers error-limited')
|
self.logger.error('All memcached servers error-limited')
|
||||||
|
|
||||||
@ -318,7 +334,8 @@ class MemcacheRing(object):
|
|||||||
:param raise_on_error: if True, propagate Timeouts and other errors.
|
:param raise_on_error: if True, propagate Timeouts and other errors.
|
||||||
By default, errors are ignored.
|
By default, errors are ignored.
|
||||||
"""
|
"""
|
||||||
key = md5hash(key)
|
key_prefix = get_key_prefix(key)
|
||||||
|
hash_key = md5hash(key)
|
||||||
timeout = sanitize_timeout(time)
|
timeout = sanitize_timeout(time)
|
||||||
flags = 0
|
flags = 0
|
||||||
if serialize:
|
if serialize:
|
||||||
@ -329,10 +346,10 @@ class MemcacheRing(object):
|
|||||||
elif not isinstance(value, bytes):
|
elif not isinstance(value, bytes):
|
||||||
value = str(value).encode('utf-8')
|
value = str(value).encode('utf-8')
|
||||||
|
|
||||||
for (server, fp, sock) in self._get_conns(key):
|
for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
|
||||||
try:
|
try:
|
||||||
with Timeout(self._io_timeout):
|
with Timeout(self._io_timeout):
|
||||||
sock.sendall(set_msg(key, flags, timeout, value))
|
sock.sendall(set_msg(hash_key, flags, timeout, value))
|
||||||
# Wait for the set to complete
|
# Wait for the set to complete
|
||||||
msg = fp.readline().strip()
|
msg = fp.readline().strip()
|
||||||
if msg != b'STORED':
|
if msg != b'STORED':
|
||||||
@ -352,7 +369,8 @@ class MemcacheRing(object):
|
|||||||
self._return_conn(server, fp, sock)
|
self._return_conn(server, fp, sock)
|
||||||
return
|
return
|
||||||
except (Exception, Timeout) as e:
|
except (Exception, Timeout) as e:
|
||||||
self._exception_occurred(server, e, sock=sock, fp=fp)
|
self._exception_occurred(
|
||||||
|
server, e, key_prefix, sock=sock, fp=fp)
|
||||||
if raise_on_error:
|
if raise_on_error:
|
||||||
raise MemcacheConnectionError(
|
raise MemcacheConnectionError(
|
||||||
"No memcached connections succeeded.")
|
"No memcached connections succeeded.")
|
||||||
@ -368,19 +386,20 @@ class MemcacheRing(object):
|
|||||||
By default, errors are treated as cache misses.
|
By default, errors are treated as cache misses.
|
||||||
:returns: value of the key in memcache
|
:returns: value of the key in memcache
|
||||||
"""
|
"""
|
||||||
key = md5hash(key)
|
key_prefix = get_key_prefix(key)
|
||||||
|
hash_key = md5hash(key)
|
||||||
value = None
|
value = None
|
||||||
for (server, fp, sock) in self._get_conns(key):
|
for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
|
||||||
try:
|
try:
|
||||||
with Timeout(self._io_timeout):
|
with Timeout(self._io_timeout):
|
||||||
sock.sendall(b'get ' + key + b'\r\n')
|
sock.sendall(b'get ' + hash_key + b'\r\n')
|
||||||
line = fp.readline().strip().split()
|
line = fp.readline().strip().split()
|
||||||
while True:
|
while True:
|
||||||
if not line:
|
if not line:
|
||||||
raise MemcacheConnectionError('incomplete read')
|
raise MemcacheConnectionError('incomplete read')
|
||||||
if line[0].upper() == b'END':
|
if line[0].upper() == b'END':
|
||||||
break
|
break
|
||||||
if line[0].upper() == b'VALUE' and line[1] == key:
|
if line[0].upper() == b'VALUE' and line[1] == hash_key:
|
||||||
size = int(line[3])
|
size = int(line[3])
|
||||||
value = fp.read(size)
|
value = fp.read(size)
|
||||||
if int(line[2]) & PICKLE_FLAG:
|
if int(line[2]) & PICKLE_FLAG:
|
||||||
@ -392,7 +411,8 @@ class MemcacheRing(object):
|
|||||||
self._return_conn(server, fp, sock)
|
self._return_conn(server, fp, sock)
|
||||||
return value
|
return value
|
||||||
except (Exception, Timeout) as e:
|
except (Exception, Timeout) as e:
|
||||||
self._exception_occurred(server, e, sock=sock, fp=fp)
|
self._exception_occurred(
|
||||||
|
server, e, key_prefix, sock=sock, fp=fp)
|
||||||
if raise_on_error:
|
if raise_on_error:
|
||||||
raise MemcacheConnectionError(
|
raise MemcacheConnectionError(
|
||||||
"No memcached connections succeeded.")
|
"No memcached connections succeeded.")
|
||||||
@ -415,17 +435,18 @@ class MemcacheRing(object):
|
|||||||
:returns: result of incrementing
|
:returns: result of incrementing
|
||||||
:raises MemcacheConnectionError:
|
:raises MemcacheConnectionError:
|
||||||
"""
|
"""
|
||||||
key = md5hash(key)
|
key_prefix = get_key_prefix(key)
|
||||||
|
hash_key = md5hash(key)
|
||||||
command = b'incr'
|
command = b'incr'
|
||||||
if delta < 0:
|
if delta < 0:
|
||||||
command = b'decr'
|
command = b'decr'
|
||||||
delta = str(abs(int(delta))).encode('ascii')
|
delta = str(abs(int(delta))).encode('ascii')
|
||||||
timeout = sanitize_timeout(time)
|
timeout = sanitize_timeout(time)
|
||||||
for (server, fp, sock) in self._get_conns(key):
|
for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
|
||||||
try:
|
try:
|
||||||
with Timeout(self._io_timeout):
|
with Timeout(self._io_timeout):
|
||||||
sock.sendall(b' '.join([
|
sock.sendall(b' '.join([
|
||||||
command, key, delta]) + b'\r\n')
|
command, hash_key, delta]) + b'\r\n')
|
||||||
line = fp.readline().strip().split()
|
line = fp.readline().strip().split()
|
||||||
if not line:
|
if not line:
|
||||||
raise MemcacheConnectionError('incomplete read')
|
raise MemcacheConnectionError('incomplete read')
|
||||||
@ -433,14 +454,16 @@ class MemcacheRing(object):
|
|||||||
add_val = delta
|
add_val = delta
|
||||||
if command == b'decr':
|
if command == b'decr':
|
||||||
add_val = b'0'
|
add_val = b'0'
|
||||||
sock.sendall(b' '.join([
|
sock.sendall(
|
||||||
b'add', key, b'0', str(timeout).encode('ascii'),
|
b' '.join(
|
||||||
str(len(add_val)).encode('ascii')
|
[b'add', hash_key, b'0', str(timeout).encode(
|
||||||
]) + b'\r\n' + add_val + b'\r\n')
|
'ascii'),
|
||||||
|
str(len(add_val)).encode('ascii')
|
||||||
|
]) + b'\r\n' + add_val + b'\r\n')
|
||||||
line = fp.readline().strip().split()
|
line = fp.readline().strip().split()
|
||||||
if line[0].upper() == b'NOT_STORED':
|
if line[0].upper() == b'NOT_STORED':
|
||||||
sock.sendall(b' '.join([
|
sock.sendall(b' '.join([
|
||||||
command, key, delta]) + b'\r\n')
|
command, hash_key, delta]) + b'\r\n')
|
||||||
line = fp.readline().strip().split()
|
line = fp.readline().strip().split()
|
||||||
ret = int(line[0].strip())
|
ret = int(line[0].strip())
|
||||||
else:
|
else:
|
||||||
@ -450,7 +473,8 @@ class MemcacheRing(object):
|
|||||||
self._return_conn(server, fp, sock)
|
self._return_conn(server, fp, sock)
|
||||||
return ret
|
return ret
|
||||||
except (Exception, Timeout) as e:
|
except (Exception, Timeout) as e:
|
||||||
self._exception_occurred(server, e, sock=sock, fp=fp)
|
self._exception_occurred(
|
||||||
|
server, e, key_prefix, sock=sock, fp=fp)
|
||||||
raise MemcacheConnectionError("No Memcached connections succeeded.")
|
raise MemcacheConnectionError("No Memcached connections succeeded.")
|
||||||
|
|
||||||
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_LOW)
|
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_LOW)
|
||||||
@ -478,18 +502,20 @@ class MemcacheRing(object):
|
|||||||
:param server_key: key to use in determining which server in the ring
|
:param server_key: key to use in determining which server in the ring
|
||||||
is used
|
is used
|
||||||
"""
|
"""
|
||||||
key = md5hash(key)
|
key_prefix = get_key_prefix(key)
|
||||||
server_key = md5hash(server_key) if server_key else key
|
hash_key = md5hash(key)
|
||||||
for (server, fp, sock) in self._get_conns(server_key):
|
server_key = md5hash(server_key) if server_key else hash_key
|
||||||
|
for (server, fp, sock) in self._get_conns(key_prefix, server_key):
|
||||||
try:
|
try:
|
||||||
with Timeout(self._io_timeout):
|
with Timeout(self._io_timeout):
|
||||||
sock.sendall(b'delete ' + key + b'\r\n')
|
sock.sendall(b'delete ' + hash_key + b'\r\n')
|
||||||
# Wait for the delete to complete
|
# Wait for the delete to complete
|
||||||
fp.readline()
|
fp.readline()
|
||||||
self._return_conn(server, fp, sock)
|
self._return_conn(server, fp, sock)
|
||||||
return
|
return
|
||||||
except (Exception, Timeout) as e:
|
except (Exception, Timeout) as e:
|
||||||
self._exception_occurred(server, e, sock=sock, fp=fp)
|
self._exception_occurred(
|
||||||
|
server, e, key_prefix, sock=sock, fp=fp)
|
||||||
|
|
||||||
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
|
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
|
||||||
def set_multi(self, mapping, server_key, serialize=True, time=0,
|
def set_multi(self, mapping, server_key, serialize=True, time=0,
|
||||||
@ -508,7 +534,8 @@ class MemcacheRing(object):
|
|||||||
python-memcached interface. This implementation
|
python-memcached interface. This implementation
|
||||||
ignores it
|
ignores it
|
||||||
"""
|
"""
|
||||||
server_key = md5hash(server_key)
|
key_prefix = get_key_prefix(server_key)
|
||||||
|
hash_key = md5hash(server_key)
|
||||||
timeout = sanitize_timeout(time)
|
timeout = sanitize_timeout(time)
|
||||||
msg = []
|
msg = []
|
||||||
for key, value in mapping.items():
|
for key, value in mapping.items():
|
||||||
@ -520,7 +547,7 @@ class MemcacheRing(object):
|
|||||||
value = json.dumps(value).encode('ascii')
|
value = json.dumps(value).encode('ascii')
|
||||||
flags |= JSON_FLAG
|
flags |= JSON_FLAG
|
||||||
msg.append(set_msg(key, flags, timeout, value))
|
msg.append(set_msg(key, flags, timeout, value))
|
||||||
for (server, fp, sock) in self._get_conns(server_key):
|
for (server, fp, sock) in self._get_conns(key_prefix, hash_key):
|
||||||
try:
|
try:
|
||||||
with Timeout(self._io_timeout):
|
with Timeout(self._io_timeout):
|
||||||
sock.sendall(b''.join(msg))
|
sock.sendall(b''.join(msg))
|
||||||
@ -530,7 +557,8 @@ class MemcacheRing(object):
|
|||||||
self._return_conn(server, fp, sock)
|
self._return_conn(server, fp, sock)
|
||||||
return
|
return
|
||||||
except (Exception, Timeout) as e:
|
except (Exception, Timeout) as e:
|
||||||
self._exception_occurred(server, e, sock=sock, fp=fp)
|
self._exception_occurred(
|
||||||
|
server, e, key_prefix, sock=sock, fp=fp)
|
||||||
|
|
||||||
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
|
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
|
||||||
def get_multi(self, keys, server_key):
|
def get_multi(self, keys, server_key):
|
||||||
@ -542,12 +570,13 @@ class MemcacheRing(object):
|
|||||||
is used
|
is used
|
||||||
:returns: list of values
|
:returns: list of values
|
||||||
"""
|
"""
|
||||||
|
key_prefix = get_key_prefix(server_key)
|
||||||
server_key = md5hash(server_key)
|
server_key = md5hash(server_key)
|
||||||
keys = [md5hash(key) for key in keys]
|
hash_keys = [md5hash(key) for key in keys]
|
||||||
for (server, fp, sock) in self._get_conns(server_key):
|
for (server, fp, sock) in self._get_conns(key_prefix, server_key):
|
||||||
try:
|
try:
|
||||||
with Timeout(self._io_timeout):
|
with Timeout(self._io_timeout):
|
||||||
sock.sendall(b'get ' + b' '.join(keys) + b'\r\n')
|
sock.sendall(b'get ' + b' '.join(hash_keys) + b'\r\n')
|
||||||
line = fp.readline().strip().split()
|
line = fp.readline().strip().split()
|
||||||
responses = {}
|
responses = {}
|
||||||
while True:
|
while True:
|
||||||
@ -566,7 +595,7 @@ class MemcacheRing(object):
|
|||||||
fp.readline()
|
fp.readline()
|
||||||
line = fp.readline().strip().split()
|
line = fp.readline().strip().split()
|
||||||
values = []
|
values = []
|
||||||
for key in keys:
|
for key in hash_keys:
|
||||||
if key in responses:
|
if key in responses:
|
||||||
values.append(responses[key])
|
values.append(responses[key])
|
||||||
else:
|
else:
|
||||||
@ -574,7 +603,8 @@ class MemcacheRing(object):
|
|||||||
self._return_conn(server, fp, sock)
|
self._return_conn(server, fp, sock)
|
||||||
return values
|
return values
|
||||||
except (Exception, Timeout) as e:
|
except (Exception, Timeout) as e:
|
||||||
self._exception_occurred(server, e, sock=sock, fp=fp)
|
self._exception_occurred(
|
||||||
|
server, e, key_prefix, sock=sock, fp=fp)
|
||||||
|
|
||||||
|
|
||||||
def load_memcache(conf, logger):
|
def load_memcache(conf, logger):
|
||||||
|
@ -34,7 +34,8 @@ from eventlet.pools import Pool
|
|||||||
from eventlet.green import ssl
|
from eventlet.green import ssl
|
||||||
|
|
||||||
from swift.common import memcached
|
from swift.common import memcached
|
||||||
from swift.common.memcached import MemcacheConnectionError
|
from swift.common.memcached import MemcacheConnectionError, md5hash, \
|
||||||
|
get_key_prefix
|
||||||
from swift.common.utils import md5, human_readable
|
from swift.common.utils import md5, human_readable
|
||||||
from mock import patch, MagicMock
|
from mock import patch, MagicMock
|
||||||
from test.debug_logger import debug_logger
|
from test.debug_logger import debug_logger
|
||||||
@ -200,6 +201,21 @@ class TestMemcached(unittest.TestCase):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.logger = debug_logger()
|
self.logger = debug_logger()
|
||||||
|
|
||||||
|
def test_get_key_prefix(self):
|
||||||
|
self.assertEqual(
|
||||||
|
get_key_prefix("shard-updating-v2/a/c"),
|
||||||
|
"shard-updating-v2/a")
|
||||||
|
self.assertEqual(
|
||||||
|
get_key_prefix("shard-listing-v2/accout/container3"),
|
||||||
|
"shard-listing-v2/accout")
|
||||||
|
self.assertEqual(
|
||||||
|
get_key_prefix("auth_reseller_name/token/X58E34EL2SDFLEY3"),
|
||||||
|
"auth_reseller_name/token")
|
||||||
|
self.assertEqual(
|
||||||
|
get_key_prefix("nvratelimit/v2/wf/2345392374"),
|
||||||
|
"nvratelimit/v2/wf")
|
||||||
|
self.assertEqual(get_key_prefix("some_key"), "some_key")
|
||||||
|
|
||||||
def test_logger_kwarg(self):
|
def test_logger_kwarg(self):
|
||||||
server_socket = '%s:%s' % ('[::1]', 11211)
|
server_socket = '%s:%s' % ('[::1]', 11211)
|
||||||
client = memcached.MemcacheRing([server_socket])
|
client = memcached.MemcacheRing([server_socket])
|
||||||
@ -219,7 +235,7 @@ class TestMemcached(unittest.TestCase):
|
|||||||
self.assertIs(client._client_cache[server]._tls_context, context)
|
self.assertIs(client._client_cache[server]._tls_context, context)
|
||||||
|
|
||||||
key = uuid4().hex.encode('ascii')
|
key = uuid4().hex.encode('ascii')
|
||||||
list(client._get_conns(key))
|
list(client._get_conns('test', key))
|
||||||
context.wrap_socket.assert_called_once()
|
context.wrap_socket.assert_called_once()
|
||||||
|
|
||||||
def test_get_conns(self):
|
def test_get_conns(self):
|
||||||
@ -241,7 +257,7 @@ class TestMemcached(unittest.TestCase):
|
|||||||
one = two = True
|
one = two = True
|
||||||
while one or two: # Run until we match hosts one and two
|
while one or two: # Run until we match hosts one and two
|
||||||
key = uuid4().hex.encode('ascii')
|
key = uuid4().hex.encode('ascii')
|
||||||
for conn in memcache_client._get_conns(key):
|
for conn in memcache_client._get_conns('test', key):
|
||||||
if 'b' not in getattr(conn[1], 'mode', ''):
|
if 'b' not in getattr(conn[1], 'mode', ''):
|
||||||
self.assertIsInstance(conn[1], (
|
self.assertIsInstance(conn[1], (
|
||||||
io.RawIOBase, io.BufferedIOBase))
|
io.RawIOBase, io.BufferedIOBase))
|
||||||
@ -268,7 +284,7 @@ class TestMemcached(unittest.TestCase):
|
|||||||
memcache_client = memcached.MemcacheRing([server_socket],
|
memcache_client = memcached.MemcacheRing([server_socket],
|
||||||
logger=self.logger)
|
logger=self.logger)
|
||||||
key = uuid4().hex.encode('ascii')
|
key = uuid4().hex.encode('ascii')
|
||||||
for conn in memcache_client._get_conns(key):
|
for conn in memcache_client._get_conns('test', key):
|
||||||
peer_sockaddr = conn[2].getpeername()
|
peer_sockaddr = conn[2].getpeername()
|
||||||
peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1])
|
peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1])
|
||||||
self.assertEqual(peer_socket, server_socket)
|
self.assertEqual(peer_socket, server_socket)
|
||||||
@ -290,7 +306,7 @@ class TestMemcached(unittest.TestCase):
|
|||||||
memcache_client = memcached.MemcacheRing([server_host],
|
memcache_client = memcached.MemcacheRing([server_host],
|
||||||
logger=self.logger)
|
logger=self.logger)
|
||||||
key = uuid4().hex.encode('ascii')
|
key = uuid4().hex.encode('ascii')
|
||||||
for conn in memcache_client._get_conns(key):
|
for conn in memcache_client._get_conns('test', key):
|
||||||
peer_sockaddr = conn[2].getpeername()
|
peer_sockaddr = conn[2].getpeername()
|
||||||
peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1])
|
peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1])
|
||||||
self.assertEqual(peer_socket, server_socket)
|
self.assertEqual(peer_socket, server_socket)
|
||||||
@ -319,7 +335,7 @@ class TestMemcached(unittest.TestCase):
|
|||||||
memcache_client = memcached.MemcacheRing([server_socket],
|
memcache_client = memcached.MemcacheRing([server_socket],
|
||||||
logger=self.logger)
|
logger=self.logger)
|
||||||
key = uuid4().hex.encode('ascii')
|
key = uuid4().hex.encode('ascii')
|
||||||
for conn in memcache_client._get_conns(key):
|
for conn in memcache_client._get_conns('test', key):
|
||||||
peer_sockaddr = conn[2].getpeername()
|
peer_sockaddr = conn[2].getpeername()
|
||||||
peer_socket = '%s:%s' % (peer_sockaddr[0],
|
peer_socket = '%s:%s' % (peer_sockaddr[0],
|
||||||
peer_sockaddr[1])
|
peer_sockaddr[1])
|
||||||
@ -345,7 +361,7 @@ class TestMemcached(unittest.TestCase):
|
|||||||
memcache_client = memcached.MemcacheRing([server_socket],
|
memcache_client = memcached.MemcacheRing([server_socket],
|
||||||
logger=self.logger)
|
logger=self.logger)
|
||||||
key = uuid4().hex.encode('ascii')
|
key = uuid4().hex.encode('ascii')
|
||||||
for conn in memcache_client._get_conns(key):
|
for conn in memcache_client._get_conns('test', key):
|
||||||
peer_sockaddr = conn[2].getpeername()
|
peer_sockaddr = conn[2].getpeername()
|
||||||
peer_socket = '[%s]:%s' % (peer_sockaddr[0],
|
peer_socket = '[%s]:%s' % (peer_sockaddr[0],
|
||||||
peer_sockaddr[1])
|
peer_sockaddr[1])
|
||||||
@ -539,7 +555,7 @@ class TestMemcached(unittest.TestCase):
|
|||||||
self.assertEqual(mock1.exploded, True)
|
self.assertEqual(mock1.exploded, True)
|
||||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
'Error talking to memcached: 1.2.3.5:11211: '
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
])
|
])
|
||||||
|
|
||||||
self.logger.clear()
|
self.logger.clear()
|
||||||
@ -548,7 +564,7 @@ class TestMemcached(unittest.TestCase):
|
|||||||
self.assertEqual(mock1.exploded, True)
|
self.assertEqual(mock1.exploded, True)
|
||||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
'Error talking to memcached: 1.2.3.5:11211: '
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
])
|
])
|
||||||
# Check that we really did call create() twice
|
# Check that we really did call create() twice
|
||||||
self.assertEqual(memcache_client._client_cache['1.2.3.5:11211'].mocks,
|
self.assertEqual(memcache_client._client_cache['1.2.3.5:11211'].mocks,
|
||||||
@ -571,7 +587,7 @@ class TestMemcached(unittest.TestCase):
|
|||||||
# to .4
|
# to .4
|
||||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
'Error talking to memcached: 1.2.3.5:11211: '
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
] * 11 + [
|
] * 11 + [
|
||||||
'Error limiting server 1.2.3.5:11211'
|
'Error limiting server 1.2.3.5:11211'
|
||||||
])
|
])
|
||||||
@ -583,10 +599,10 @@ class TestMemcached(unittest.TestCase):
|
|||||||
# as we keep going, eventually .4 gets error limited, too
|
# as we keep going, eventually .4 gets error limited, too
|
||||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
'Error talking to memcached: 1.2.3.4:11211: '
|
'Error talking to memcached: 1.2.3.4:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
] * 10 + [
|
] * 10 + [
|
||||||
'Error talking to memcached: 1.2.3.4:11211: '
|
'Error talking to memcached: 1.2.3.4:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
'Error limiting server 1.2.3.4:11211',
|
'Error limiting server 1.2.3.4:11211',
|
||||||
'All memcached servers error-limited',
|
'All memcached servers error-limited',
|
||||||
])
|
])
|
||||||
@ -619,7 +635,7 @@ class TestMemcached(unittest.TestCase):
|
|||||||
# to .4
|
# to .4
|
||||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
'Error talking to memcached: 1.2.3.4:11211: '
|
'Error talking to memcached: 1.2.3.4:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
] * 20)
|
] * 20)
|
||||||
|
|
||||||
def test_error_raising(self):
|
def test_error_raising(self):
|
||||||
@ -634,7 +650,7 @@ class TestMemcached(unittest.TestCase):
|
|||||||
memcache_client.set('some_key', [1, 2, 3], raise_on_error=True)
|
memcache_client.set('some_key', [1, 2, 3], raise_on_error=True)
|
||||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
'Error talking to memcached: 1.2.3.4:11211: '
|
'Error talking to memcached: 1.2.3.4:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
])
|
])
|
||||||
self.logger.clear()
|
self.logger.clear()
|
||||||
|
|
||||||
@ -642,7 +658,17 @@ class TestMemcached(unittest.TestCase):
|
|||||||
memcache_client.get('some_key', raise_on_error=True)
|
memcache_client.get('some_key', raise_on_error=True)
|
||||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
'Error talking to memcached: 1.2.3.4:11211: '
|
'Error talking to memcached: 1.2.3.4:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
|
])
|
||||||
|
self.logger.clear()
|
||||||
|
|
||||||
|
with self.assertRaises(MemcacheConnectionError):
|
||||||
|
memcache_client.set(
|
||||||
|
'shard-updating-v2/acc/container', [1, 2, 3],
|
||||||
|
raise_on_error=True)
|
||||||
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
|
'Error talking to memcached: 1.2.3.4:11211: '
|
||||||
|
'with key_prefix shard-updating-v2/acc: [Errno 32] Broken pipe',
|
||||||
])
|
])
|
||||||
self.logger.clear()
|
self.logger.clear()
|
||||||
|
|
||||||
@ -650,14 +676,21 @@ class TestMemcached(unittest.TestCase):
|
|||||||
memcache_client.set('some_key', [1, 2, 3])
|
memcache_client.set('some_key', [1, 2, 3])
|
||||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
'Error talking to memcached: 1.2.3.4:11211: '
|
'Error talking to memcached: 1.2.3.4:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
])
|
])
|
||||||
self.logger.clear()
|
self.logger.clear()
|
||||||
|
|
||||||
memcache_client.get('some_key')
|
memcache_client.get('some_key')
|
||||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
'Error talking to memcached: 1.2.3.4:11211: '
|
'Error talking to memcached: 1.2.3.4:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
|
])
|
||||||
|
self.logger.clear()
|
||||||
|
|
||||||
|
memcache_client.set('shard-updating-v2/acc/container', [1, 2, 3])
|
||||||
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
|
'Error talking to memcached: 1.2.3.4:11211: '
|
||||||
|
'with key_prefix shard-updating-v2/acc: [Errno 32] Broken pipe',
|
||||||
])
|
])
|
||||||
|
|
||||||
def test_error_limiting_custom_config(self):
|
def test_error_limiting_custom_config(self):
|
||||||
@ -680,10 +713,10 @@ class TestMemcached(unittest.TestCase):
|
|||||||
do_calls(5, 12)
|
do_calls(5, 12)
|
||||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
'Error talking to memcached: 1.2.3.5:11211: '
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
] * 10 + [
|
] * 10 + [
|
||||||
'Error talking to memcached: 1.2.3.5:11211: '
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
'Error limiting server 1.2.3.5:11211',
|
'Error limiting server 1.2.3.5:11211',
|
||||||
'All memcached servers error-limited',
|
'All memcached servers error-limited',
|
||||||
])
|
])
|
||||||
@ -693,7 +726,7 @@ class TestMemcached(unittest.TestCase):
|
|||||||
do_calls(6, 20)
|
do_calls(6, 20)
|
||||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
'Error talking to memcached: 1.2.3.5:11211: '
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
] * 20)
|
] * 20)
|
||||||
|
|
||||||
# with error_limit_time of 66, one call per 6 secs, twelfth one
|
# with error_limit_time of 66, one call per 6 secs, twelfth one
|
||||||
@ -701,10 +734,10 @@ class TestMemcached(unittest.TestCase):
|
|||||||
do_calls(6, 12, error_limit_time=66)
|
do_calls(6, 12, error_limit_time=66)
|
||||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
'Error talking to memcached: 1.2.3.5:11211: '
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
] * 10 + [
|
] * 10 + [
|
||||||
'Error talking to memcached: 1.2.3.5:11211: '
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
'Error limiting server 1.2.3.5:11211',
|
'Error limiting server 1.2.3.5:11211',
|
||||||
'All memcached servers error-limited',
|
'All memcached servers error-limited',
|
||||||
])
|
])
|
||||||
@ -714,10 +747,10 @@ class TestMemcached(unittest.TestCase):
|
|||||||
do_calls(6, 13, error_limit_time=70, error_limit_count=11)
|
do_calls(6, 13, error_limit_time=70, error_limit_count=11)
|
||||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||||
'Error talking to memcached: 1.2.3.5:11211: '
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
] * 11 + [
|
] * 11 + [
|
||||||
'Error talking to memcached: 1.2.3.5:11211: '
|
'Error talking to memcached: 1.2.3.5:11211: '
|
||||||
'[Errno 32] Broken pipe',
|
'with key_prefix some_key: [Errno 32] Broken pipe',
|
||||||
'Error limiting server 1.2.3.5:11211',
|
'Error limiting server 1.2.3.5:11211',
|
||||||
'All memcached servers error-limited',
|
'All memcached servers error-limited',
|
||||||
])
|
])
|
||||||
@ -953,7 +986,8 @@ class TestMemcached(unittest.TestCase):
|
|||||||
self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8)
|
self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
self.logger.get_lines_for_level('error'),
|
self.logger.get_lines_for_level('error'),
|
||||||
['Timeout getting a connection to memcached: 1.2.3.5:11211'] * 8)
|
['Timeout getting a connection to memcached: 1.2.3.5:11211'
|
||||||
|
': with key_prefix key'] * 8)
|
||||||
self.assertEqual(served['1.2.3.5'], 2)
|
self.assertEqual(served['1.2.3.5'], 2)
|
||||||
self.assertEqual(pending['1.2.3.4'], 0)
|
self.assertEqual(pending['1.2.3.4'], 0)
|
||||||
self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0)
|
self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0)
|
||||||
@ -992,7 +1026,8 @@ class TestMemcached(unittest.TestCase):
|
|||||||
|
|
||||||
# try to get connect and no connection found
|
# try to get connect and no connection found
|
||||||
# so it will result in StopIteration
|
# so it will result in StopIteration
|
||||||
conn_generator = memcache_client._get_conns(b'key')
|
conn_generator = memcache_client._get_conns(
|
||||||
|
'key', md5hash(b'key'))
|
||||||
with self.assertRaises(StopIteration):
|
with self.assertRaises(StopIteration):
|
||||||
next(conn_generator)
|
next(conn_generator)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user