Memcached: add timing stats to set/get and other public functions
Change-Id: If6af519440fb444539e2526ea4dcca0ec0636388
This commit is contained in:
parent
87e9ca7a66
commit
b4124e0cd2
@ -57,7 +57,8 @@ from eventlet import Timeout
|
|||||||
from six.moves import range
|
from six.moves import range
|
||||||
from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError
|
from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError
|
||||||
from swift.common import utils
|
from swift.common import utils
|
||||||
from swift.common.utils import md5, human_readable, config_true_value
|
from swift.common.utils import md5, human_readable, config_true_value, \
|
||||||
|
memcached_timing_stats
|
||||||
|
|
||||||
DEFAULT_MEMCACHED_PORT = 11211
|
DEFAULT_MEMCACHED_PORT = 11211
|
||||||
|
|
||||||
@ -75,6 +76,11 @@ ERROR_LIMIT_COUNT = 10
|
|||||||
ERROR_LIMIT_TIME = ERROR_LIMIT_DURATION = 60
|
ERROR_LIMIT_TIME = ERROR_LIMIT_DURATION = 60
|
||||||
DEFAULT_ITEM_SIZE_WARNING_THRESHOLD = -1
|
DEFAULT_ITEM_SIZE_WARNING_THRESHOLD = -1
|
||||||
|
|
||||||
|
# Different sample rates for emitting Memcached timing stats.
|
||||||
|
TIMING_SAMPLE_RATE_HIGH = 0.1
|
||||||
|
TIMING_SAMPLE_RATE_MEDIUM = 0.01
|
||||||
|
TIMING_SAMPLE_RATE_LOW = 0.001
|
||||||
|
|
||||||
|
|
||||||
def md5hash(key):
|
def md5hash(key):
|
||||||
if not isinstance(key, bytes):
|
if not isinstance(key, bytes):
|
||||||
@ -292,6 +298,9 @@ class MemcacheRing(object):
|
|||||||
"""Returns a server connection to the pool."""
|
"""Returns a server connection to the pool."""
|
||||||
self._client_cache[server].put((fp, sock))
|
self._client_cache[server].put((fp, sock))
|
||||||
|
|
||||||
|
# Sample rates of different memcached operations are based on generic
|
||||||
|
# swift usage patterns.
|
||||||
|
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
|
||||||
def set(self, key, value, serialize=True, time=0,
|
def set(self, key, value, serialize=True, time=0,
|
||||||
min_compress_len=0, raise_on_error=False):
|
min_compress_len=0, raise_on_error=False):
|
||||||
"""
|
"""
|
||||||
@ -348,6 +357,7 @@ class MemcacheRing(object):
|
|||||||
raise MemcacheConnectionError(
|
raise MemcacheConnectionError(
|
||||||
"No memcached connections succeeded.")
|
"No memcached connections succeeded.")
|
||||||
|
|
||||||
|
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_MEDIUM)
|
||||||
def get(self, key, raise_on_error=False):
|
def get(self, key, raise_on_error=False):
|
||||||
"""
|
"""
|
||||||
Gets the object specified by key. It will also unserialize the object
|
Gets the object specified by key. It will also unserialize the object
|
||||||
@ -387,6 +397,7 @@ class MemcacheRing(object):
|
|||||||
raise MemcacheConnectionError(
|
raise MemcacheConnectionError(
|
||||||
"No memcached connections succeeded.")
|
"No memcached connections succeeded.")
|
||||||
|
|
||||||
|
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_LOW)
|
||||||
def incr(self, key, delta=1, time=0):
|
def incr(self, key, delta=1, time=0):
|
||||||
"""
|
"""
|
||||||
Increments a key which has a numeric value by delta.
|
Increments a key which has a numeric value by delta.
|
||||||
@ -442,6 +453,7 @@ class MemcacheRing(object):
|
|||||||
self._exception_occurred(server, e, sock=sock, fp=fp)
|
self._exception_occurred(server, e, sock=sock, fp=fp)
|
||||||
raise MemcacheConnectionError("No Memcached connections succeeded.")
|
raise MemcacheConnectionError("No Memcached connections succeeded.")
|
||||||
|
|
||||||
|
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_LOW)
|
||||||
def decr(self, key, delta=1, time=0):
|
def decr(self, key, delta=1, time=0):
|
||||||
"""
|
"""
|
||||||
Decrements a key which has a numeric value by delta. Calls incr with
|
Decrements a key which has a numeric value by delta. Calls incr with
|
||||||
@ -457,6 +469,7 @@ class MemcacheRing(object):
|
|||||||
"""
|
"""
|
||||||
return self.incr(key, delta=-delta, time=time)
|
return self.incr(key, delta=-delta, time=time)
|
||||||
|
|
||||||
|
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
|
||||||
def delete(self, key, server_key=None):
|
def delete(self, key, server_key=None):
|
||||||
"""
|
"""
|
||||||
Deletes a key/value pair from memcache.
|
Deletes a key/value pair from memcache.
|
||||||
@ -478,6 +491,7 @@ class MemcacheRing(object):
|
|||||||
except (Exception, Timeout) as e:
|
except (Exception, Timeout) as e:
|
||||||
self._exception_occurred(server, e, sock=sock, fp=fp)
|
self._exception_occurred(server, e, sock=sock, fp=fp)
|
||||||
|
|
||||||
|
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
|
||||||
def set_multi(self, mapping, server_key, serialize=True, time=0,
|
def set_multi(self, mapping, server_key, serialize=True, time=0,
|
||||||
min_compress_len=0):
|
min_compress_len=0):
|
||||||
"""
|
"""
|
||||||
@ -518,6 +532,7 @@ class MemcacheRing(object):
|
|||||||
except (Exception, Timeout) as e:
|
except (Exception, Timeout) as e:
|
||||||
self._exception_occurred(server, e, sock=sock, fp=fp)
|
self._exception_occurred(server, e, sock=sock, fp=fp)
|
||||||
|
|
||||||
|
@memcached_timing_stats(sample_rate=TIMING_SAMPLE_RATE_HIGH)
|
||||||
def get_multi(self, keys, server_key):
|
def get_multi(self, keys, server_key):
|
||||||
"""
|
"""
|
||||||
Gets multiple values from memcache for the given keys.
|
Gets multiple values from memcache for the given keys.
|
||||||
|
@ -2035,6 +2035,26 @@ def timing_stats(**dec_kwargs):
|
|||||||
return decorating_func
|
return decorating_func
|
||||||
|
|
||||||
|
|
||||||
|
def memcached_timing_stats(**dec_kwargs):
|
||||||
|
"""
|
||||||
|
Returns a decorator that logs timing events or errors for public methods in
|
||||||
|
MemcacheRing class, such as memcached set, get and etc.
|
||||||
|
"""
|
||||||
|
def decorating_func(func):
|
||||||
|
method = func.__name__
|
||||||
|
|
||||||
|
@functools.wraps(func)
|
||||||
|
def _timing_stats(cache, *args, **kwargs):
|
||||||
|
start_time = time.time()
|
||||||
|
result = func(cache, *args, **kwargs)
|
||||||
|
cache.logger.timing_since(
|
||||||
|
'memcached.' + method + '.timing', start_time, **dec_kwargs)
|
||||||
|
return result
|
||||||
|
|
||||||
|
return _timing_stats
|
||||||
|
return decorating_func
|
||||||
|
|
||||||
|
|
||||||
class SwiftLoggerAdapter(logging.LoggerAdapter):
|
class SwiftLoggerAdapter(logging.LoggerAdapter):
|
||||||
"""
|
"""
|
||||||
A logging.LoggerAdapter subclass that also passes through StatsD method
|
A logging.LoggerAdapter subclass that also passes through StatsD method
|
||||||
|
@ -38,6 +38,7 @@ from swiftclient import client, get_auth, ClientException
|
|||||||
from swift.proxy.controllers.base import get_cache_key
|
from swift.proxy.controllers.base import get_cache_key
|
||||||
from swift.proxy.controllers.obj import num_container_updates
|
from swift.proxy.controllers.obj import num_container_updates
|
||||||
from test import annotate_failure
|
from test import annotate_failure
|
||||||
|
from test.debug_logger import debug_logger
|
||||||
from test.probe import PROXY_BASE_URL
|
from test.probe import PROXY_BASE_URL
|
||||||
from test.probe.brain import BrainSplitter
|
from test.probe.brain import BrainSplitter
|
||||||
from test.probe.common import ReplProbeTest, get_server_number, \
|
from test.probe.common import ReplProbeTest, get_server_number, \
|
||||||
@ -126,7 +127,8 @@ class BaseTestContainerSharding(ReplProbeTest):
|
|||||||
self.init_brain(self.container_name)
|
self.init_brain(self.container_name)
|
||||||
self.sharders = Manager(['container-sharder'])
|
self.sharders = Manager(['container-sharder'])
|
||||||
self.internal_client = self.make_internal_client()
|
self.internal_client = self.make_internal_client()
|
||||||
self.memcache = MemcacheRing(['127.0.0.1:11211'])
|
self.logger = debug_logger('sharder-test')
|
||||||
|
self.memcache = MemcacheRing(['127.0.0.1:11211'], logger=self.logger)
|
||||||
self.container_replicators = Manager(['container-replicator'])
|
self.container_replicators = Manager(['container-replicator'])
|
||||||
|
|
||||||
def init_brain(self, container_name):
|
def init_brain(self, container_name):
|
||||||
|
@ -1048,6 +1048,56 @@ class TestMemcached(unittest.TestCase):
|
|||||||
# Let's do a big number
|
# Let's do a big number
|
||||||
do_test('1' * 2048576, 1000000, True)
|
do_test('1' * 2048576, 1000000, True)
|
||||||
|
|
||||||
|
def test_operations_timing_stats(self):
|
||||||
|
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'],
|
||||||
|
logger=self.logger)
|
||||||
|
mock = MockMemcached()
|
||||||
|
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
|
||||||
|
[(mock, mock)] * 2)
|
||||||
|
|
||||||
|
with patch('time.time',) as mock_time:
|
||||||
|
mock_time.return_value = 1000.99
|
||||||
|
memcache_client.set('some_key', [1, 2, 3])
|
||||||
|
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||||
|
self.assertEqual('memcached.set.timing', last_stats[0][0])
|
||||||
|
self.assertEqual(last_stats[0][1], 1000.99)
|
||||||
|
mock_time.return_value = 2000.99
|
||||||
|
self.assertEqual(memcache_client.get('some_key'), [1, 2, 3])
|
||||||
|
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||||
|
self.assertEqual('memcached.get.timing', last_stats[0][0])
|
||||||
|
self.assertEqual(last_stats[0][1], 2000.99)
|
||||||
|
mock_time.return_value = 3000.99
|
||||||
|
self.assertEqual(memcache_client.decr('decr_key', delta=5), 0)
|
||||||
|
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||||
|
self.assertEqual('memcached.decr.timing', last_stats[0][0])
|
||||||
|
self.assertEqual(last_stats[0][1], 3000.99)
|
||||||
|
mock_time.return_value = 4000.99
|
||||||
|
self.assertEqual(memcache_client.incr('decr_key', delta=5), 5)
|
||||||
|
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||||
|
self.assertEqual('memcached.incr.timing', last_stats[0][0])
|
||||||
|
self.assertEqual(last_stats[0][1], 4000.99)
|
||||||
|
mock_time.return_value = 5000.99
|
||||||
|
memcache_client.set_multi(
|
||||||
|
{'some_key1': [1, 2, 3], 'some_key2': [4, 5, 6]}, 'multi_key')
|
||||||
|
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||||
|
self.assertEqual('memcached.set_multi.timing', last_stats[0][0])
|
||||||
|
self.assertEqual(last_stats[0][1], 5000.99)
|
||||||
|
mock_time.return_value = 6000.99
|
||||||
|
self.assertEqual(
|
||||||
|
memcache_client.get_multi(
|
||||||
|
('some_key2', 'some_key1'),
|
||||||
|
'multi_key'),
|
||||||
|
[[4, 5, 6],
|
||||||
|
[1, 2, 3]])
|
||||||
|
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||||
|
self.assertEqual('memcached.get_multi.timing', last_stats[0][0])
|
||||||
|
self.assertEqual(last_stats[0][1], 6000.99)
|
||||||
|
mock_time.return_value = 7000.99
|
||||||
|
memcache_client.delete('some_key')
|
||||||
|
last_stats = self.logger.log_dict['timing_since'][-1]
|
||||||
|
self.assertEqual('memcached.delete.timing', last_stats[0][0])
|
||||||
|
self.assertEqual(last_stats[0][1], 7000.99)
|
||||||
|
|
||||||
|
|
||||||
class ExcConfigParser(object):
|
class ExcConfigParser(object):
|
||||||
|
|
||||||
|
@ -5763,6 +5763,40 @@ class TestStatsdLogging(unittest.TestCase):
|
|||||||
self.assertEqual(mock_controller.args[0], 'METHOD.errors.timing')
|
self.assertEqual(mock_controller.args[0], 'METHOD.errors.timing')
|
||||||
self.assertTrue(mock_controller.args[1] > 0)
|
self.assertTrue(mock_controller.args[1] > 0)
|
||||||
|
|
||||||
|
def test_memcached_timing_stats(self):
|
||||||
|
class MockMemcached(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.logger = self
|
||||||
|
self.args = ()
|
||||||
|
self.called = 'UNKNOWN'
|
||||||
|
|
||||||
|
def timing_since(self, *args):
|
||||||
|
self.called = 'timing'
|
||||||
|
self.args = args
|
||||||
|
|
||||||
|
@utils.memcached_timing_stats()
|
||||||
|
def set(cache):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@utils.memcached_timing_stats()
|
||||||
|
def get(cache):
|
||||||
|
pass
|
||||||
|
|
||||||
|
mock_cache = MockMemcached()
|
||||||
|
with patch('time.time',) as mock_time:
|
||||||
|
mock_time.return_value = 1000.99
|
||||||
|
set(mock_cache)
|
||||||
|
self.assertEqual(mock_cache.called, 'timing')
|
||||||
|
self.assertEqual(len(mock_cache.args), 2)
|
||||||
|
self.assertEqual(mock_cache.args[0], 'memcached.set.timing')
|
||||||
|
self.assertEqual(mock_cache.args[1], 1000.99)
|
||||||
|
mock_time.return_value = 2000.99
|
||||||
|
get(mock_cache)
|
||||||
|
self.assertEqual(mock_cache.called, 'timing')
|
||||||
|
self.assertEqual(len(mock_cache.args), 2)
|
||||||
|
self.assertEqual(mock_cache.args[0], 'memcached.get.timing')
|
||||||
|
self.assertEqual(mock_cache.args[1], 2000.99)
|
||||||
|
|
||||||
|
|
||||||
class UnsafeXrange(object):
|
class UnsafeXrange(object):
|
||||||
"""
|
"""
|
||||||
|
Loading…
Reference in New Issue
Block a user