ratelimiting does not handle memcache restart
This commit is contained in:
parent
aa14afe2bb
commit
a3474704c2
@ -45,6 +45,10 @@ def md5hash(key):
|
|||||||
return md5(key).hexdigest()
|
return md5(key).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
class MemcacheConnectionError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class MemcacheRing(object):
|
class MemcacheRing(object):
|
||||||
"""
|
"""
|
||||||
Simple, consistent-hashed memcache client.
|
Simple, consistent-hashed memcache client.
|
||||||
@ -180,6 +184,7 @@ class MemcacheRing(object):
|
|||||||
:param delta: amount to add to the value of key (or set as the value
|
:param delta: amount to add to the value of key (or set as the value
|
||||||
if the key is not found) will be cast to an int
|
if the key is not found) will be cast to an int
|
||||||
:param timeout: ttl in memcache
|
:param timeout: ttl in memcache
|
||||||
|
:raises MemcacheConnectionError:
|
||||||
"""
|
"""
|
||||||
key = md5hash(key)
|
key = md5hash(key)
|
||||||
command = 'incr'
|
command = 'incr'
|
||||||
@ -209,6 +214,7 @@ class MemcacheRing(object):
|
|||||||
return ret
|
return ret
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self._exception_occurred(server, e)
|
self._exception_occurred(server, e)
|
||||||
|
raise MemcacheConnectionError("No Memcached connections succeeded.")
|
||||||
|
|
||||||
def decr(self, key, delta=1, timeout=0):
|
def decr(self, key, delta=1, timeout=0):
|
||||||
"""
|
"""
|
||||||
@ -220,6 +226,7 @@ class MemcacheRing(object):
|
|||||||
value to 0 if the key is not found) will be cast to
|
value to 0 if the key is not found) will be cast to
|
||||||
an int
|
an int
|
||||||
:param timeout: ttl in memcache
|
:param timeout: ttl in memcache
|
||||||
|
:raises MemcacheConnectionError:
|
||||||
"""
|
"""
|
||||||
self.incr(key, delta=-delta, timeout=timeout)
|
self.incr(key, delta=-delta, timeout=timeout)
|
||||||
|
|
||||||
|
@ -18,6 +18,7 @@ from webob.exc import HTTPNotFound
|
|||||||
|
|
||||||
from swift.common.utils import split_path, cache_from_env, get_logger
|
from swift.common.utils import split_path, cache_from_env, get_logger
|
||||||
from swift.proxy.server import get_container_memcache_key
|
from swift.proxy.server import get_container_memcache_key
|
||||||
|
from swift.common.memcached import MemcacheConnectionError
|
||||||
|
|
||||||
|
|
||||||
class MaxSleepTimeHitError(Exception):
|
class MaxSleepTimeHitError(Exception):
|
||||||
@ -136,6 +137,7 @@ class RateLimitMiddleware(object):
|
|||||||
:param max_rate: maximum rate allowed in requests per second
|
:param max_rate: maximum rate allowed in requests per second
|
||||||
:raises: MaxSleepTimeHitError if max sleep time is exceeded.
|
:raises: MaxSleepTimeHitError if max sleep time is exceeded.
|
||||||
'''
|
'''
|
||||||
|
try:
|
||||||
now_m = int(round(time.time() * self.clock_accuracy))
|
now_m = int(round(time.time() * self.clock_accuracy))
|
||||||
time_per_request_m = int(round(self.clock_accuracy / max_rate))
|
time_per_request_m = int(round(self.clock_accuracy / max_rate))
|
||||||
running_time_m = self.memcache_client.incr(key,
|
running_time_m = self.memcache_client.incr(key,
|
||||||
@ -158,6 +160,8 @@ class RateLimitMiddleware(object):
|
|||||||
need_to_sleep_m)
|
need_to_sleep_m)
|
||||||
|
|
||||||
return float(need_to_sleep_m) / self.clock_accuracy
|
return float(need_to_sleep_m) / self.clock_accuracy
|
||||||
|
except MemcacheConnectionError:
|
||||||
|
return 0
|
||||||
|
|
||||||
def handle_ratelimit(self, req, account_name, container_name, obj_name):
|
def handle_ratelimit(self, req, account_name, container_name, obj_name):
|
||||||
'''
|
'''
|
||||||
|
@ -21,12 +21,14 @@ from webob import Request
|
|||||||
|
|
||||||
from swift.common.middleware import ratelimit
|
from swift.common.middleware import ratelimit
|
||||||
from swift.proxy.server import get_container_memcache_key
|
from swift.proxy.server import get_container_memcache_key
|
||||||
|
from swift.common.memcached import MemcacheConnectionError
|
||||||
|
|
||||||
|
|
||||||
class FakeMemcache(object):
|
class FakeMemcache(object):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.store = {}
|
self.store = {}
|
||||||
|
self.error_on_incr = False
|
||||||
|
|
||||||
def get(self, key):
|
def get(self, key):
|
||||||
return self.store.get(key)
|
return self.store.get(key)
|
||||||
@ -36,6 +38,8 @@ class FakeMemcache(object):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
def incr(self, key, delta=1, timeout=0):
|
def incr(self, key, delta=1, timeout=0):
|
||||||
|
if self.error_on_incr:
|
||||||
|
raise MemcacheConnectionError('Memcache restarting')
|
||||||
self.store[key] = int(self.store.setdefault(key, 0)) + int(delta)
|
self.store[key] = int(self.store.setdefault(key, 0)) + int(delta)
|
||||||
if self.store[key] < 0:
|
if self.store[key] < 0:
|
||||||
self.store[key] = 0
|
self.store[key] = 0
|
||||||
@ -403,6 +407,21 @@ class TestRateLimit(unittest.TestCase):
|
|||||||
start_response)
|
start_response)
|
||||||
self._run(make_app_call, num_calls, current_rate)
|
self._run(make_app_call, num_calls, current_rate)
|
||||||
|
|
||||||
|
def test_restarting_memcache(self):
|
||||||
|
current_rate = 2
|
||||||
|
num_calls = 5
|
||||||
|
conf_dict = {'account_ratelimit': current_rate}
|
||||||
|
self.test_ratelimit = ratelimit.filter_factory(conf_dict)(FakeApp())
|
||||||
|
ratelimit.http_connect = mock_http_connect(204)
|
||||||
|
req = Request.blank('/v/a')
|
||||||
|
req.environ['swift.cache'] = FakeMemcache()
|
||||||
|
req.environ['swift.cache'].error_on_incr = True
|
||||||
|
make_app_call = lambda: self.test_ratelimit(req.environ,
|
||||||
|
start_response)
|
||||||
|
begin = time.time()
|
||||||
|
self._run(make_app_call, num_calls, current_rate, check_time=False)
|
||||||
|
time_took = time.time() - begin
|
||||||
|
self.assert_(round(time_took, 1) == 0) # no memcache, no limiting
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
@ -50,6 +50,7 @@ class MockMemcached(object):
|
|||||||
self.cache = {}
|
self.cache = {}
|
||||||
self.down = False
|
self.down = False
|
||||||
self.exc_on_delete = False
|
self.exc_on_delete = False
|
||||||
|
self.read_return_none = False
|
||||||
|
|
||||||
def sendall(self, string):
|
def sendall(self, string):
|
||||||
if self.down:
|
if self.down:
|
||||||
@ -110,6 +111,8 @@ class MockMemcached(object):
|
|||||||
else:
|
else:
|
||||||
self.outbuf += 'NOT_FOUND\r\n'
|
self.outbuf += 'NOT_FOUND\r\n'
|
||||||
def readline(self):
|
def readline(self):
|
||||||
|
if self.read_return_none:
|
||||||
|
return None
|
||||||
if self.down:
|
if self.down:
|
||||||
raise Exception('mock is down')
|
raise Exception('mock is down')
|
||||||
if '\n' in self.outbuf:
|
if '\n' in self.outbuf:
|
||||||
@ -166,6 +169,9 @@ class TestMemcached(unittest.TestCase):
|
|||||||
self.assertEquals(memcache_client.get('some_key'), '6')
|
self.assertEquals(memcache_client.get('some_key'), '6')
|
||||||
memcache_client.incr('some_key', delta=-15)
|
memcache_client.incr('some_key', delta=-15)
|
||||||
self.assertEquals(memcache_client.get('some_key'), '0')
|
self.assertEquals(memcache_client.get('some_key'), '0')
|
||||||
|
mock.read_return_none = True
|
||||||
|
self.assertRaises(memcached.MemcacheConnectionError,
|
||||||
|
memcache_client.incr, 'some_key', delta=-15)
|
||||||
|
|
||||||
def test_decr(self):
|
def test_decr(self):
|
||||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
|
||||||
@ -179,6 +185,10 @@ class TestMemcached(unittest.TestCase):
|
|||||||
self.assertEquals(memcache_client.get('some_key'), '11')
|
self.assertEquals(memcache_client.get('some_key'), '11')
|
||||||
memcache_client.decr('some_key', delta=15)
|
memcache_client.decr('some_key', delta=15)
|
||||||
self.assertEquals(memcache_client.get('some_key'), '0')
|
self.assertEquals(memcache_client.get('some_key'), '0')
|
||||||
|
mock.read_return_none = True
|
||||||
|
self.assertRaises(memcached.MemcacheConnectionError,
|
||||||
|
memcache_client.decr, 'some_key', delta=15)
|
||||||
|
|
||||||
|
|
||||||
def test_retry(self):
|
def test_retry(self):
|
||||||
logging.getLogger().addHandler(NullLoggingHandler())
|
logging.getLogger().addHandler(NullLoggingHandler())
|
||||||
|
Loading…
x
Reference in New Issue
Block a user