Remove TaskManager and just use keystoneauth
Support for concurrency and rate limiting has been added to keystoneauth, which is the library openstacksdk uses to talk to OpenStack. Instead of managing concurrency in nodepool using the TaskManager and pool of worker threads, let keystoneauth take over. This also means we no longer have a hook into the request process, so we defer statsd reporting to the openstacksdk layer as well. Change-Id: If21a10c56f43a121d30aa802f2c89d31df97f121
This commit is contained in:
parent
f733e9c4d4
commit
34aae137fa
@ -517,12 +517,11 @@ OpenStack API stats
|
|||||||
~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
Low level details on the timing of OpenStack API calls will be logged
|
Low level details on the timing of OpenStack API calls will be logged
|
||||||
by the API task manager. These calls are logged under
|
by ``openstacksdk``. These calls are logged under
|
||||||
``nodepool.task.<provider>.<api-call>``. The API call name is of the
|
``nodepool.task.<provider>.<api-call>``. The API call name is of the
|
||||||
generic format ``<endpoint><method><operation>`` transformed into a
|
generic format ``<service-type>.<method>.<operation>``. For example, the
|
||||||
CamelCase value with no deliminators; for example the
|
``GET /servers`` call to the ``compute`` service becomes
|
||||||
``compute.GET.servers`` call becomes ``ComputeGetServers`` and
|
``compute.GET.servers``.
|
||||||
``compute.POST.os-volumes_boot`` becomes ``ComputePostOsVolumesBoot``.
|
|
||||||
|
|
||||||
Since these calls reflect the internal operations of the
|
Since these calls reflect the internal operations of the
|
||||||
``openstacksdk``, the exact keys logged may vary across providers and
|
``openstacksdk``, the exact keys logged may vary across providers and
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
import copy
|
import copy
|
||||||
import logging
|
import logging
|
||||||
import operator
|
import operator
|
||||||
|
import os
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import openstack
|
import openstack
|
||||||
@ -25,7 +26,6 @@ from nodepool import exceptions
|
|||||||
from nodepool.driver import Provider
|
from nodepool.driver import Provider
|
||||||
from nodepool.driver.utils import QuotaInformation
|
from nodepool.driver.utils import QuotaInformation
|
||||||
from nodepool.nodeutils import iterate_timeout
|
from nodepool.nodeutils import iterate_timeout
|
||||||
from nodepool.task_manager import TaskManager
|
|
||||||
from nodepool import stats
|
from nodepool import stats
|
||||||
from nodepool import version
|
from nodepool import version
|
||||||
from nodepool import zk
|
from nodepool import zk
|
||||||
@ -47,8 +47,6 @@ class OpenStackProvider(Provider):
|
|||||||
self._networks = {}
|
self._networks = {}
|
||||||
self.__flavors = {} # TODO(gtema): caching
|
self.__flavors = {} # TODO(gtema): caching
|
||||||
self.__azs = None
|
self.__azs = None
|
||||||
self._use_taskmanager = use_taskmanager
|
|
||||||
self._taskmanager = None
|
|
||||||
self._current_nodepool_quota = None
|
self._current_nodepool_quota = None
|
||||||
self._zk = None
|
self._zk = None
|
||||||
self._down_ports = set()
|
self._down_ports = set()
|
||||||
@ -57,20 +55,14 @@ class OpenStackProvider(Provider):
|
|||||||
self._statsd = stats.get_client()
|
self._statsd = stats.get_client()
|
||||||
|
|
||||||
def start(self, zk_conn):
|
def start(self, zk_conn):
|
||||||
if self._use_taskmanager:
|
|
||||||
self._taskmanager = TaskManager(self.provider.name,
|
|
||||||
self.provider.rate)
|
|
||||||
self._taskmanager.start()
|
|
||||||
self.resetClient()
|
self.resetClient()
|
||||||
self._zk = zk_conn
|
self._zk = zk_conn
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
if self._taskmanager:
|
pass
|
||||||
self._taskmanager.stop()
|
|
||||||
|
|
||||||
def join(self):
|
def join(self):
|
||||||
if self._taskmanager:
|
pass
|
||||||
self._taskmanager.join()
|
|
||||||
|
|
||||||
def getRequestHandler(self, poolworker, request):
|
def getRequestHandler(self, poolworker, request):
|
||||||
return handler.OpenStackNodeRequestHandler(poolworker, request)
|
return handler.OpenStackNodeRequestHandler(poolworker, request)
|
||||||
@ -83,13 +75,18 @@ class OpenStackProvider(Provider):
|
|||||||
return self.__flavors
|
return self.__flavors
|
||||||
|
|
||||||
def _getClient(self):
|
def _getClient(self):
|
||||||
if self._use_taskmanager:
|
rate_limit = None
|
||||||
manager = self._taskmanager
|
# nodepool tracks rate limit in time between requests.
|
||||||
else:
|
# openstacksdk tracks rate limit in requests per second.
|
||||||
manager = None
|
# 1/time = requests-per-second.
|
||||||
|
if self.provider.rate:
|
||||||
|
rate_limit = 1 / self.provider.rate
|
||||||
return openstack.connection.Connection(
|
return openstack.connection.Connection(
|
||||||
config=self.provider.cloud_config,
|
config=self.provider.cloud_config,
|
||||||
task_manager=manager,
|
rate_limit=rate_limit,
|
||||||
|
statsd_host=os.getenv('STATSD_HOST', None),
|
||||||
|
statsd_port=os.getenv('STATSD_PORT ', None),
|
||||||
|
statsd_prefix='nodepool.task.{0}'.format(self.provider.name),
|
||||||
app_name='nodepool',
|
app_name='nodepool',
|
||||||
app_version=version.version_info.version_string()
|
app_version=version.version_info.version_string()
|
||||||
)
|
)
|
||||||
|
@ -1,99 +0,0 @@
|
|||||||
#!/usr/bin/env python
|
|
||||||
|
|
||||||
# Copyright (C) 2011-2013 OpenStack Foundation
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
||||||
# implied.
|
|
||||||
#
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
import threading
|
|
||||||
import logging
|
|
||||||
import re
|
|
||||||
import queue
|
|
||||||
import time
|
|
||||||
|
|
||||||
from openstack import task_manager as openstack_task_manager
|
|
||||||
|
|
||||||
from nodepool import stats
|
|
||||||
|
|
||||||
|
|
||||||
def _transform_task_name(task_name):
|
|
||||||
# Transform openstacksdk internal task name to something more
|
|
||||||
# suitable for sending to statsd for tracking; e.g.
|
|
||||||
#
|
|
||||||
# compute.DELETE.servers -> ComputeDeleteServers
|
|
||||||
# compute.POST.os-volumes_boot -> ComputePostOsVolumesBoot
|
|
||||||
parts = re.split('[.\-_]', task_name)
|
|
||||||
return "".join(
|
|
||||||
[part.lower().capitalize() for part in parts]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TaskManager(openstack_task_manager.TaskManager):
|
|
||||||
log = logging.getLogger("nodepool.TaskManager")
|
|
||||||
|
|
||||||
def __init__(self, name, rate, workers=5):
|
|
||||||
super(TaskManager, self).__init__(name=name, workers=workers)
|
|
||||||
self.daemon = True
|
|
||||||
self.queue = queue.Queue()
|
|
||||||
self._running = True
|
|
||||||
self.rate = float(rate)
|
|
||||||
self.statsd = stats.get_client()
|
|
||||||
self._thread = threading.Thread(name=name, target=self.run)
|
|
||||||
self._thread.daemon = True
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
self._thread.start()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self._running = False
|
|
||||||
self.queue.put(None)
|
|
||||||
|
|
||||||
def join(self):
|
|
||||||
self._thread.join()
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
last_ts = 0
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
task = self.queue.get()
|
|
||||||
if not task:
|
|
||||||
if not self._running:
|
|
||||||
break
|
|
||||||
continue
|
|
||||||
while True:
|
|
||||||
delta = time.time() - last_ts
|
|
||||||
if delta >= self.rate:
|
|
||||||
break
|
|
||||||
time.sleep(self.rate - delta)
|
|
||||||
self.log.debug("Manager %s running task %s (queue %s)" %
|
|
||||||
(self.name,
|
|
||||||
_transform_task_name(task.name),
|
|
||||||
self.queue.qsize()))
|
|
||||||
self.run_task(task)
|
|
||||||
self.queue.task_done()
|
|
||||||
except Exception:
|
|
||||||
self.log.exception("Task manager died.")
|
|
||||||
raise
|
|
||||||
|
|
||||||
def post_run_task(self, elapsed_time, task):
|
|
||||||
task_name = _transform_task_name(task.name)
|
|
||||||
self.log.debug(
|
|
||||||
"Manager %s ran task %s in %ss" %
|
|
||||||
(self.name, task_name, elapsed_time))
|
|
||||||
|
|
||||||
if self.statsd:
|
|
||||||
# nodepool.task.PROVIDER.TASK_NAME
|
|
||||||
key = 'nodepool.task.%s.%s' % (self.name, task_name)
|
|
||||||
self.statsd.timing(key, int(elapsed_time * 1000))
|
|
||||||
self.statsd.incr(key)
|
|
@ -20,23 +20,10 @@ import yaml
|
|||||||
|
|
||||||
from nodepool import config as nodepool_config
|
from nodepool import config as nodepool_config
|
||||||
from nodepool import provider_manager
|
from nodepool import provider_manager
|
||||||
from nodepool import task_manager
|
|
||||||
from nodepool import tests
|
from nodepool import tests
|
||||||
|
|
||||||
|
|
||||||
class TestShadeIntegration(tests.IntegrationTestCase):
|
class TestShadeIntegration(tests.IntegrationTestCase):
|
||||||
def test_task_name_transformation(self):
|
|
||||||
t = task_manager._transform_task_name
|
|
||||||
self.assertEqual(
|
|
||||||
t('compute.DELETE.servers'),
|
|
||||||
'ComputeDeleteServers')
|
|
||||||
self.assertEqual(
|
|
||||||
t('compute.POST.os-volumes_boot'),
|
|
||||||
'ComputePostOsVolumesBoot')
|
|
||||||
self.assertEqual(
|
|
||||||
t('compute.GET.os-availability-zone'),
|
|
||||||
'ComputeGetOsAvailabilityZone')
|
|
||||||
|
|
||||||
def _cleanup_cloud_config(self):
|
def _cleanup_cloud_config(self):
|
||||||
os.remove(self.clouds_path)
|
os.remove(self.clouds_path)
|
||||||
|
|
||||||
|
@ -0,0 +1,15 @@
|
|||||||
|
---
|
||||||
|
upgrade:
|
||||||
|
- |
|
||||||
|
The ``TaskManager`` used by the OpenStack provider has been removed.
|
||||||
|
The ``keystoneauth1`` library underneath ``openstacksdk`` has grown
|
||||||
|
support for rate limiting using a ``FairSemaphore`` instead of a pool
|
||||||
|
of worker threads. This should reduce the overall thread count.
|
||||||
|
- |
|
||||||
|
statsd key names have changed. Because of the removal of ``TaskManager``
|
||||||
|
statsd calls are being deferred to openstacksdk. Instead of keys of the
|
||||||
|
form ``ComputeGetServers``, the openstacksdk keys are of the form
|
||||||
|
``compute.GET.servers``. They will always start with the normalized
|
||||||
|
``service-type``, followed by the HTTP verb, followed by a ``.`` separated
|
||||||
|
list of url segments. Any service version, project-id entries in the url
|
||||||
|
or ``.json`` suffixes will be removed.
|
@ -6,9 +6,8 @@ python-daemon>=2.0.4,<2.1.0
|
|||||||
extras
|
extras
|
||||||
statsd>=3.0
|
statsd>=3.0
|
||||||
PrettyTable>=0.6,<0.8
|
PrettyTable>=0.6,<0.8
|
||||||
# openstacksdk before 0.21.0 has issues with dogpile.cache
|
# openstacksdk before 0.27.0 is TaskManager based
|
||||||
# openstacksdk removes taskmanager in 0.27.0
|
openstacksdk>=0.27.0
|
||||||
openstacksdk>=0.21.0,<0.27.0
|
|
||||||
diskimage-builder>=2.0.0
|
diskimage-builder>=2.0.0
|
||||||
voluptuous
|
voluptuous
|
||||||
kazoo
|
kazoo
|
||||||
|
Loading…
Reference in New Issue
Block a user