From 34aae137faf9dc93e7776f3f1eef3eee15b88c6d Mon Sep 17 00:00:00 2001 From: Monty Taylor Date: Sun, 3 Mar 2019 15:26:49 +0000 Subject: [PATCH] Remove TaskManager and just use keystoneauth Support for concurrency and rate limiting has been added to keystoneauth, which is the library openstacksdk uses to talk to OpenStack. Instead of managing concurrency in nodepool using the TaskManager and pool of worker threads, let keystoneauth take over. This also means we no longer have a hook into the request process, so we defer statsd reporting to the openstacksdk layer as well. Change-Id: If21a10c56f43a121d30aa802f2c89d31df97f121 --- doc/source/operation.rst | 9 +- nodepool/driver/openstack/provider.py | 29 +++--- nodepool/task_manager.py | 99 ------------------- nodepool/tests/unit/test_sdk_integration.py | 13 --- ...ask-manager-replaced-12e4b3a0108f9358.yaml | 15 +++ requirements.txt | 5 +- 6 files changed, 34 insertions(+), 136 deletions(-) delete mode 100644 nodepool/task_manager.py create mode 100644 releasenotes/notes/task-manager-replaced-12e4b3a0108f9358.yaml diff --git a/doc/source/operation.rst b/doc/source/operation.rst index 6aa49442e..8b538e274 100644 --- a/doc/source/operation.rst +++ b/doc/source/operation.rst @@ -517,12 +517,11 @@ OpenStack API stats ~~~~~~~~~~~~~~~~~~~ Low level details on the timing of OpenStack API calls will be logged -by the API task manager. These calls are logged under +by ``openstacksdk``. These calls are logged under ``nodepool.task..``. The API call name is of the -generic format ```` transformed into a -CamelCase value with no deliminators; for example the -``compute.GET.servers`` call becomes ``ComputeGetServers`` and -``compute.POST.os-volumes_boot`` becomes ``ComputePostOsVolumesBoot``. +generic format ``..``. For example, the +``GET /servers`` call to the ``compute`` service becomes +``compute.GET.servers``. Since these calls reflect the internal operations of the ``openstacksdk``, the exact keys logged may vary across providers and diff --git a/nodepool/driver/openstack/provider.py b/nodepool/driver/openstack/provider.py index df7f670e9..bf765cac5 100755 --- a/nodepool/driver/openstack/provider.py +++ b/nodepool/driver/openstack/provider.py @@ -17,6 +17,7 @@ import copy import logging import operator +import os import time import openstack @@ -25,7 +26,6 @@ from nodepool import exceptions from nodepool.driver import Provider from nodepool.driver.utils import QuotaInformation from nodepool.nodeutils import iterate_timeout -from nodepool.task_manager import TaskManager from nodepool import stats from nodepool import version from nodepool import zk @@ -47,8 +47,6 @@ class OpenStackProvider(Provider): self._networks = {} self.__flavors = {} # TODO(gtema): caching self.__azs = None - self._use_taskmanager = use_taskmanager - self._taskmanager = None self._current_nodepool_quota = None self._zk = None self._down_ports = set() @@ -57,20 +55,14 @@ class OpenStackProvider(Provider): self._statsd = stats.get_client() def start(self, zk_conn): - if self._use_taskmanager: - self._taskmanager = TaskManager(self.provider.name, - self.provider.rate) - self._taskmanager.start() self.resetClient() self._zk = zk_conn def stop(self): - if self._taskmanager: - self._taskmanager.stop() + pass def join(self): - if self._taskmanager: - self._taskmanager.join() + pass def getRequestHandler(self, poolworker, request): return handler.OpenStackNodeRequestHandler(poolworker, request) @@ -83,13 +75,18 @@ class OpenStackProvider(Provider): return self.__flavors def _getClient(self): - if self._use_taskmanager: - manager = self._taskmanager - else: - manager = None + rate_limit = None + # nodepool tracks rate limit in time between requests. + # openstacksdk tracks rate limit in requests per second. + # 1/time = requests-per-second. + if self.provider.rate: + rate_limit = 1 / self.provider.rate return openstack.connection.Connection( config=self.provider.cloud_config, - task_manager=manager, + rate_limit=rate_limit, + statsd_host=os.getenv('STATSD_HOST', None), + statsd_port=os.getenv('STATSD_PORT ', None), + statsd_prefix='nodepool.task.{0}'.format(self.provider.name), app_name='nodepool', app_version=version.version_info.version_string() ) diff --git a/nodepool/task_manager.py b/nodepool/task_manager.py deleted file mode 100644 index 7d4c142ee..000000000 --- a/nodepool/task_manager.py +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/env python - -# Copyright (C) 2011-2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# -# See the License for the specific language governing permissions and -# limitations under the License. - -import threading -import logging -import re -import queue -import time - -from openstack import task_manager as openstack_task_manager - -from nodepool import stats - - -def _transform_task_name(task_name): - # Transform openstacksdk internal task name to something more - # suitable for sending to statsd for tracking; e.g. - # - # compute.DELETE.servers -> ComputeDeleteServers - # compute.POST.os-volumes_boot -> ComputePostOsVolumesBoot - parts = re.split('[.\-_]', task_name) - return "".join( - [part.lower().capitalize() for part in parts] - ) - - -class TaskManager(openstack_task_manager.TaskManager): - log = logging.getLogger("nodepool.TaskManager") - - def __init__(self, name, rate, workers=5): - super(TaskManager, self).__init__(name=name, workers=workers) - self.daemon = True - self.queue = queue.Queue() - self._running = True - self.rate = float(rate) - self.statsd = stats.get_client() - self._thread = threading.Thread(name=name, target=self.run) - self._thread.daemon = True - - def start(self): - self._thread.start() - - def stop(self): - self._running = False - self.queue.put(None) - - def join(self): - self._thread.join() - - def run(self): - last_ts = 0 - try: - while True: - task = self.queue.get() - if not task: - if not self._running: - break - continue - while True: - delta = time.time() - last_ts - if delta >= self.rate: - break - time.sleep(self.rate - delta) - self.log.debug("Manager %s running task %s (queue %s)" % - (self.name, - _transform_task_name(task.name), - self.queue.qsize())) - self.run_task(task) - self.queue.task_done() - except Exception: - self.log.exception("Task manager died.") - raise - - def post_run_task(self, elapsed_time, task): - task_name = _transform_task_name(task.name) - self.log.debug( - "Manager %s ran task %s in %ss" % - (self.name, task_name, elapsed_time)) - - if self.statsd: - # nodepool.task.PROVIDER.TASK_NAME - key = 'nodepool.task.%s.%s' % (self.name, task_name) - self.statsd.timing(key, int(elapsed_time * 1000)) - self.statsd.incr(key) diff --git a/nodepool/tests/unit/test_sdk_integration.py b/nodepool/tests/unit/test_sdk_integration.py index 105ff0062..97dfb748d 100644 --- a/nodepool/tests/unit/test_sdk_integration.py +++ b/nodepool/tests/unit/test_sdk_integration.py @@ -20,23 +20,10 @@ import yaml from nodepool import config as nodepool_config from nodepool import provider_manager -from nodepool import task_manager from nodepool import tests class TestShadeIntegration(tests.IntegrationTestCase): - def test_task_name_transformation(self): - t = task_manager._transform_task_name - self.assertEqual( - t('compute.DELETE.servers'), - 'ComputeDeleteServers') - self.assertEqual( - t('compute.POST.os-volumes_boot'), - 'ComputePostOsVolumesBoot') - self.assertEqual( - t('compute.GET.os-availability-zone'), - 'ComputeGetOsAvailabilityZone') - def _cleanup_cloud_config(self): os.remove(self.clouds_path) diff --git a/releasenotes/notes/task-manager-replaced-12e4b3a0108f9358.yaml b/releasenotes/notes/task-manager-replaced-12e4b3a0108f9358.yaml new file mode 100644 index 000000000..054ac18f7 --- /dev/null +++ b/releasenotes/notes/task-manager-replaced-12e4b3a0108f9358.yaml @@ -0,0 +1,15 @@ +--- +upgrade: + - | + The ``TaskManager`` used by the OpenStack provider has been removed. + The ``keystoneauth1`` library underneath ``openstacksdk`` has grown + support for rate limiting using a ``FairSemaphore`` instead of a pool + of worker threads. This should reduce the overall thread count. + - | + statsd key names have changed. Because of the removal of ``TaskManager`` + statsd calls are being deferred to openstacksdk. Instead of keys of the + form ``ComputeGetServers``, the openstacksdk keys are of the form + ``compute.GET.servers``. They will always start with the normalized + ``service-type``, followed by the HTTP verb, followed by a ``.`` separated + list of url segments. Any service version, project-id entries in the url + or ``.json`` suffixes will be removed. diff --git a/requirements.txt b/requirements.txt index 57ed5966d..3d188c482 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,9 +6,8 @@ python-daemon>=2.0.4,<2.1.0 extras statsd>=3.0 PrettyTable>=0.6,<0.8 -# openstacksdk before 0.21.0 has issues with dogpile.cache -# openstacksdk removes taskmanager in 0.27.0 -openstacksdk>=0.21.0,<0.27.0 +# openstacksdk before 0.27.0 is TaskManager based +openstacksdk>=0.27.0 diskimage-builder>=2.0.0 voluptuous kazoo