From 4cf96b37919d8be90e3c75194d5af4a5a2a36101 Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Thu, 11 Oct 2012 14:04:02 -0700 Subject: [PATCH] Honor sample_rate in StatsD logging. It's there to let administrators turn down the barrage of stats data that StatsD must cope with, but it wasn't actually honored. Worse, if the sample rate was set to e.g. 0.2, the stats would all be multiplied by its inverse, e.g. 5. This patch actually drops packets when sample_rate < 1, so you get correct measurements. Fortunately, the default sample rate is 1 (i.e. drop nothing), and multiplying by 1/1 doesn't change anything, so stats with the default sample rate of 1.0 are, and have been, just fine. Fixes bug 1065643. Also, make the two touched files compliant with pep8 v1.3.3. Change-Id: I66663144009ae4c9ee96f6a111745d8f5d2f5ca3 --- swift/common/utils.py | 30 ++++++---- test/unit/common/test_utils.py | 100 ++++++++++++++++++++++++--------- 2 files changed, 92 insertions(+), 38 deletions(-) diff --git a/swift/common/utils.py b/swift/common/utils.py index 4c4c14cd14..19db1b2bdf 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -23,7 +23,7 @@ import sys import time import functools from hashlib import md5 -from random import shuffle +from random import random, shuffle from urllib import quote from contextlib import contextmanager, closing import ctypes @@ -233,7 +233,7 @@ def drop_buffer_cache(fd, offset, length): _posix_fadvise = load_libc_function('posix_fadvise64') # 4 means "POSIX_FADV_DONTNEED" ret = _posix_fadvise(fd, ctypes.c_uint64(offset), - ctypes.c_uint64(length), 4) + ctypes.c_uint64(length), 4) if ret != 0: logging.warn("posix_fadvise64(%s, %s, %s, 4) -> %s" % (fd, offset, length, ret)) @@ -311,16 +311,17 @@ def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False): minsegs += 1 maxsegs += 1 count = len(segs) - if segs[0] or count < minsegs or count > maxsegs or \ - '' in segs[1:minsegs]: + if (segs[0] or count < minsegs or count > maxsegs or 
+ '' in segs[1:minsegs]): raise ValueError('Invalid path: %s' % quote(path)) else: minsegs += 1 maxsegs += 1 segs = path.split('/', maxsegs) count = len(segs) - if segs[0] or count < minsegs or count > maxsegs + 1 or \ - '' in segs[1:minsegs] or (count == maxsegs + 1 and segs[maxsegs]): + if (segs[0] or count < minsegs or count > maxsegs + 1 or + '' in segs[1:minsegs] or + (count == maxsegs + 1 and segs[maxsegs])): raise ValueError('Invalid path: %s' % quote(path)) segs = segs[1:maxsegs] segs.extend([None] * (maxsegs - 1 - len(segs))) @@ -407,6 +408,7 @@ class StatsdClient(object): self.set_prefix(tail_prefix) self._default_sample_rate = default_sample_rate self._target = (self._host, self._port) + self.random = random def set_prefix(self, new_prefix): if new_prefix and self._base_prefix: @@ -423,12 +425,18 @@ class StatsdClient(object): sample_rate = self._default_sample_rate parts = ['%s%s:%s' % (self._prefix, m_name, m_value), m_type] if sample_rate < 1: - parts.append('@%s' % (sample_rate,)) + if self.random() < sample_rate: + parts.append('@%s' % (sample_rate,)) + else: + return # Ideally, we'd cache a sending socket in self, but that # results in a socket getting shared by multiple green threads. 
- with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock: + with closing(self._open_socket()) as sock: return sock.sendto('|'.join(parts), self._target) + def _open_socket(self): + return socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + def update_stats(self, m_name, m_value, sample_rate=None): return self._send(m_name, m_value, 'c', sample_rate) @@ -587,10 +595,10 @@ class SwiftLogFormatter(logging.Formatter): def format(self, record): msg = logging.Formatter.format(self, record) if (record.txn_id and record.levelno != logging.INFO and - record.txn_id not in msg): + record.txn_id not in msg): msg = "%s (txn: %s)" % (msg, record.txn_id) if (record.client_ip and record.levelno != logging.INFO and - record.client_ip not in msg): + record.client_ip not in msg): msg = "%s (client_ip: %s)" % (msg, record.client_ip) return msg @@ -1084,7 +1092,7 @@ def readconf(conffile, section_name=None, log_name=None, defaults=None, conf = dict(c.items(section_name)) else: print _("Unable to find %s config section in %s") % \ - (section_name, conffile) + (section_name, conffile) sys.exit(1) if "log_name" not in conf: if log_name is not None: diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 1d0b2f3fa6..8274fe3c89 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -21,6 +21,7 @@ import errno import logging import mimetools import os +import random import re import socket import sys @@ -36,8 +37,8 @@ from tempfile import TemporaryFile, NamedTemporaryFile from eventlet import sleep -from swift.common.exceptions import Timeout, MessageTimeout, \ - ConnectionTimeout +from swift.common.exceptions import (Timeout, MessageTimeout, + ConnectionTimeout) from swift.common import utils @@ -82,6 +83,17 @@ class MockOs(): return getattr(os, name) +class MockUdpSocket(): + def __init__(self): + self.sent = [] + + def sendto(self, data, target): + self.sent.append((data, target)) + + def close(self): + pass + + class 
MockSys(): def __init__(self): @@ -197,21 +209,30 @@ class TestUtils(unittest.TestCase): def test_validate_device_partition(self): """ Test swift.common.utils.validate_device_partition """ utils.validate_device_partition('foo', 'bar') - self.assertRaises(ValueError, utils.validate_device_partition, '', '') - self.assertRaises(ValueError, utils.validate_device_partition, '', 'foo') - self.assertRaises(ValueError, utils.validate_device_partition, 'foo', '') - self.assertRaises(ValueError, utils.validate_device_partition, 'foo/bar', 'foo') - self.assertRaises(ValueError, utils.validate_device_partition, 'foo', 'foo/bar') - self.assertRaises(ValueError, utils.validate_device_partition, '.', 'foo') - self.assertRaises(ValueError, utils.validate_device_partition, '..', 'foo') - self.assertRaises(ValueError, utils.validate_device_partition, 'foo', '.') - self.assertRaises(ValueError, utils.validate_device_partition, 'foo', '..') + self.assertRaises(ValueError, + utils.validate_device_partition, '', '') + self.assertRaises(ValueError, + utils.validate_device_partition, '', 'foo') + self.assertRaises(ValueError, + utils.validate_device_partition, 'foo', '') + self.assertRaises(ValueError, + utils.validate_device_partition, 'foo/bar', 'foo') + self.assertRaises(ValueError, + utils.validate_device_partition, 'foo', 'foo/bar') + self.assertRaises(ValueError, + utils.validate_device_partition, '.', 'foo') + self.assertRaises(ValueError, + utils.validate_device_partition, '..', 'foo') + self.assertRaises(ValueError, + utils.validate_device_partition, 'foo', '.') + self.assertRaises(ValueError, + utils.validate_device_partition, 'foo', '..') try: - utils.validate_device_partition,('o\nn e', 'foo') + utils.validate_device_partition('o\nn e', 'foo') except ValueError, err: self.assertEquals(str(err), 'Invalid device: o%0An%20e') try: - utils.validate_device_partition,('foo', 'o\nn e') + utils.validate_device_partition('foo', 'o\nn e') except ValueError, err: 
self.assertEquals(str(err), 'Invalid partition: o%0An%20e') @@ -243,21 +264,21 @@ class TestUtils(unittest.TestCase): self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n') print >> sys.stderr, 'test6' self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n' - 'STDOUT: test6\n') + 'STDOUT: test6\n') sys.stderr = orig_stderr print 'test8' self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n' - 'STDOUT: test6\n') + 'STDOUT: test6\n') lfo.writelines(['a', 'b', 'c']) self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n' - 'STDOUT: test6\nSTDOUT: a#012b#012c\n') + 'STDOUT: test6\nSTDOUT: a#012b#012c\n') lfo.close() lfo.write('d') self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n' - 'STDOUT: test6\nSTDOUT: a#012b#012c\nSTDOUT: d\n') + 'STDOUT: test6\nSTDOUT: a#012b#012c\nSTDOUT: d\n') lfo.flush() self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n' - 'STDOUT: test6\nSTDOUT: a#012b#012c\nSTDOUT: d\n') + 'STDOUT: test6\nSTDOUT: a#012b#012c\nSTDOUT: d\n') got_exc = False try: for line in lfo: @@ -503,7 +524,7 @@ class TestUtils(unittest.TestCase): def test_storage_directory(self): self.assertEquals(utils.storage_directory('objects', '1', 'ABCDEF'), - 'objects/1/DEF/ABCDEF') + 'objects/1/DEF/ABCDEF') def test_whataremyips(self): myips = utils.whataremyips() @@ -522,7 +543,8 @@ class TestUtils(unittest.TestCase): self.assertEquals(utils.hash_path('a', 'c', 'o', raw_digest=False), '06fbf0b514e5199dfc4e00f42eb5ea83') self.assertEquals(utils.hash_path('a', 'c', 'o', raw_digest=True), - '\x06\xfb\xf0\xb5\x14\xe5\x19\x9d\xfcN\x00\xf4.\xb5\xea\x83') + '\x06\xfb\xf0\xb5\x14\xe5\x19\x9d\xfcN' + '\x00\xf4.\xb5\xea\x83') self.assertRaises(ValueError, utils.hash_path, 'a', object='o') def test_load_libc_function(self): @@ -870,9 +892,9 @@ log_name = %(yarr)s''' 'http://1.1.1.1/v1/a/c/o?query=param', 'http://1.1.1.1/v1/a/c/o?query=param#frag', 'http://1.1.1.2/v1/a/c/o'): - 
self.assertNotEquals(utils.validate_sync_to(badurl, - ['1.1.1.1', '2.2.2.2']), - None) + self.assertNotEquals( + utils.validate_sync_to(badurl, ['1.1.1.1', '2.2.2.2']), + None) def test_TRUE_VALUES(self): for v in utils.TRUE_VALUES: @@ -906,7 +928,8 @@ class TestStatsdLogging(unittest.TestCase): logger = utils.get_logger({'log_statsd_host': 'some.host.com'}, 'some-name', log_route='some-route') # white-box construction validation - self.assert_(isinstance(logger.logger.statsd_client, utils.StatsdClient)) + self.assert_(isinstance(logger.logger.statsd_client, + utils.StatsdClient)) self.assertEqual(logger.logger.statsd_client._host, 'some.host.com') self.assertEqual(logger.logger.statsd_client._port, 8125) self.assertEqual(logger.logger.statsd_client._prefix, 'some-name.') @@ -934,7 +957,29 @@ class TestStatsdLogging(unittest.TestCase): self.assertEqual(logger.logger.statsd_client._prefix, 'tomato.sauce.') self.assertEqual(logger.logger.statsd_client._host, 'another.host.com') self.assertEqual(logger.logger.statsd_client._port, 9876) - self.assertEqual(logger.logger.statsd_client._default_sample_rate, 0.75) + self.assertEqual(logger.logger.statsd_client._default_sample_rate, + 0.75) + + def test_sample_rates(self): + logger = utils.get_logger({'log_statsd_host': 'some.host.com'}) + + mock_socket = MockUdpSocket() + # encapsulation? what's that? 
+ statsd_client = logger.logger.statsd_client + self.assertTrue(statsd_client.random is random.random) + + statsd_client._open_socket = lambda *_: mock_socket + statsd_client.random = lambda: 0.50001 + + logger.increment('tribbles', sample_rate=0.5) + self.assertEqual(len(mock_socket.sent), 0) + + statsd_client.random = lambda: 0.49999 + logger.increment('tribbles', sample_rate=0.5) + self.assertEqual(len(mock_socket.sent), 1) + + payload = mock_socket.sent[0][0] + self.assertTrue(payload.endswith("|@0.5")) class TestStatsdLoggingDelegation(unittest.TestCase): @@ -1003,7 +1048,8 @@ class TestStatsdLoggingDelegation(unittest.TestCase): # Delegate methods are no-ops self.assertEqual(None, logger.update_stats('foo', 88)) self.assertEqual(None, logger.update_stats('foo', 88, 0.57)) - self.assertEqual(None, logger.update_stats('foo', 88, sample_rate=0.61)) + self.assertEqual(None, logger.update_stats('foo', 88, + sample_rate=0.61)) self.assertEqual(None, logger.increment('foo')) self.assertEqual(None, logger.increment('foo', 0.57)) self.assertEqual(None, logger.increment('foo', sample_rate=0.61)) @@ -1175,7 +1221,7 @@ class TestStatsdLoggingDelegation(unittest.TestCase): self.assertEquals(logger.thread_locals, ('5678', '5.6.7.8')) finally: logger.thread_locals = orig_thread_locals - + if __name__ == '__main__': unittest.main()