Merge "Make error limits survive a ring reload"
This commit is contained in:
commit
2cf24e914b
@ -76,6 +76,8 @@ class Application(object):
|
||||
else:
|
||||
self.logger = logger
|
||||
|
||||
self._error_limiting = {}
|
||||
|
||||
swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.swift_dir = swift_dir
|
||||
self.node_timeout = int(conf.get('node_timeout', 10))
|
||||
@ -406,6 +408,9 @@ class Application(object):
|
||||
timing = round(timing, 3) # sort timings to the millisecond
|
||||
self.node_timings[node['ip']] = (timing, now + self.timing_expiry)
|
||||
|
||||
def _error_limit_node_key(self, node):
|
||||
return "{ip}:{port}/{device}".format(**node)
|
||||
|
||||
def error_limited(self, node):
|
||||
"""
|
||||
Check if the node is currently error limited.
|
||||
@ -414,15 +419,16 @@ class Application(object):
|
||||
:returns: True if error limited, False otherwise
|
||||
"""
|
||||
now = time()
|
||||
if 'errors' not in node:
|
||||
node_key = self._error_limit_node_key(node)
|
||||
error_stats = self._error_limiting.get(node_key)
|
||||
|
||||
if error_stats is None or 'errors' not in error_stats:
|
||||
return False
|
||||
if 'last_error' in node and node['last_error'] < \
|
||||
if 'last_error' in error_stats and error_stats['last_error'] < \
|
||||
now - self.error_suppression_interval:
|
||||
del node['last_error']
|
||||
if 'errors' in node:
|
||||
del node['errors']
|
||||
self._error_limiting.pop(node_key, None)
|
||||
return False
|
||||
limited = node['errors'] > self.error_suppression_limit
|
||||
limited = error_stats['errors'] > self.error_suppression_limit
|
||||
if limited:
|
||||
self.logger.debug(
|
||||
_('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
|
||||
@ -438,8 +444,10 @@ class Application(object):
|
||||
:param node: dictionary of node to error limit
|
||||
:param msg: error message
|
||||
"""
|
||||
node['errors'] = self.error_suppression_limit + 1
|
||||
node['last_error'] = time()
|
||||
node_key = self._error_limit_node_key(node)
|
||||
error_stats = self._error_limiting.setdefault(node_key, {})
|
||||
error_stats['errors'] = self.error_suppression_limit + 1
|
||||
error_stats['last_error'] = time()
|
||||
self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
|
||||
{'msg': msg, 'ip': node['ip'],
|
||||
'port': node['port'], 'device': node['device']})
|
||||
@ -451,8 +459,10 @@ class Application(object):
|
||||
:param node: dictionary of node to handle errors for
|
||||
:param msg: error message
|
||||
"""
|
||||
node['errors'] = node.get('errors', 0) + 1
|
||||
node['last_error'] = time()
|
||||
node_key = self._error_limit_node_key(node)
|
||||
error_stats = self._error_limiting.setdefault(node_key, {})
|
||||
error_stats['errors'] = error_stats.get('errors', 0) + 1
|
||||
error_stats['last_error'] = time()
|
||||
self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
|
||||
{'msg': msg, 'ip': node['ip'],
|
||||
'port': node['port'], 'device': node['device']})
|
||||
|
@ -125,7 +125,8 @@ class PatchPolicies(object):
|
||||
|
||||
class FakeRing(Ring):
|
||||
|
||||
def __init__(self, replicas=3, max_more_nodes=0, part_power=0):
|
||||
def __init__(self, replicas=3, max_more_nodes=0, part_power=0,
|
||||
base_port=1000):
|
||||
"""
|
||||
:param part_power: make part calculation based on the path
|
||||
|
||||
@ -133,27 +134,23 @@ class FakeRing(Ring):
|
||||
out of ring methods will actually be based on the path - otherwise we
|
||||
exercise the real ring code, but ignore the result and return 1.
|
||||
"""
|
||||
self._base_port = base_port
|
||||
self.max_more_nodes = max_more_nodes
|
||||
self._part_shift = 32 - part_power
|
||||
# 9 total nodes (6 more past the initial 3) is the cap, no matter if
|
||||
# this is set higher, or R^2 for R replicas
|
||||
self.set_replicas(replicas)
|
||||
self.max_more_nodes = max_more_nodes
|
||||
self._part_shift = 32 - part_power
|
||||
self._reload()
|
||||
|
||||
def _reload(self):
|
||||
self._rtime = time.time()
|
||||
|
||||
def clear_errors(self):
|
||||
for dev in self.devs:
|
||||
for key in ('errors', 'last_error'):
|
||||
dev.pop(key, None)
|
||||
|
||||
def set_replicas(self, replicas):
|
||||
self.replicas = replicas
|
||||
self._devs = []
|
||||
for x in range(self.replicas):
|
||||
ip = '10.0.0.%s' % x
|
||||
port = 1000 + x
|
||||
port = self._base_port + x
|
||||
self._devs.append({
|
||||
'ip': ip,
|
||||
'replication_ip': ip,
|
||||
@ -177,7 +174,7 @@ class FakeRing(Ring):
|
||||
for x in xrange(self.replicas, min(self.replicas + self.max_more_nodes,
|
||||
self.replicas * self.replicas)):
|
||||
yield {'ip': '10.0.0.%s' % x,
|
||||
'port': 1000 + x,
|
||||
'port': self._base_port + x,
|
||||
'device': 'sda',
|
||||
'zone': x % 3,
|
||||
'region': x % 2,
|
||||
|
@ -280,6 +280,28 @@ def sortHeaderNames(headerNames):
|
||||
return ', '.join(headers)
|
||||
|
||||
|
||||
def node_error_count(proxy_app, ring_node):
|
||||
# Reach into the proxy's internals to get the error count for a
|
||||
# particular node
|
||||
node_key = proxy_app._error_limit_node_key(ring_node)
|
||||
return proxy_app._error_limiting.get(node_key, {}).get('errors', 0)
|
||||
|
||||
|
||||
def node_last_error(proxy_app, ring_node):
|
||||
# Reach into the proxy's internals to get the last error for a
|
||||
# particular node
|
||||
node_key = proxy_app._error_limit_node_key(ring_node)
|
||||
return proxy_app._error_limiting.get(node_key, {}).get('last_error')
|
||||
|
||||
|
||||
def set_node_errors(proxy_app, ring_node, value, last_error):
|
||||
# Set the node's error count to value
|
||||
node_key = proxy_app._error_limit_node_key(ring_node)
|
||||
stats = proxy_app._error_limiting.setdefault(node_key, {})
|
||||
stats['errors'] = value
|
||||
stats['last_error'] = last_error
|
||||
|
||||
|
||||
class FakeMemcacheReturnsNone(FakeMemcache):
|
||||
|
||||
def get(self, key):
|
||||
@ -923,20 +945,22 @@ class TestProxyServerLoading(unittest.TestCase):
|
||||
self.assert_(policy.object_ring)
|
||||
|
||||
|
||||
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
|
||||
@patch_policies([StoragePolicy(0, 'zero', True,
|
||||
object_ring=FakeRing(base_port=3000))])
|
||||
class TestObjectController(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.app = proxy_server.Application(None, FakeMemcache(),
|
||||
logger=debug_logger('proxy-ut'),
|
||||
account_ring=FakeRing(),
|
||||
container_ring=FakeRing())
|
||||
self.app = proxy_server.Application(
|
||||
None, FakeMemcache(),
|
||||
logger=debug_logger('proxy-ut'),
|
||||
account_ring=FakeRing(),
|
||||
container_ring=FakeRing())
|
||||
|
||||
def tearDown(self):
|
||||
self.app.account_ring.set_replicas(3)
|
||||
self.app.container_ring.set_replicas(3)
|
||||
for policy in POLICIES:
|
||||
policy.object_ring = FakeRing()
|
||||
policy.object_ring = FakeRing(base_port=3000)
|
||||
|
||||
def assert_status_map(self, method, statuses, expected, raise_exc=False):
|
||||
with save_globals():
|
||||
@ -2495,9 +2519,9 @@ class TestObjectController(unittest.TestCase):
|
||||
self.app.log_handoffs = True
|
||||
self.app.logger = FakeLogger()
|
||||
self.app.request_node_count = lambda r: 7
|
||||
object_ring.clear_errors()
|
||||
object_ring._devs[0]['errors'] = 999
|
||||
object_ring._devs[0]['last_error'] = 2 ** 63 - 1
|
||||
self.app._error_limiting = {} # clear out errors
|
||||
set_node_errors(self.app, object_ring._devs[0], 999,
|
||||
last_error=(2 ** 63 - 1))
|
||||
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, partition):
|
||||
@ -2512,10 +2536,10 @@ class TestObjectController(unittest.TestCase):
|
||||
self.app.log_handoffs = True
|
||||
self.app.logger = FakeLogger()
|
||||
self.app.request_node_count = lambda r: 7
|
||||
object_ring.clear_errors()
|
||||
self.app._error_limiting = {} # clear out errors
|
||||
for i in range(2):
|
||||
object_ring._devs[i]['errors'] = 999
|
||||
object_ring._devs[i]['last_error'] = 2 ** 63 - 1
|
||||
set_node_errors(self.app, object_ring._devs[i], 999,
|
||||
last_error=(2 ** 63 - 1))
|
||||
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, partition):
|
||||
@ -2534,10 +2558,10 @@ class TestObjectController(unittest.TestCase):
|
||||
self.app.logger = FakeLogger()
|
||||
self.app.request_node_count = lambda r: 10
|
||||
object_ring.set_replicas(4) # otherwise we run out of handoffs
|
||||
object_ring.clear_errors()
|
||||
self.app._error_limiting = {} # clear out errors
|
||||
for i in range(4):
|
||||
object_ring._devs[i]['errors'] = 999
|
||||
object_ring._devs[i]['last_error'] = 2 ** 63 - 1
|
||||
set_node_errors(self.app, object_ring._devs[i], 999,
|
||||
last_error=(2 ** 63 - 1))
|
||||
|
||||
collected_nodes = []
|
||||
for node in self.app.iter_nodes(object_ring, partition):
|
||||
@ -2595,7 +2619,8 @@ class TestObjectController(unittest.TestCase):
|
||||
|
||||
def test_iter_nodes_with_custom_node_iter(self):
|
||||
object_ring = self.app.get_object_ring(None)
|
||||
node_list = [dict(id=n) for n in xrange(10)]
|
||||
node_list = [dict(id=n, ip='1.2.3.4', port=n, device='D')
|
||||
for n in xrange(10)]
|
||||
with nested(
|
||||
mock.patch.object(self.app, 'sort_nodes', lambda n: n),
|
||||
mock.patch.object(self.app, 'request_node_count',
|
||||
@ -2676,16 +2701,20 @@ class TestObjectController(unittest.TestCase):
|
||||
object_ring = controller.app.get_object_ring(None)
|
||||
self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200),
|
||||
200)
|
||||
self.assertEquals(object_ring.devs[0]['errors'], 2)
|
||||
self.assert_('last_error' in object_ring.devs[0])
|
||||
self.assertEquals(
|
||||
node_error_count(controller.app, object_ring.devs[0]), 2)
|
||||
self.assert_(node_last_error(controller.app, object_ring.devs[0])
|
||||
is not None)
|
||||
for _junk in xrange(self.app.error_suppression_limit):
|
||||
self.assert_status_map(controller.HEAD, (200, 200, 503, 503,
|
||||
503), 503)
|
||||
self.assertEquals(object_ring.devs[0]['errors'],
|
||||
self.app.error_suppression_limit + 1)
|
||||
self.assertEquals(
|
||||
node_error_count(controller.app, object_ring.devs[0]),
|
||||
self.app.error_suppression_limit + 1)
|
||||
self.assert_status_map(controller.HEAD, (200, 200, 200, 200, 200),
|
||||
503)
|
||||
self.assert_('last_error' in object_ring.devs[0])
|
||||
self.assert_(node_last_error(controller.app, object_ring.devs[0])
|
||||
is not None)
|
||||
self.assert_status_map(controller.PUT, (200, 200, 200, 201, 201,
|
||||
201), 503)
|
||||
self.assert_status_map(controller.POST,
|
||||
@ -2701,6 +2730,34 @@ class TestObjectController(unittest.TestCase):
|
||||
(200, 200, 200, 204, 204, 204), 503,
|
||||
raise_exc=True)
|
||||
|
||||
def test_error_limiting_survives_ring_reload(self):
|
||||
with save_globals():
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
controller.app.sort_nodes = lambda l: l
|
||||
object_ring = controller.app.get_object_ring(None)
|
||||
self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200),
|
||||
200)
|
||||
self.assertEquals(
|
||||
node_error_count(controller.app, object_ring.devs[0]), 2)
|
||||
self.assert_(node_last_error(controller.app, object_ring.devs[0])
|
||||
is not None)
|
||||
for _junk in xrange(self.app.error_suppression_limit):
|
||||
self.assert_status_map(controller.HEAD, (200, 200, 503, 503,
|
||||
503), 503)
|
||||
self.assertEquals(
|
||||
node_error_count(controller.app, object_ring.devs[0]),
|
||||
self.app.error_suppression_limit + 1)
|
||||
|
||||
# wipe out any state in the ring
|
||||
for policy in POLICIES:
|
||||
policy.object_ring = FakeRing(base_port=3000)
|
||||
|
||||
# and we still get an error, which proves that the
|
||||
# error-limiting info survived a ring reload
|
||||
self.assert_status_map(controller.HEAD, (200, 200, 200, 200, 200),
|
||||
503)
|
||||
|
||||
def test_PUT_error_limiting(self):
|
||||
with save_globals():
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
@ -2712,12 +2769,13 @@ class TestObjectController(unittest.TestCase):
|
||||
200)
|
||||
|
||||
# 2, not 1, because assert_status_map() calls the method twice
|
||||
self.assertEquals(object_ring.devs[0].get('errors', 0), 2)
|
||||
self.assertEquals(object_ring.devs[1].get('errors', 0), 0)
|
||||
self.assertEquals(object_ring.devs[2].get('errors', 0), 0)
|
||||
self.assert_('last_error' in object_ring.devs[0])
|
||||
self.assert_('last_error' not in object_ring.devs[1])
|
||||
self.assert_('last_error' not in object_ring.devs[2])
|
||||
odevs = object_ring.devs
|
||||
self.assertEquals(node_error_count(controller.app, odevs[0]), 2)
|
||||
self.assertEquals(node_error_count(controller.app, odevs[1]), 0)
|
||||
self.assertEquals(node_error_count(controller.app, odevs[2]), 0)
|
||||
self.assert_(node_last_error(controller.app, odevs[0]) is not None)
|
||||
self.assert_(node_last_error(controller.app, odevs[1]) is None)
|
||||
self.assert_(node_last_error(controller.app, odevs[2]) is None)
|
||||
|
||||
def test_PUT_error_limiting_last_node(self):
|
||||
with save_globals():
|
||||
@ -2730,18 +2788,18 @@ class TestObjectController(unittest.TestCase):
|
||||
200)
|
||||
|
||||
# 2, not 1, because assert_status_map() calls the method twice
|
||||
self.assertEquals(object_ring.devs[0].get('errors', 0), 0)
|
||||
self.assertEquals(object_ring.devs[1].get('errors', 0), 0)
|
||||
self.assertEquals(object_ring.devs[2].get('errors', 0), 2)
|
||||
self.assert_('last_error' not in object_ring.devs[0])
|
||||
self.assert_('last_error' not in object_ring.devs[1])
|
||||
self.assert_('last_error' in object_ring.devs[2])
|
||||
odevs = object_ring.devs
|
||||
self.assertEquals(node_error_count(controller.app, odevs[0]), 0)
|
||||
self.assertEquals(node_error_count(controller.app, odevs[1]), 0)
|
||||
self.assertEquals(node_error_count(controller.app, odevs[2]), 2)
|
||||
self.assert_(node_last_error(controller.app, odevs[0]) is None)
|
||||
self.assert_(node_last_error(controller.app, odevs[1]) is None)
|
||||
self.assert_(node_last_error(controller.app, odevs[2]) is not None)
|
||||
|
||||
def test_acc_or_con_missing_returns_404(self):
|
||||
with save_globals():
|
||||
self.app.memcache = FakeMemcacheReturnsNone()
|
||||
self.app.account_ring.clear_errors()
|
||||
self.app.container_ring.clear_errors()
|
||||
self.app._error_limiting = {}
|
||||
controller = proxy_server.ObjectController(self.app, 'account',
|
||||
'container', 'object')
|
||||
set_http_connect(200, 200, 200, 200, 200, 200)
|
||||
@ -2808,8 +2866,9 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
|
||||
for dev in self.app.account_ring.devs:
|
||||
dev['errors'] = self.app.error_suppression_limit + 1
|
||||
dev['last_error'] = time.time()
|
||||
set_node_errors(
|
||||
self.app, dev, self.app.error_suppression_limit + 1,
|
||||
time.time())
|
||||
set_http_connect(200)
|
||||
# acct [isn't actually called since everything
|
||||
# is error limited]
|
||||
@ -2820,10 +2879,11 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
|
||||
for dev in self.app.account_ring.devs:
|
||||
dev['errors'] = 0
|
||||
set_node_errors(self.app, dev, 0, last_error=None)
|
||||
for dev in self.app.container_ring.devs:
|
||||
dev['errors'] = self.app.error_suppression_limit + 1
|
||||
dev['last_error'] = time.time()
|
||||
set_node_errors(self.app, dev,
|
||||
self.app.error_suppression_limit + 1,
|
||||
time.time())
|
||||
set_http_connect(200, 200)
|
||||
# acct cont [isn't actually called since
|
||||
# everything is error limited]
|
||||
@ -5172,18 +5232,19 @@ class TestObjectController(unittest.TestCase):
|
||||
|
||||
|
||||
@patch_policies([
|
||||
StoragePolicy(0, 'zero', True, object_ring=FakeRing()),
|
||||
StoragePolicy(1, 'one', False, object_ring=FakeRing()),
|
||||
StoragePolicy(2, 'two', False, True, object_ring=FakeRing())
|
||||
StoragePolicy(0, 'zero', True, object_ring=FakeRing(base_port=3000)),
|
||||
StoragePolicy(1, 'one', False, object_ring=FakeRing(base_port=3000)),
|
||||
StoragePolicy(2, 'two', False, True, object_ring=FakeRing(base_port=3000))
|
||||
])
|
||||
class TestContainerController(unittest.TestCase):
|
||||
"Test swift.proxy_server.ContainerController"
|
||||
|
||||
def setUp(self):
|
||||
self.app = proxy_server.Application(None, FakeMemcache(),
|
||||
account_ring=FakeRing(),
|
||||
container_ring=FakeRing(),
|
||||
logger=debug_logger())
|
||||
self.app = proxy_server.Application(
|
||||
None, FakeMemcache(),
|
||||
account_ring=FakeRing(),
|
||||
container_ring=FakeRing(base_port=2000),
|
||||
logger=debug_logger())
|
||||
|
||||
def test_convert_policy_to_index(self):
|
||||
controller = swift.proxy.controllers.ContainerController(self.app,
|
||||
@ -5600,7 +5661,7 @@ class TestContainerController(unittest.TestCase):
|
||||
for meth in ('DELETE', 'PUT'):
|
||||
with save_globals():
|
||||
self.app.memcache = FakeMemcacheReturnsNone()
|
||||
self.app.account_ring.clear_errors()
|
||||
self.app._error_limiting = {}
|
||||
controller = proxy_server.ContainerController(self.app,
|
||||
'account',
|
||||
'container')
|
||||
@ -5638,8 +5699,9 @@ class TestContainerController(unittest.TestCase):
|
||||
self.assertEquals(resp.status_int, 404)
|
||||
|
||||
for dev in self.app.account_ring.devs:
|
||||
dev['errors'] = self.app.error_suppression_limit + 1
|
||||
dev['last_error'] = time.time()
|
||||
set_node_errors(self.app, dev,
|
||||
self.app.error_suppression_limit + 1,
|
||||
time.time())
|
||||
set_http_connect(200, 200, 200, 200, 200, 200)
|
||||
# Make sure it is a blank request wthout env caching
|
||||
req = Request.blank('/v1/a/c',
|
||||
@ -5677,19 +5739,26 @@ class TestContainerController(unittest.TestCase):
|
||||
with save_globals():
|
||||
controller = proxy_server.ContainerController(self.app, 'account',
|
||||
'container')
|
||||
container_ring = controller.app.container_ring
|
||||
controller.app.sort_nodes = lambda l: l
|
||||
self.assert_status_map(controller.HEAD, (200, 503, 200, 200), 200,
|
||||
missing_container=False)
|
||||
|
||||
self.assertEquals(
|
||||
controller.app.container_ring.devs[0]['errors'], 2)
|
||||
self.assert_('last_error' in controller.app.container_ring.devs[0])
|
||||
node_error_count(controller.app, container_ring.devs[0]), 2)
|
||||
self.assert_(
|
||||
node_last_error(controller.app, container_ring.devs[0])
|
||||
is not None)
|
||||
for _junk in xrange(self.app.error_suppression_limit):
|
||||
self.assert_status_map(controller.HEAD,
|
||||
(200, 503, 503, 503), 503)
|
||||
self.assertEquals(controller.app.container_ring.devs[0]['errors'],
|
||||
self.app.error_suppression_limit + 1)
|
||||
self.assertEquals(
|
||||
node_error_count(controller.app, container_ring.devs[0]),
|
||||
self.app.error_suppression_limit + 1)
|
||||
self.assert_status_map(controller.HEAD, (200, 200, 200, 200), 503)
|
||||
self.assert_('last_error' in controller.app.container_ring.devs[0])
|
||||
self.assert_(
|
||||
node_last_error(controller.app, container_ring.devs[0])
|
||||
is not None)
|
||||
self.assert_status_map(controller.PUT, (200, 201, 201, 201), 503,
|
||||
missing_container=True)
|
||||
self.assert_status_map(controller.DELETE,
|
||||
|
Loading…
Reference in New Issue
Block a user