Don't use global statsd
Some history to motivate this change: in the original statsd 2.0 era you could
just "import statsd" and it would set up a global "statsd" object from
environment variables, or set it to None if they were not present. statsd 3.0
changed these semantics slightly by providing default values, so to maintain
the status quo we added the "stats" wrapper
(I4791c9d26f2309f78a556de42af5b9945005aa46) which did the same thing.

However, a global object configured at import time like this is not the best
idea. For example, when running unit tests you may want to point the statsd
host at a fake logger, but if the client is already set up at import time the
test cannot override it before the various classes are exercised. It also
creates ordering problems as nodepool is split into more components.

Thus we move to creating a separate client in each object as it is
instantiated. To maintain the existing behaviour of returning None when the
environment variables are not set, the logic stays in stats.py behind a new
function, get_client(). All statsd callers are modified to obtain a client in
their __init__().

See also: Ib84655378bdb7c7c3c66bf6187b462b3be2f908d -- similar changes for zuul

Change-Id: I6d339a8c631f8508a60e9ef890173780157adefd
parent cdaba901d9
commit d5a2bb2c05
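To make the testing argument above concrete, here is a minimal sketch of the
pattern this change enables. FakeStatsd, Worker and the test function are
hypothetical illustrations, not nodepool code; only stats.get_client() comes
from this commit. Because each consumer now creates its own client in
__init__(), a test can patch stats.get_client() (or assign obj.statsd
directly) and observe the metrics calls, which was not possible when the
client was a module-level global bound at import time.

    # Illustrative sketch only: FakeStatsd, Worker and the test below are
    # hypothetical stand-ins, not nodepool classes; only stats.get_client()
    # comes from this change.
    from unittest import mock

    import stats  # nodepool's stats helper, as modified by this commit


    class FakeStatsd(object):
        """Records stats calls instead of sending UDP packets."""
        def __init__(self):
            self.calls = []

        def timing(self, key, ms):
            self.calls.append(('timing', key, ms))

        def incr(self, key):
            self.calls.append(('incr', key))


    class Worker(object):
        """Hypothetical consumer following the new per-instance pattern."""
        def __init__(self):
            # None if STATSD_HOST is not set in the environment
            self.statsd = stats.get_client()

        def do_work(self):
            if self.statsd:
                self.statsd.timing('nodepool.example.runtime', 42)
                self.statsd.incr('nodepool.example.runtime')


    def test_worker_reports_stats():
        fake = FakeStatsd()
        # The client is created when the object is instantiated, so a test
        # can substitute its own; overriding an import-time global had no
        # effect on objects that had already captured it.
        with mock.patch.object(stats, 'get_client', return_value=fake):
            worker = Worker()
        worker.do_work()
        assert ('incr', 'nodepool.example.runtime') in fake.calls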
@@ -24,11 +24,11 @@ import traceback

 import gear
 import shlex
-from stats import statsd

 import config as nodepool_config
 import exceptions
 import provider_manager
+import stats

 MINS = 60
 HOURS = 60 * MINS
@@ -85,6 +85,7 @@ class NodePoolBuilder(object):
         self._start_lock = threading.Lock()
         self._gearman_worker = None
         self._config = None
+        self.statsd = stats.get_client()

     @property
     def running(self):
@@ -306,12 +307,12 @@ class NodePoolBuilder(object):
         # It can take a _very_ long time for Rackspace 1.0 to save an image
         manager.waitForImage(external_id, IMAGE_TIMEOUT)

-        if statsd:
+        if self.statsd:
             dt = int((time.time() - start_time) * 1000)
             key = 'nodepool.image_update.%s.%s' % (image_name,
                                                    provider.name)
-            statsd.timing(key, dt)
-            statsd.incr(key)
+            self.statsd.timing(key, dt)
+            self.statsd.incr(key)

         self.log.info("Image %s in %s is ready" % (image_id,
                                                    provider.name))
@@ -403,8 +404,8 @@ class NodePoolBuilder(object):
         self.log.info("DIB image %s with file %s is built" % (
             image_name, filename))

-        if statsd:
+        if self.statsd:
             dt = int((time.time() - start_time) * 1000)
             key = 'nodepool.dib_image_build.%s' % diskimage.name
-            statsd.timing(key, dt)
-            statsd.incr(key)
+            self.statsd.timing(key, dt)
+            self.statsd.incr(key)

@@ -35,8 +35,7 @@ import jenkins_manager
 import nodedb
 import nodeutils as utils
 import provider_manager
-from stats import statsd
+import stats

 import config as nodepool_config

@@ -83,6 +82,7 @@ class NodeCompleteThread(threading.Thread):
         self.jobname = jobname
         self.result = result
         self.branch = branch
+        self.statsd = stats.get_client()

     def run(self):
         try:
@@ -122,29 +122,29 @@ class NodeCompleteThread(threading.Thread):
             self.log.info("Node id: %s failed acceptance test, deleting" %
                           node.id)

-        if statsd and self.result == 'SUCCESS':
+        if self.statsd and self.result == 'SUCCESS':
             start = node.state_time
             dt = int((time.time() - start) * 1000)

             # nodepool.job.tempest
             key = 'nodepool.job.%s' % self.jobname
-            statsd.timing(key + '.runtime', dt)
-            statsd.incr(key + '.builds')
+            self.statsd.timing(key + '.runtime', dt)
+            self.statsd.incr(key + '.builds')

             # nodepool.job.tempest.master
             key += '.%s' % self.branch
-            statsd.timing(key + '.runtime', dt)
-            statsd.incr(key + '.builds')
+            self.statsd.timing(key + '.runtime', dt)
+            self.statsd.incr(key + '.builds')

             # nodepool.job.tempest.master.devstack-precise
             key += '.%s' % node.label_name
-            statsd.timing(key + '.runtime', dt)
-            statsd.incr(key + '.builds')
+            self.statsd.timing(key + '.runtime', dt)
+            self.statsd.incr(key + '.builds')

             # nodepool.job.tempest.master.devstack-precise.rax-ord
             key += '.%s' % node.provider_name
-            statsd.timing(key + '.runtime', dt)
-            statsd.incr(key + '.builds')
+            self.statsd.timing(key + '.runtime', dt)
+            self.statsd.incr(key + '.builds')

         time.sleep(DELETE_DELAY)
         self.nodepool.deleteNode(node.id)
@@ -843,6 +843,7 @@ class ImageUpdater(threading.Thread):
         self.scriptdir = self.nodepool.config.scriptdir
         self.elementsdir = self.nodepool.config.elementsdir
         self.imagesdir = self.nodepool.config.imagesdir
+        self.statsd = stats.get_client()

     def run(self):
         try:
@@ -970,12 +971,12 @@ class SnapshotImageUpdater(ImageUpdater):
             raise Exception("Image %s for image id: %s status: %s" %
                             (image_id, self.snap_image.id, image['status']))

-        if statsd:
+        if self.statsd:
             dt = int((time.time() - start_time) * 1000)
             key = 'nodepool.image_update.%s.%s' % (self.image.name,
                                                    self.provider.name)
-            statsd.timing(key, dt)
-            statsd.incr(key)
+            self.statsd.timing(key, dt)
+            self.statsd.incr(key)

         self.snap_image.state = nodedb.READY
         session.commit()
@@ -1073,6 +1074,7 @@ class NodePool(threading.Thread):
         self.zmq_context = None
         self.gearman_client = None
         self.apsched = None
+        self.statsd = stats.get_client()
         self._delete_threads = {}
         self._delete_threads_lock = threading.Lock()
         self._image_delete_threads = {}
@@ -1855,13 +1857,13 @@ class NodePool(threading.Thread):
             node.delete()
         self.log.info("Deleted node id: %s" % node.id)

-        if statsd:
+        if self.statsd:
             dt = int((time.time() - node.state_time) * 1000)
             key = 'nodepool.delete.%s.%s.%s' % (image_name,
                                                 node.provider_name,
                                                 node.target_name)
-            statsd.timing(key, dt)
-            statsd.incr(key)
+            self.statsd.timing(key, dt)
+            self.statsd.incr(key)
         self.updateStats(session, node.provider_name)

     def deleteImage(self, snap_image_id):
@@ -2193,7 +2195,7 @@ class NodePool(threading.Thread):
         self.log.debug("Finished periodic check")

     def updateStats(self, session, provider_name):
-        if not statsd:
+        if not self.statsd:
             return
         # This may be called outside of the main thread.

@@ -2239,16 +2241,16 @@ class NodePool(threading.Thread):
                 states[key] += 1

         for key, count in states.items():
-            statsd.gauge(key, count)
+            self.statsd.gauge(key, count)

         #nodepool.provider.PROVIDER.max_servers
         for provider in self.config.providers.values():
             key = 'nodepool.provider.%s.max_servers' % provider.name
-            statsd.gauge(key, provider.max_servers)
+            self.statsd.gauge(key, provider.max_servers)

     def launchStats(self, subkey, dt, image_name,
                     provider_name, target_name, node_az):
-        if not statsd:
+        if not self.statsd:
             return
         #nodepool.launch.provider.PROVIDER.subkey
         #nodepool.launch.image.IMAGE.subkey
@@ -2265,5 +2267,5 @@ class NodePool(threading.Thread):
             keys.append('nodepool.launch.provider.%s.%s.%s' %
                         (provider_name, node_az, subkey))
         for key in keys:
-            statsd.timing(key, dt)
-            statsd.incr(key)
+            self.statsd.timing(key, dt)
+            self.statsd.incr(key)

@@ -13,22 +13,28 @@
 # under the License.

 """
-Import and set `statsd` if STATSD_HOST is present in the
-environment, else set it to None. This mirrors the behaviour of old
-releases of upstream statsd and avoids us having to change anything
-else.
+Helper to create a statsd client from environment variables
 """

 import os
 import logging
+import statsd

 log = logging.getLogger("nodepool.stats")

-if os.getenv('STATSD_HOST', None):
-    from statsd.defaults.env import statsd
-    log.info("Statsd reporting to %s:%s" %
-             (os.getenv('STATSD_HOST'),
-              os.getenv('STATSD_PORT', '8125')))
-else:
-    log.info("Statsd reporting disabled")
-    statsd = None
+def get_client():
+    """Return a statsd client object setup from environment variables; or
+    None if they are not set
+    """
+
+    # note we're just being careful to let the default values fall
+    # through to StatsClient()
+    statsd_args = {}
+    if os.getenv('STATSD_HOST', None):
+        statsd_args['host'] = os.environ['STATSD_HOST']
+    if os.getenv('STATSD_PORT', None):
+        statsd_args['port'] = os.environ['STATSD_PORT']
+    if statsd_args:
+        return statsd.StatsClient(**statsd_args)
+    else:
+        return None

@@ -21,9 +21,9 @@ import threading
 from six.moves import queue as Queue
 import logging
 import time
-from stats import statsd
 import requests.exceptions

+import stats

 class ManagerStoppedException(Exception):
     pass
@@ -72,6 +72,7 @@ class TaskManager(threading.Thread):
         self.name = name
         self.rate = float(rate)
         self._client = None
+        self.statsd = stats.get_client()

     def stop(self):
         self._running = False
@@ -99,12 +100,12 @@ class TaskManager(threading.Thread):
                 dt = last_ts - start
                 self.log.debug("Manager %s ran task %s in %ss" %
                                (self.name, task, dt))
-                if statsd:
+                if self.statsd:
                     #nodepool.task.PROVIDER.subkey
                     subkey = type(task).__name__
                     key = 'nodepool.task.%s.%s' % (self.name, subkey)
-                    statsd.timing(key, int(dt * 1000))
-                    statsd.incr(key)
+                    self.statsd.timing(key, int(dt * 1000))
+                    self.statsd.incr(key)

                 self.queue.task_done()
             except Exception: