memcached: Plumb logger into MemcacheRing
This way proxies log memcached errors in the normal way instead of to the root logger (which eventually gets them out on STDERR). If no logger is provided, fall back to the root logger behavior. Change-Id: I2f7b3e7d5b976fab07c9a2d0a9b8c0bd9a840dfd
This commit is contained in:
parent
f58791f376
commit
e4586fdcde
@ -160,7 +160,7 @@ class MemcacheRing(object):
|
||||
def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
|
||||
io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT,
|
||||
tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False,
|
||||
max_conns=2):
|
||||
max_conns=2, logger=None):
|
||||
self._ring = {}
|
||||
self._errors = dict(((serv, []) for serv in servers))
|
||||
self._error_limited = dict(((serv, 0) for serv in servers))
|
||||
@ -178,18 +178,23 @@ class MemcacheRing(object):
|
||||
self._pool_timeout = pool_timeout
|
||||
self._allow_pickle = allow_pickle
|
||||
self._allow_unpickle = allow_unpickle or allow_pickle
|
||||
if logger is None:
|
||||
self.logger = logging.getLogger()
|
||||
else:
|
||||
self.logger = logger
|
||||
|
||||
def _exception_occurred(self, server, e, action='talking',
|
||||
sock=None, fp=None, got_connection=True):
|
||||
if isinstance(e, Timeout):
|
||||
logging.error("Timeout %(action)s to memcached: %(server)s",
|
||||
{'action': action, 'server': server})
|
||||
elif isinstance(e, (socket.error, MemcacheConnectionError)):
|
||||
logging.error("Error %(action)s to memcached: %(server)s: %(err)s",
|
||||
{'action': action, 'server': server, 'err': e})
|
||||
else:
|
||||
logging.exception("Error %(action)s to memcached: %(server)s",
|
||||
self.logger.error("Timeout %(action)s to memcached: %(server)s",
|
||||
{'action': action, 'server': server})
|
||||
elif isinstance(e, (socket.error, MemcacheConnectionError)):
|
||||
self.logger.error(
|
||||
"Error %(action)s to memcached: %(server)s: %(err)s",
|
||||
{'action': action, 'server': server, 'err': e})
|
||||
else:
|
||||
self.logger.exception("Error %(action)s to memcached: %(server)s",
|
||||
{'action': action, 'server': server})
|
||||
try:
|
||||
if fp:
|
||||
fp.close()
|
||||
@ -213,7 +218,7 @@ class MemcacheRing(object):
|
||||
if err > now - ERROR_LIMIT_TIME]
|
||||
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
|
||||
self._error_limited[server] = now + ERROR_LIMIT_DURATION
|
||||
logging.error('Error limiting server %s', server)
|
||||
self.logger.error('Error limiting server %s', server)
|
||||
|
||||
def _get_conns(self, key):
|
||||
"""
|
||||
|
@ -19,6 +19,7 @@ from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError
|
||||
|
||||
from swift.common.memcached import (MemcacheRing, CONN_TIMEOUT, POOL_TIMEOUT,
|
||||
IO_TIMEOUT, TRY_COUNT)
|
||||
from swift.common.utils import get_logger
|
||||
|
||||
|
||||
class MemcacheMiddleware(object):
|
||||
@ -28,6 +29,7 @@ class MemcacheMiddleware(object):
|
||||
|
||||
def __init__(self, app, conf):
|
||||
self.app = app
|
||||
self.logger = get_logger(conf, log_route='memcache')
|
||||
self.memcache_servers = conf.get('memcache_servers')
|
||||
serialization_format = conf.get('memcache_serialization_support')
|
||||
try:
|
||||
@ -102,7 +104,8 @@ class MemcacheMiddleware(object):
|
||||
io_timeout=io_timeout,
|
||||
allow_pickle=(serialization_format == 0),
|
||||
allow_unpickle=(serialization_format <= 1),
|
||||
max_conns=max_conns)
|
||||
max_conns=max_conns,
|
||||
logger=self.logger)
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
env['swift.cache'] = self.memcache
|
||||
|
@ -20,6 +20,7 @@ from collections import defaultdict
|
||||
import errno
|
||||
from hashlib import md5
|
||||
import io
|
||||
import logging
|
||||
import six
|
||||
import socket
|
||||
import time
|
||||
@ -184,9 +185,14 @@ class TestMemcached(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.logger = debug_logger()
|
||||
patcher = mock.patch('swift.common.memcached.logging', self.logger)
|
||||
self.addCleanup(patcher.stop)
|
||||
patcher.start()
|
||||
|
||||
def test_logger_kwarg(self):
|
||||
server_socket = '%s:%s' % ('[::1]', 11211)
|
||||
client = memcached.MemcacheRing([server_socket])
|
||||
self.assertIs(client.logger, logging.getLogger())
|
||||
|
||||
client = memcached.MemcacheRing([server_socket], logger=self.logger)
|
||||
self.assertIs(client.logger, self.logger)
|
||||
|
||||
def test_get_conns(self):
|
||||
sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
@ -202,7 +208,8 @@ class TestMemcached(unittest.TestCase):
|
||||
sock2ipport = '%s:%s' % (sock2ip, memcached.DEFAULT_MEMCACHED_PORT)
|
||||
# We're deliberately using sock2ip (no port) here to test that the
|
||||
# default port is used.
|
||||
memcache_client = memcached.MemcacheRing([sock1ipport, sock2ip])
|
||||
memcache_client = memcached.MemcacheRing([sock1ipport, sock2ip],
|
||||
logger=self.logger)
|
||||
one = two = True
|
||||
while one or two: # Run until we match hosts one and two
|
||||
key = uuid4().hex.encode('ascii')
|
||||
@ -230,7 +237,8 @@ class TestMemcached(unittest.TestCase):
|
||||
sock.listen(1)
|
||||
sock_addr = sock.getsockname()
|
||||
server_socket = '[%s]:%s' % (sock_addr[0], sock_addr[1])
|
||||
memcache_client = memcached.MemcacheRing([server_socket])
|
||||
memcache_client = memcached.MemcacheRing([server_socket],
|
||||
logger=self.logger)
|
||||
key = uuid4().hex.encode('ascii')
|
||||
for conn in memcache_client._get_conns(key):
|
||||
peer_sockaddr = conn[2].getpeername()
|
||||
@ -251,7 +259,8 @@ class TestMemcached(unittest.TestCase):
|
||||
server_socket = '[%s]:%s' % (sock_addr[0], sock_addr[1])
|
||||
server_host = '[%s]' % sock_addr[0]
|
||||
memcached.DEFAULT_MEMCACHED_PORT = sock_addr[1]
|
||||
memcache_client = memcached.MemcacheRing([server_host])
|
||||
memcache_client = memcached.MemcacheRing([server_host],
|
||||
logger=self.logger)
|
||||
key = uuid4().hex.encode('ascii')
|
||||
for conn in memcache_client._get_conns(key):
|
||||
peer_sockaddr = conn[2].getpeername()
|
||||
@ -265,7 +274,7 @@ class TestMemcached(unittest.TestCase):
|
||||
with self.assertRaises(ValueError):
|
||||
# IPv6 address with missing [] is invalid
|
||||
server_socket = '%s:%s' % ('::1', 11211)
|
||||
memcached.MemcacheRing([server_socket])
|
||||
memcached.MemcacheRing([server_socket], logger=self.logger)
|
||||
|
||||
def test_get_conns_hostname(self):
|
||||
with patch('swift.common.memcached.socket.getaddrinfo') as addrinfo:
|
||||
@ -279,7 +288,8 @@ class TestMemcached(unittest.TestCase):
|
||||
addrinfo.return_value = [(socket.AF_INET,
|
||||
socket.SOCK_STREAM, 0, '',
|
||||
('127.0.0.1', sock_addr[1]))]
|
||||
memcache_client = memcached.MemcacheRing([server_socket])
|
||||
memcache_client = memcached.MemcacheRing([server_socket],
|
||||
logger=self.logger)
|
||||
key = uuid4().hex.encode('ascii')
|
||||
for conn in memcache_client._get_conns(key):
|
||||
peer_sockaddr = conn[2].getpeername()
|
||||
@ -304,7 +314,8 @@ class TestMemcached(unittest.TestCase):
|
||||
addrinfo.return_value = [(socket.AF_INET6,
|
||||
socket.SOCK_STREAM, 0, '',
|
||||
('::1', sock_addr[1]))]
|
||||
memcache_client = memcached.MemcacheRing([server_socket])
|
||||
memcache_client = memcached.MemcacheRing([server_socket],
|
||||
logger=self.logger)
|
||||
key = uuid4().hex.encode('ascii')
|
||||
for conn in memcache_client._get_conns(key):
|
||||
peer_sockaddr = conn[2].getpeername()
|
||||
@ -317,7 +328,8 @@ class TestMemcached(unittest.TestCase):
|
||||
sock.close()
|
||||
|
||||
def test_set_get_json(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -350,7 +362,8 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertAlmostEqual(float(cache_timeout), esttimeout, delta=1)
|
||||
|
||||
def test_get_failed_connection_mid_request(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -365,18 +378,17 @@ class TestMemcached(unittest.TestCase):
|
||||
# force the logging through the DebugLogger instead of the nose
|
||||
# handler. This will use stdout, so we can assert that no stack trace
|
||||
# is logged.
|
||||
logger = debug_logger()
|
||||
with patch("sys.stdout", fake_stdout),\
|
||||
patch('swift.common.memcached.logging', logger):
|
||||
with patch("sys.stdout", fake_stdout):
|
||||
mock.read_return_empty_str = True
|
||||
self.assertIsNone(memcache_client.get('some_key'))
|
||||
log_lines = logger.get_lines_for_level('error')
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertIn('Error talking to memcached', log_lines[0])
|
||||
self.assertFalse(log_lines[1:])
|
||||
self.assertNotIn("Traceback", fake_stdout.getvalue())
|
||||
|
||||
def test_incr(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -396,7 +408,8 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertTrue(mock.close_called)
|
||||
|
||||
def test_incr_failed_connection_mid_request(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -411,19 +424,18 @@ class TestMemcached(unittest.TestCase):
|
||||
# force the logging through the DebugLogger instead of the nose
|
||||
# handler. This will use stdout, so we can assert that no stack trace
|
||||
# is logged.
|
||||
logger = debug_logger()
|
||||
with patch("sys.stdout", fake_stdout), \
|
||||
patch('swift.common.memcached.logging', logger):
|
||||
with patch("sys.stdout", fake_stdout):
|
||||
mock.read_return_empty_str = True
|
||||
self.assertRaises(memcached.MemcacheConnectionError,
|
||||
memcache_client.incr, 'some_key', delta=1)
|
||||
log_lines = logger.get_lines_for_level('error')
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertIn('Error talking to memcached', log_lines[0])
|
||||
self.assertFalse(log_lines[1:])
|
||||
self.assertNotIn('Traceback', fake_stdout.getvalue())
|
||||
|
||||
def test_incr_w_timeout(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -455,7 +467,8 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertEqual(mock.cache, {cache_key: (b'0', b'0', b'10')})
|
||||
|
||||
def test_decr(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -473,7 +486,7 @@ class TestMemcached(unittest.TestCase):
|
||||
|
||||
def test_retry(self):
|
||||
memcache_client = memcached.MemcacheRing(
|
||||
['1.2.3.4:11211', '1.2.3.5:11211'])
|
||||
['1.2.3.4:11211', '1.2.3.5:11211'], logger=self.logger)
|
||||
mock1 = ExplodingMockMemcached()
|
||||
mock2 = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
@ -500,7 +513,8 @@ class TestMemcached(unittest.TestCase):
|
||||
[])
|
||||
|
||||
def test_delete(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -510,7 +524,8 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertIsNone(memcache_client.get('some_key'))
|
||||
|
||||
def test_multi(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -560,7 +575,8 @@ class TestMemcached(unittest.TestCase):
|
||||
|
||||
def test_multi_delete(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211',
|
||||
'1.2.3.5:11211'])
|
||||
'1.2.3.5:11211'],
|
||||
logger=self.logger)
|
||||
mock1 = MockMemcached()
|
||||
mock2 = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
@ -598,7 +614,8 @@ class TestMemcached(unittest.TestCase):
|
||||
|
||||
def test_serialization(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
allow_pickle=True)
|
||||
allow_pickle=True,
|
||||
logger=self.logger)
|
||||
mock = MockMemcached()
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock, mock)] * 2)
|
||||
@ -643,7 +660,8 @@ class TestMemcached(unittest.TestCase):
|
||||
mock_sock.connect = wait_connect
|
||||
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||
connect_timeout=10)
|
||||
connect_timeout=10,
|
||||
logger=self.logger)
|
||||
# sanity
|
||||
self.assertEqual(1, len(memcache_client._client_cache))
|
||||
for server, pool in memcache_client._client_cache.items():
|
||||
@ -702,7 +720,8 @@ class TestMemcached(unittest.TestCase):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211',
|
||||
'1.2.3.5:11211'],
|
||||
io_timeout=0.5,
|
||||
pool_timeout=0.1)
|
||||
pool_timeout=0.1,
|
||||
logger=self.logger)
|
||||
|
||||
# Hand out a couple slow connections to 1.2.3.5, leaving 1.2.3.4
|
||||
# fast. All ten (10) clients should try to talk to .5 first, and
|
||||
|
Loading…
x
Reference in New Issue
Block a user