memcache race condition and combining incr and decr
This commit is contained in:
parent
f494fc37a6
commit
c08de81aeb
@ -168,49 +168,42 @@ class MemcacheRing(object):
|
||||
def incr(self, key, delta=1, timeout=0):
|
||||
"""
|
||||
Increments a key which has a numeric value by delta.
|
||||
If the key can't be found, it's added as delta.
|
||||
If the key can't be found, it's added as delta or 0 if delta < 0.
|
||||
If passed a negative number, will use memcached's decr. Returns
|
||||
the int stored in memcached
|
||||
Note: The data memcached stores as the result of incr/decr is
|
||||
an unsigned int. decr's that result in a number below 0 are
|
||||
stored as 0.
|
||||
|
||||
:param key: key
|
||||
:param delta: amount to add to the value of key (or set as the value
|
||||
if the key is not found)
|
||||
if the key is not found) will be cast to an int
|
||||
:param timeout: ttl in memcache
|
||||
"""
|
||||
key = md5hash(key)
|
||||
command = 'incr'
|
||||
if delta < 0:
|
||||
command = 'decr'
|
||||
delta = str(int(abs(delta)))
|
||||
for (server, fp, sock) in self._get_conns(key):
|
||||
try:
|
||||
sock.sendall('incr %s %s\r\n' % (key, delta))
|
||||
sock.sendall('%s %s %s\r\n' % (command, key, delta))
|
||||
line = fp.readline().strip().split()
|
||||
if line[0].upper() == 'NOT_FOUND':
|
||||
line[0] = str(delta)
|
||||
sock.sendall('add %s %d %d %s noreply\r\n%s\r\n' % \
|
||||
(key, 0, timeout, len(line[0]), line[0]))
|
||||
ret = int(line[0].strip())
|
||||
self._return_conn(server, fp, sock)
|
||||
return ret
|
||||
except Exception, e:
|
||||
self._exception_occurred(server, e)
|
||||
|
||||
def decr(self, key, delta=1, timeout=0):
|
||||
"""
|
||||
Decrements a key which has a numeric value by delta.
|
||||
If the key can't be found, it's added as 0. Memcached
|
||||
will treat data values below 0 as 0 with incr/decr.
|
||||
|
||||
:param key: key
|
||||
:param delta: amount to subtract to the value of key (or set
|
||||
as the value if the key is not found)
|
||||
:param timeout: ttl in memcache
|
||||
"""
|
||||
key = md5hash(key)
|
||||
for (server, fp, sock) in self._get_conns(key):
|
||||
try:
|
||||
sock.sendall('decr %s %s\r\n' % (key, delta))
|
||||
line = fp.readline().strip().split()
|
||||
if line[0].upper() == 'NOT_FOUND':
|
||||
line[0] = '0'
|
||||
sock.sendall('add %s %d %d %s noreply\r\n%s\r\n' %
|
||||
(key, 0, timeout, len(line[0]), line[0]))
|
||||
ret = int(line[0].strip())
|
||||
add_val = delta
|
||||
if command == 'decr':
|
||||
add_val = '0'
|
||||
sock.sendall('add %s %d %d %s\r\n%s\r\n' % \
|
||||
(key, 0, timeout, len(add_val), add_val))
|
||||
line = fp.readline().strip().split()
|
||||
if line[0].upper() == 'NOT_STORED':
|
||||
sock.sendall('%s %s %s\r\n' % (command, key, delta))
|
||||
line = fp.readline().strip().split()
|
||||
ret = int(line[0].strip())
|
||||
else:
|
||||
ret = int(add_val)
|
||||
else:
|
||||
ret = int(line[0].strip())
|
||||
self._return_conn(server, fp, sock)
|
||||
return ret
|
||||
except Exception, e:
|
||||
|
@ -148,7 +148,7 @@ class RateLimitMiddleware(object):
|
||||
max_sleep_m = self.max_sleep_time_seconds * self.clock_accuracy
|
||||
if max_sleep_m - need_to_sleep_m <= self.clock_accuracy * 0.01:
|
||||
# treat as no-op decrement time
|
||||
self.memcache_client.decr(key, delta=time_per_request_m)
|
||||
self.memcache_client.incr(key, delta=-time_per_request_m)
|
||||
raise MaxSleepTimeHit("Max Sleep Time Exceeded: %s" %
|
||||
need_to_sleep_m)
|
||||
|
||||
|
@ -36,15 +36,7 @@ class FakeMemcache(object):
|
||||
return True
|
||||
|
||||
def incr(self, key, delta=1, timeout=0):
|
||||
if delta < 0:
|
||||
raise "Cannot incr by a negative number"
|
||||
self.store[key] = int(self.store.setdefault(key, 0)) + delta
|
||||
return int(self.store[key])
|
||||
|
||||
def decr(self, key, delta=1, timeout=0):
|
||||
if delta < 0:
|
||||
raise "Cannot decr by a negative number"
|
||||
self.store[key] = int(self.store.setdefault(key, 0)) - delta
|
||||
self.store[key] = int(self.store.setdefault(key, 0)) + int(delta)
|
||||
if self.store[key] < 0:
|
||||
self.store[key] = 0
|
||||
return int(self.store[key])
|
||||
|
@ -98,6 +98,17 @@ class MockMemcached(object):
|
||||
self.outbuf += str(val[2]) + '\r\n'
|
||||
else:
|
||||
self.outbuf += 'NOT_FOUND\r\n'
|
||||
elif parts[0].lower() == 'decr':
|
||||
if parts[1] in self.cache:
|
||||
val = list(self.cache[parts[1]])
|
||||
if int(val[2]) - int(parts[2]) > 0:
|
||||
val[2] = str(int(val[2]) - int(parts[2]))
|
||||
else:
|
||||
val[2] = '0'
|
||||
self.cache[parts[1]] = val
|
||||
self.outbuf += str(val[2]) + '\r\n'
|
||||
else:
|
||||
self.outbuf += 'NOT_FOUND\r\n'
|
||||
def readline(self):
|
||||
if self.down:
|
||||
raise Exception('mock is down')
|
||||
@ -151,6 +162,10 @@ class TestMemcached(unittest.TestCase):
|
||||
self.assertEquals(memcache_client.get('some_key'), '10')
|
||||
memcache_client.incr('some_key', delta=1)
|
||||
self.assertEquals(memcache_client.get('some_key'), '11')
|
||||
memcache_client.incr('some_key', delta=-5)
|
||||
self.assertEquals(memcache_client.get('some_key'), '6')
|
||||
memcache_client.incr('some_key', delta=-15)
|
||||
self.assertEquals(memcache_client.get('some_key'), '0')
|
||||
|
||||
def test_retry(self):
|
||||
logging.getLogger().addHandler(NullLoggingHandler())
|
||||
|
Loading…
x
Reference in New Issue
Block a user