Add task management framework

Some shade clients, like nodepool, have a need to manage threads that
are doing API tasks and to manage rate limiting. That's not really a
thing that's appropriate for shade to know about, but because shade does
want to know the business logic, we need to provide a mechanism for
people to do that.

TaskManager is essentially nodepool.TaskManager except with
threading features removed and reworked to do instantaneous blocking
execution, since that's the behavior that most users will expect.

A patch will follow to move API calls to be manged by Tasks and the
shade.TaskManager. Once those are there, then nodepool can pass in its
TaskManager and the API operations in shade will naturally be managed by
the rate-limited operations in nodepool.

Co-Authored-By: James E. Blair <jeblair@openstack.org>
Change-Id: I60d25271de4009ee3f7f7684c72299fbd5d0f54f
This commit is contained in:
Monty Taylor 2015-03-28 19:16:09 -04:00
parent 1c756b8a57
commit fbac3c07a2
2 changed files with 92 additions and 0 deletions

View File

@ -42,6 +42,7 @@ import warnings
warnings.filterwarnings('ignore', 'Certificate has no `subjectAltName`')
from shade import meta
from shade import task_manager
__version__ = pbr.version.VersionInfo('shade').version_string()
OBJECT_MD5_KEY = 'x-object-meta-x-shade-md5'
@ -192,6 +193,10 @@ class OpenStackCloud(object):
(optional, defaults to dogpile.cache.null)
:param dict cache_arguments: Additional arguments to pass to the cache
constructor (optional, defaults to None)
:param TaskManager manager: Optional task manager to use for running
OpenStack API tasks. Unless you're doing
rate limiting client side, you almost
certainly don't need this. (optional)
"""
def __init__(self, cloud, auth,
@ -204,6 +209,7 @@ class OpenStackCloud(object):
debug=False, cache_interval=None,
cache_class='dogpile.cache.null',
cache_arguments=None,
manager=None,
**kwargs):
self.name = cloud
@ -213,6 +219,11 @@ class OpenStackCloud(object):
self.endpoint_type = endpoint_type
self.private = private
self.api_timeout = api_timeout
if manager is not None:
self.manager = manager
else:
self.manager = task_manager.TaskManager(
name=self.name, client=self)
self.service_types = _get_service_values(kwargs, 'service_type')
self.service_names = _get_service_values(kwargs, 'service_name')

81
shade/task_manager.py Normal file
View File

@ -0,0 +1,81 @@
#!/usr/bin/env python
# Copyright (C) 2011-2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import sys
import logging
import time
import six
@six.add_metaclass(abc.ABCMeta)
class Task(object):
def __init__(self, **kw):
self._exception = None
self._traceback = None
self._result = None
self.args = kw
@abc.abstractmethod
def main(self, client):
""" Override this method with the actual workload to be performed """
def done(self, result):
self._result = result
def exception(self, e, tb):
self._exception = e
self._traceback = tb
def wait(self):
if self._exception:
six.reraise(self._exception, None, self._traceback)
return self._result
def run(self, client):
try:
self.done(self.main(client))
except Exception as e:
self.exception(e, sys.exc_info()[2])
class TaskManager(object):
log = logging.getLogger("shade.TaskManager")
def __init__(self, client, name):
self.name = name
self._client = client
def stop(self):
""" This is a direct action passthrough TaskManager """
pass
def run(self):
""" This is a direct action passthrough TaskManager """
pass
def submitTask(self, task):
self.log.debug(
"Manager %s running task %s" % (self.name, type(task).__name__))
start = time.time()
task.run(self._client)
end = time.time()
self.log.debug(
"Manager %s ran task %s in %ss" % (self.name, task, (end - start)))
return task.wait()