Use pipelines for stats keys
Pipelines buffer stats and then send them out in more reasonable sized chunks, helping to avoid small UDP packets going missing in a flood of stats. Use this in stats.py. This needs a slight change to the assertedStats handler to extract the combined stats. This function is ported from Zuul where we updated to handle pipeline stats (Id4f6f5a6cd66581a81299ed5c67a5c49c95c9b52) so it is not really new code. Change-Id: I3f68450c7164d1cf0f1f57f9a31e5dca2f72bc43
This commit is contained in:
parent
67824d8e64
commit
cd9aa75640
@ -79,9 +79,11 @@ class StatsReporter(object):
|
|||||||
keys.append('nodepool.launch.requestor.%s.%s' %
|
keys.append('nodepool.launch.requestor.%s.%s' %
|
||||||
(requestor, subkey))
|
(requestor, subkey))
|
||||||
|
|
||||||
|
pipeline = self._statsd.pipeline()
|
||||||
for key in keys:
|
for key in keys:
|
||||||
self._statsd.timing(key, dt)
|
pipeline.timing(key, dt)
|
||||||
self._statsd.incr(key)
|
pipeline.incr(key)
|
||||||
|
pipeline.send()
|
||||||
|
|
||||||
def updateNodeStats(self, zk_conn, provider):
|
def updateNodeStats(self, zk_conn, provider):
|
||||||
'''
|
'''
|
||||||
@ -123,11 +125,13 @@ class StatsReporter(object):
|
|||||||
else:
|
else:
|
||||||
states[key] = 1
|
states[key] = 1
|
||||||
|
|
||||||
|
pipeline = self._statsd.pipeline()
|
||||||
for key, count in states.items():
|
for key, count in states.items():
|
||||||
self._statsd.gauge(key, count)
|
pipeline.gauge(key, count)
|
||||||
|
|
||||||
# nodepool.provider.PROVIDER.max_servers
|
# nodepool.provider.PROVIDER.max_servers
|
||||||
key = 'nodepool.provider.%s.max_servers' % provider.name
|
key = 'nodepool.provider.%s.max_servers' % provider.name
|
||||||
max_servers = sum([p.max_servers for p in provider.pools.values()
|
max_servers = sum([p.max_servers for p in provider.pools.values()
|
||||||
if p.max_servers])
|
if p.max_servers])
|
||||||
self._statsd.gauge(key, max_servers)
|
pipeline.gauge(key, max_servers)
|
||||||
|
pipeline.send()
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
"""Common utilities used in testing"""
|
"""Common utilities used in testing"""
|
||||||
|
|
||||||
import glob
|
import glob
|
||||||
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import random
|
import random
|
||||||
@ -235,19 +236,63 @@ class BaseTestCase(testtools.TestCase):
|
|||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
def assertReportedStat(self, key, value=None, kind=None):
|
def assertReportedStat(self, key, value=None, kind=None):
|
||||||
|
"""Check statsd output
|
||||||
|
|
||||||
|
Check statsd return values. A ``value`` should specify a
|
||||||
|
``kind``, however a ``kind`` may be specified without a
|
||||||
|
``value`` for a generic match. Leave both empy to just check
|
||||||
|
for key presence.
|
||||||
|
|
||||||
|
:arg str key: The statsd key
|
||||||
|
:arg str value: The expected value of the metric ``key``
|
||||||
|
:arg str kind: The expected type of the metric ``key`` For example
|
||||||
|
|
||||||
|
- ``c`` counter
|
||||||
|
- ``g`` gauge
|
||||||
|
- ``ms`` timing
|
||||||
|
- ``s`` set
|
||||||
|
"""
|
||||||
|
|
||||||
|
if value:
|
||||||
|
self.assertNotEqual(kind, None)
|
||||||
|
|
||||||
start = time.time()
|
start = time.time()
|
||||||
while time.time() < (start + 5):
|
while time.time() < (start + 5):
|
||||||
for stat in self.statsd.stats:
|
# Note our fake statsd just queues up results in a queue.
|
||||||
k, v = stat.decode('utf8').split(':')
|
# We just keep going through them until we find one that
|
||||||
|
# matches, or fail out. If statsd pipelines are used,
|
||||||
|
# large single packets are sent with stats separated by
|
||||||
|
# newlines; thus we first flatten the stats out into
|
||||||
|
# single entries.
|
||||||
|
stats = itertools.chain.from_iterable(
|
||||||
|
[s.decode('utf-8').split('\n') for s in self.statsd.stats])
|
||||||
|
for stat in stats:
|
||||||
|
k, v = stat.split(':')
|
||||||
if key == k:
|
if key == k:
|
||||||
if value is None and kind is None:
|
if kind is None:
|
||||||
return
|
# key with no qualifiers is found
|
||||||
elif value:
|
return True
|
||||||
if value == v:
|
|
||||||
return
|
s_value, s_kind = v.split('|')
|
||||||
elif kind:
|
|
||||||
if v.endswith('|' + kind):
|
# if no kind match, look for other keys
|
||||||
return
|
if kind != s_kind:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if value:
|
||||||
|
# special-case value|ms because statsd can turn
|
||||||
|
# timing results into float of indeterminate
|
||||||
|
# length, hence foiling string matching.
|
||||||
|
if kind == 'ms':
|
||||||
|
if float(value) == float(s_value):
|
||||||
|
return True
|
||||||
|
if value == s_value:
|
||||||
|
return True
|
||||||
|
# otherwise keep looking for other matches
|
||||||
|
continue
|
||||||
|
|
||||||
|
# this key matches
|
||||||
|
return True
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
raise Exception("Key %s not found in reported stats" % key)
|
raise Exception("Key %s not found in reported stats" % key)
|
||||||
|
@ -79,8 +79,8 @@ class TestLauncher(tests.DBTestCase):
|
|||||||
)
|
)
|
||||||
self.zk.deleteNodeRequest(req)
|
self.zk.deleteNodeRequest(req)
|
||||||
self.waitForNodeRequestLockDeletion(req.id)
|
self.waitForNodeRequestLockDeletion(req.id)
|
||||||
self.assertReportedStat('nodepool.nodes.ready', '1|g')
|
self.assertReportedStat('nodepool.nodes.ready', value='1', kind='g')
|
||||||
self.assertReportedStat('nodepool.nodes.building', '0|g')
|
self.assertReportedStat('nodepool.nodes.building', value='0', kind='g')
|
||||||
|
|
||||||
def test_node_assignment_order(self):
|
def test_node_assignment_order(self):
|
||||||
"""Test that nodes are assigned in the order requested"""
|
"""Test that nodes are assigned in the order requested"""
|
||||||
|
Loading…
x
Reference in New Issue
Block a user