diff --git a/nodepool/builder.py b/nodepool/builder.py index 15f2b2400..5ee0e8605 100644 --- a/nodepool/builder.py +++ b/nodepool/builder.py @@ -24,11 +24,11 @@ import traceback import gear import shlex -from stats import statsd import config as nodepool_config import exceptions import provider_manager +import stats MINS = 60 HOURS = 60 * MINS @@ -85,6 +85,7 @@ class NodePoolBuilder(object): self._start_lock = threading.Lock() self._gearman_worker = None self._config = None + self.statsd = stats.get_client() @property def running(self): @@ -306,12 +307,12 @@ class NodePoolBuilder(object): # It can take a _very_ long time for Rackspace 1.0 to save an image manager.waitForImage(external_id, IMAGE_TIMEOUT) - if statsd: + if self.statsd: dt = int((time.time() - start_time) * 1000) key = 'nodepool.image_update.%s.%s' % (image_name, provider.name) - statsd.timing(key, dt) - statsd.incr(key) + self.statsd.timing(key, dt) + self.statsd.incr(key) self.log.info("Image %s in %s is ready" % (image_id, provider.name)) @@ -403,8 +404,8 @@ class NodePoolBuilder(object): self.log.info("DIB image %s with file %s is built" % ( image_name, filename)) - if statsd: + if self.statsd: dt = int((time.time() - start_time) * 1000) key = 'nodepool.dib_image_build.%s' % diskimage.name - statsd.timing(key, dt) - statsd.incr(key) + self.statsd.timing(key, dt) + self.statsd.incr(key) diff --git a/nodepool/nodepool.py b/nodepool/nodepool.py index 9ac431622..18bac55d7 100644 --- a/nodepool/nodepool.py +++ b/nodepool/nodepool.py @@ -35,8 +35,7 @@ import jenkins_manager import nodedb import nodeutils as utils import provider_manager -from stats import statsd - +import stats import config as nodepool_config @@ -83,6 +82,7 @@ class NodeCompleteThread(threading.Thread): self.jobname = jobname self.result = result self.branch = branch + self.statsd = stats.get_client() def run(self): try: @@ -122,29 +122,29 @@ class NodeCompleteThread(threading.Thread): self.log.info("Node id: %s failed acceptance test, deleting" % node.id) - if statsd and self.result == 'SUCCESS': + if self.statsd and self.result == 'SUCCESS': start = node.state_time dt = int((time.time() - start) * 1000) # nodepool.job.tempest key = 'nodepool.job.%s' % self.jobname - statsd.timing(key + '.runtime', dt) - statsd.incr(key + '.builds') + self.statsd.timing(key + '.runtime', dt) + self.statsd.incr(key + '.builds') # nodepool.job.tempest.master key += '.%s' % self.branch - statsd.timing(key + '.runtime', dt) - statsd.incr(key + '.builds') + self.statsd.timing(key + '.runtime', dt) + self.statsd.incr(key + '.builds') # nodepool.job.tempest.master.devstack-precise key += '.%s' % node.label_name - statsd.timing(key + '.runtime', dt) - statsd.incr(key + '.builds') + self.statsd.timing(key + '.runtime', dt) + self.statsd.incr(key + '.builds') # nodepool.job.tempest.master.devstack-precise.rax-ord key += '.%s' % node.provider_name - statsd.timing(key + '.runtime', dt) - statsd.incr(key + '.builds') + self.statsd.timing(key + '.runtime', dt) + self.statsd.incr(key + '.builds') time.sleep(DELETE_DELAY) self.nodepool.deleteNode(node.id) @@ -843,6 +843,7 @@ class ImageUpdater(threading.Thread): self.scriptdir = self.nodepool.config.scriptdir self.elementsdir = self.nodepool.config.elementsdir self.imagesdir = self.nodepool.config.imagesdir + self.statsd = stats.get_client() def run(self): try: @@ -970,12 +971,12 @@ class SnapshotImageUpdater(ImageUpdater): raise Exception("Image %s for image id: %s status: %s" % (image_id, self.snap_image.id, image['status'])) - if statsd: + if self.statsd: dt = int((time.time() - start_time) * 1000) key = 'nodepool.image_update.%s.%s' % (self.image.name, self.provider.name) - statsd.timing(key, dt) - statsd.incr(key) + self.statsd.timing(key, dt) + self.statsd.incr(key) self.snap_image.state = nodedb.READY session.commit() @@ -1073,6 +1074,7 @@ class NodePool(threading.Thread): self.zmq_context = None self.gearman_client = None self.apsched = None + self.statsd = stats.get_client() self._delete_threads = {} self._delete_threads_lock = threading.Lock() self._image_delete_threads = {} @@ -1855,13 +1857,13 @@ class NodePool(threading.Thread): node.delete() self.log.info("Deleted node id: %s" % node.id) - if statsd: + if self.statsd: dt = int((time.time() - node.state_time) * 1000) key = 'nodepool.delete.%s.%s.%s' % (image_name, node.provider_name, node.target_name) - statsd.timing(key, dt) - statsd.incr(key) + self.statsd.timing(key, dt) + self.statsd.incr(key) self.updateStats(session, node.provider_name) def deleteImage(self, snap_image_id): @@ -2193,7 +2195,7 @@ class NodePool(threading.Thread): self.log.debug("Finished periodic check") def updateStats(self, session, provider_name): - if not statsd: + if not self.statsd: return # This may be called outside of the main thread. @@ -2239,16 +2241,16 @@ class NodePool(threading.Thread): states[key] += 1 for key, count in states.items(): - statsd.gauge(key, count) + self.statsd.gauge(key, count) #nodepool.provider.PROVIDER.max_servers for provider in self.config.providers.values(): key = 'nodepool.provider.%s.max_servers' % provider.name - statsd.gauge(key, provider.max_servers) + self.statsd.gauge(key, provider.max_servers) def launchStats(self, subkey, dt, image_name, provider_name, target_name, node_az): - if not statsd: + if not self.statsd: return #nodepool.launch.provider.PROVIDER.subkey #nodepool.launch.image.IMAGE.subkey @@ -2265,5 +2267,5 @@ class NodePool(threading.Thread): keys.append('nodepool.launch.provider.%s.%s.%s' % (provider_name, node_az, subkey)) for key in keys: - statsd.timing(key, dt) - statsd.incr(key) + self.statsd.timing(key, dt) + self.statsd.incr(key) diff --git a/nodepool/stats.py b/nodepool/stats.py index b926c70b1..772281df1 100644 --- a/nodepool/stats.py +++ b/nodepool/stats.py @@ -13,22 +13,28 @@ # under the License. """ -Import and set `statsd` if STATSD_HOST is present in the -environment, else set it to None. This mirrors the behaviour of old -releases of upstream statsd and avoids us having to change anything -else. +Helper to create a statsd client from environment variables """ import os import logging +import statsd log = logging.getLogger("nodepool.stats") -if os.getenv('STATSD_HOST', None): - from statsd.defaults.env import statsd - log.info("Statsd reporting to %s:%s" % - (os.getenv('STATSD_HOST'), - os.getenv('STATSD_PORT', '8125'))) -else: - log.info("Statsd reporting disabled") - statsd = None +def get_client(): + """Return a statsd client object setup from environment variables; or + None if they are not set + """ + + # note we're just being careful to let the default values fall + # through to StatsClient() + statsd_args = {} + if os.getenv('STATSD_HOST', None): + statsd_args['host'] = os.environ['STATSD_HOST'] + if os.getenv('STATSD_PORT', None): + statsd_args['port'] = os.environ['STATSD_PORT'] + if statsd_args: + return statsd.StatsClient(**statsd_args) + else: + return None diff --git a/nodepool/task_manager.py b/nodepool/task_manager.py index 48108d708..cf804a35d 100644 --- a/nodepool/task_manager.py +++ b/nodepool/task_manager.py @@ -21,9 +21,9 @@ import threading from six.moves import queue as Queue import logging import time -from stats import statsd import requests.exceptions +import stats class ManagerStoppedException(Exception): pass @@ -72,6 +72,7 @@ class TaskManager(threading.Thread): self.name = name self.rate = float(rate) self._client = None + self.statsd = stats.get_client() def stop(self): self._running = False @@ -99,12 +100,12 @@ class TaskManager(threading.Thread): dt = last_ts - start self.log.debug("Manager %s ran task %s in %ss" % (self.name, task, dt)) - if statsd: + if self.statsd: #nodepool.task.PROVIDER.subkey subkey = type(task).__name__ key = 'nodepool.task.%s.%s' % (self.name, subkey) - statsd.timing(key, int(dt * 1000)) - statsd.incr(key) + self.statsd.timing(key, int(dt * 1000)) + self.statsd.incr(key) self.queue.task_done() except Exception: