Merge "Socket errors don't warrant tracebacks when talking to memcached"
This commit is contained in:
commit
daea9932d4
@ -49,7 +49,6 @@ import json
|
||||
import logging
|
||||
import time
|
||||
from bisect import bisect
|
||||
from swift import gettext_ as _
|
||||
from hashlib import md5
|
||||
|
||||
from eventlet.green import socket
|
||||
@ -163,10 +162,13 @@ class MemcacheRing(object):
|
||||
def _exception_occurred(self, server, e, action='talking',
|
||||
sock=None, fp=None, got_connection=True):
|
||||
if isinstance(e, Timeout):
|
||||
logging.error(_("Timeout %(action)s to memcached: %(server)s"),
|
||||
logging.error("Timeout %(action)s to memcached: %(server)s",
|
||||
{'action': action, 'server': server})
|
||||
elif isinstance(e, socket.error):
|
||||
logging.error("Error %(action)s to memcached: %(server)s: %(err)s",
|
||||
{'action': action, 'server': server, 'err': e})
|
||||
else:
|
||||
logging.exception(_("Error %(action)s to memcached: %(server)s"),
|
||||
logging.exception("Error %(action)s to memcached: %(server)s",
|
||||
{'action': action, 'server': server})
|
||||
try:
|
||||
if fp:
|
||||
@ -191,7 +193,7 @@ class MemcacheRing(object):
|
||||
if err > now - ERROR_LIMIT_TIME]
|
||||
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
|
||||
self._error_limited[server] = now + ERROR_LIMIT_DURATION
|
||||
logging.error(_('Error limiting server %s'), server)
|
||||
logging.error('Error limiting server %s', server)
|
||||
|
||||
def _get_conns(self, key):
|
||||
"""
|
||||
|
@ -17,19 +17,22 @@
|
||||
"""Tests for swift.common.utils"""
|
||||
|
||||
from collections import defaultdict
|
||||
import errno
|
||||
from hashlib import md5
|
||||
import logging
|
||||
import socket
|
||||
import time
|
||||
import unittest
|
||||
from uuid import uuid4
|
||||
import os
|
||||
|
||||
import mock
|
||||
|
||||
from eventlet import GreenPool, sleep, Queue
|
||||
from eventlet.pools import Pool
|
||||
|
||||
from swift.common import memcached
|
||||
from mock import patch, MagicMock
|
||||
from test.unit import NullLoggingHandler
|
||||
from test.unit import debug_logger
|
||||
|
||||
|
||||
class MockedMemcachePool(memcached.MemcacheConnPool):
|
||||
@ -48,15 +51,15 @@ class ExplodingMockMemcached(object):
|
||||
|
||||
def sendall(self, string):
|
||||
self.exploded = True
|
||||
raise socket.error()
|
||||
raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE))
|
||||
|
||||
def readline(self):
|
||||
self.exploded = True
|
||||
raise socket.error()
|
||||
raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE))
|
||||
|
||||
def read(self, size):
|
||||
self.exploded = True
|
||||
raise socket.error()
|
||||
raise socket.error(errno.EPIPE, os.strerror(errno.EPIPE))
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
@ -169,6 +172,12 @@ class MockMemcached(object):
|
||||
class TestMemcached(unittest.TestCase):
|
||||
"""Tests for swift.common.memcached"""
|
||||
|
||||
def setUp(self):
|
||||
self.logger = debug_logger()
|
||||
patcher = mock.patch('swift.common.memcached.logging', self.logger)
|
||||
self.addCleanup(patcher.stop)
|
||||
patcher.start()
|
||||
|
||||
def test_get_conns(self):
|
||||
sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock1.bind(('127.0.0.1', 0))
|
||||
@ -397,7 +406,6 @@ class TestMemcached(unittest.TestCase):
|
||||
memcache_client.decr, 'some_key', delta=15)
|
||||
|
||||
def test_retry(self):
|
||||
logging.getLogger().addHandler(NullLoggingHandler())
|
||||
memcache_client = memcached.MemcacheRing(
|
||||
['1.2.3.4:11211', '1.2.3.5:11211'])
|
||||
mock1 = ExplodingMockMemcached()
|
||||
@ -405,10 +413,25 @@ class TestMemcached(unittest.TestCase):
|
||||
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||
[(mock2, mock2)])
|
||||
memcache_client._client_cache['1.2.3.5:11211'] = MockedMemcachePool(
|
||||
[(mock1, mock1)])
|
||||
[(mock1, mock1), (mock1, mock1)])
|
||||
memcache_client.set('some_key', [1, 2, 3])
|
||||
self.assertEqual(mock1.exploded, True)
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
])
|
||||
|
||||
self.logger.clear()
|
||||
mock1.exploded = False
|
||||
self.assertEqual(memcache_client.get('some_key'), [1, 2, 3])
|
||||
self.assertEqual(mock1.exploded, True)
|
||||
self.assertEqual(self.logger.get_lines_for_level('error'), [
|
||||
'Error talking to memcached: 1.2.3.5:11211: '
|
||||
'[Errno 32] Broken pipe',
|
||||
])
|
||||
# Check that we really did call create() twice
|
||||
self.assertEqual(memcache_client._client_cache['1.2.3.5:11211'].mocks,
|
||||
[])
|
||||
|
||||
def test_delete(self):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||
@ -544,25 +567,23 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertTrue(connections.empty())
|
||||
|
||||
def test_connection_pool_timeout(self):
|
||||
orig_conn_pool = memcached.MemcacheConnPool
|
||||
try:
|
||||
connections = defaultdict(Queue)
|
||||
pending = defaultdict(int)
|
||||
served = defaultdict(int)
|
||||
connections = defaultdict(Queue)
|
||||
pending = defaultdict(int)
|
||||
served = defaultdict(int)
|
||||
|
||||
class MockConnectionPool(orig_conn_pool):
|
||||
def get(self):
|
||||
pending[self.host] += 1
|
||||
conn = connections[self.host].get()
|
||||
pending[self.host] -= 1
|
||||
return conn
|
||||
class MockConnectionPool(memcached.MemcacheConnPool):
|
||||
def get(self):
|
||||
pending[self.host] += 1
|
||||
conn = connections[self.host].get()
|
||||
pending[self.host] -= 1
|
||||
return conn
|
||||
|
||||
def put(self, *args, **kwargs):
|
||||
connections[self.host].put(*args, **kwargs)
|
||||
served[self.host] += 1
|
||||
|
||||
memcached.MemcacheConnPool = MockConnectionPool
|
||||
def put(self, *args, **kwargs):
|
||||
connections[self.host].put(*args, **kwargs)
|
||||
served[self.host] += 1
|
||||
|
||||
with mock.patch.object(memcached, 'MemcacheConnPool',
|
||||
MockConnectionPool):
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211',
|
||||
'1.2.3.5:11211'],
|
||||
io_timeout=0.5,
|
||||
@ -587,18 +608,20 @@ class TestMemcached(unittest.TestCase):
|
||||
# Wait for the dust to settle.
|
||||
p.waitall()
|
||||
|
||||
self.assertEqual(pending['1.2.3.5'], 8)
|
||||
self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8)
|
||||
self.assertEqual(served['1.2.3.5'], 2)
|
||||
self.assertEqual(pending['1.2.3.4'], 0)
|
||||
self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0)
|
||||
self.assertEqual(served['1.2.3.4'], 8)
|
||||
self.assertEqual(pending['1.2.3.5'], 8)
|
||||
self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8)
|
||||
self.assertEqual(
|
||||
self.logger.get_lines_for_level('error'),
|
||||
['Timeout getting a connection to memcached: 1.2.3.5:11211'] * 8)
|
||||
self.assertEqual(served['1.2.3.5'], 2)
|
||||
self.assertEqual(pending['1.2.3.4'], 0)
|
||||
self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0)
|
||||
self.assertEqual(served['1.2.3.4'], 8)
|
||||
|
||||
# and we never got more put in that we gave out
|
||||
self.assertEqual(connections['1.2.3.5'].qsize(), 2)
|
||||
self.assertEqual(connections['1.2.3.4'].qsize(), 2)
|
||||
|
||||
# and we never got more put in that we gave out
|
||||
self.assertEqual(connections['1.2.3.5'].qsize(), 2)
|
||||
self.assertEqual(connections['1.2.3.4'].qsize(), 2)
|
||||
finally:
|
||||
memcached.MemcacheConnPool = orig_conn_pool
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
Reference in New Issue
Block a user