Update latest OSLO code
Change-Id: I804d1eae92e89740339546f0d0f490a3e4f21204
This commit is contained in:
parent
765baf8532
commit
15a1445583
@ -31,7 +31,7 @@ class PluginRpcDispatcher(dispatcher.RpcDispatcher):
|
|||||||
def __init__(self, callbacks):
|
def __init__(self, callbacks):
|
||||||
super(PluginRpcDispatcher, self).__init__(callbacks)
|
super(PluginRpcDispatcher, self).__init__(callbacks)
|
||||||
|
|
||||||
def dispatch(self, rpc_ctxt, version, method, **kwargs):
|
def dispatch(self, rpc_ctxt, version, method, namespace, **kwargs):
|
||||||
rpc_ctxt_dict = rpc_ctxt.to_dict()
|
rpc_ctxt_dict = rpc_ctxt.to_dict()
|
||||||
user_id = rpc_ctxt_dict.pop('user_id', None)
|
user_id = rpc_ctxt_dict.pop('user_id', None)
|
||||||
if not user_id:
|
if not user_id:
|
||||||
@ -41,4 +41,4 @@ class PluginRpcDispatcher(dispatcher.RpcDispatcher):
|
|||||||
tenant_id = rpc_ctxt_dict.pop('project_id', None)
|
tenant_id = rpc_ctxt_dict.pop('project_id', None)
|
||||||
quantum_ctxt = context.Context(user_id, tenant_id, **rpc_ctxt_dict)
|
quantum_ctxt = context.Context(user_id, tenant_id, **rpc_ctxt_dict)
|
||||||
return super(PluginRpcDispatcher, self).dispatch(
|
return super(PluginRpcDispatcher, self).dispatch(
|
||||||
quantum_ctxt, version, method, **kwargs)
|
quantum_ctxt, version, method, namespace, **kwargs)
|
||||||
|
@ -23,11 +23,12 @@ context or provide additional information in their specific WSGI pipeline.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import itertools
|
import itertools
|
||||||
import uuid
|
|
||||||
|
from quantum.openstack.common import uuidutils
|
||||||
|
|
||||||
|
|
||||||
def generate_request_id():
|
def generate_request_id():
|
||||||
return 'req-' + str(uuid.uuid4())
|
return 'req-%s' % uuidutils.generate_uuid()
|
||||||
|
|
||||||
|
|
||||||
class RequestContext(object):
|
class RequestContext(object):
|
||||||
|
@ -24,10 +24,27 @@ Usual usage in an openstack.common module:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import gettext
|
import gettext
|
||||||
|
import os
|
||||||
|
|
||||||
|
_localedir = os.environ.get('quantum'.upper() + '_LOCALEDIR')
|
||||||
t = gettext.translation('openstack-common', 'locale', fallback=True)
|
_t = gettext.translation('quantum', localedir=_localedir, fallback=True)
|
||||||
|
|
||||||
|
|
||||||
def _(msg):
|
def _(msg):
|
||||||
return t.ugettext(msg)
|
return _t.ugettext(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def install(domain):
|
||||||
|
"""Install a _() function using the given translation domain.
|
||||||
|
|
||||||
|
Given a translation domain, install a _() function using gettext's
|
||||||
|
install() function.
|
||||||
|
|
||||||
|
The main difference from gettext.install() is that we allow
|
||||||
|
overriding the default localedir (e.g. /usr/share/locale) using
|
||||||
|
a translation-domain-specific environment variable (e.g.
|
||||||
|
NOVA_LOCALEDIR).
|
||||||
|
"""
|
||||||
|
gettext.install(domain,
|
||||||
|
localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
|
||||||
|
unicode=True)
|
||||||
|
@ -38,11 +38,21 @@ import functools
|
|||||||
import inspect
|
import inspect
|
||||||
import itertools
|
import itertools
|
||||||
import json
|
import json
|
||||||
|
import types
|
||||||
import xmlrpclib
|
import xmlrpclib
|
||||||
|
|
||||||
from quantum.openstack.common import timeutils
|
from quantum.openstack.common import timeutils
|
||||||
|
|
||||||
|
|
||||||
|
_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
|
||||||
|
inspect.isfunction, inspect.isgeneratorfunction,
|
||||||
|
inspect.isgenerator, inspect.istraceback, inspect.isframe,
|
||||||
|
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
|
||||||
|
inspect.isabstract]
|
||||||
|
|
||||||
|
_simple_types = (types.NoneType, int, basestring, bool, float, long)
|
||||||
|
|
||||||
|
|
||||||
def to_primitive(value, convert_instances=False, convert_datetime=True,
|
def to_primitive(value, convert_instances=False, convert_datetime=True,
|
||||||
level=0, max_depth=3):
|
level=0, max_depth=3):
|
||||||
"""Convert a complex object into primitives.
|
"""Convert a complex object into primitives.
|
||||||
@ -58,17 +68,30 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
|
|||||||
Therefore, convert_instances=True is lossy ... be aware.
|
Therefore, convert_instances=True is lossy ... be aware.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod,
|
# handle obvious types first - order of basic types determined by running
|
||||||
inspect.isfunction, inspect.isgeneratorfunction,
|
# full tests on nova project, resulting in the following counts:
|
||||||
inspect.isgenerator, inspect.istraceback, inspect.isframe,
|
# 572754 <type 'NoneType'>
|
||||||
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
|
# 460353 <type 'int'>
|
||||||
inspect.isabstract]
|
# 379632 <type 'unicode'>
|
||||||
for test in nasty:
|
# 274610 <type 'str'>
|
||||||
if test(value):
|
# 199918 <type 'dict'>
|
||||||
return unicode(value)
|
# 114200 <type 'datetime.datetime'>
|
||||||
|
# 51817 <type 'bool'>
|
||||||
|
# 26164 <type 'list'>
|
||||||
|
# 6491 <type 'float'>
|
||||||
|
# 283 <type 'tuple'>
|
||||||
|
# 19 <type 'long'>
|
||||||
|
if isinstance(value, _simple_types):
|
||||||
|
return value
|
||||||
|
|
||||||
# value of itertools.count doesn't get caught by inspects
|
if isinstance(value, datetime.datetime):
|
||||||
# above and results in infinite loop when list(value) is called.
|
if convert_datetime:
|
||||||
|
return timeutils.strtime(value)
|
||||||
|
else:
|
||||||
|
return value
|
||||||
|
|
||||||
|
# value of itertools.count doesn't get caught by nasty_type_tests
|
||||||
|
# and results in infinite loop when list(value) is called.
|
||||||
if type(value) == itertools.count:
|
if type(value) == itertools.count:
|
||||||
return unicode(value)
|
return unicode(value)
|
||||||
|
|
||||||
@ -91,17 +114,18 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
|
|||||||
convert_datetime=convert_datetime,
|
convert_datetime=convert_datetime,
|
||||||
level=level,
|
level=level,
|
||||||
max_depth=max_depth)
|
max_depth=max_depth)
|
||||||
|
if isinstance(value, dict):
|
||||||
|
return dict((k, recursive(v)) for k, v in value.iteritems())
|
||||||
|
elif isinstance(value, (list, tuple)):
|
||||||
|
return [recursive(lv) for lv in value]
|
||||||
|
|
||||||
# It's not clear why xmlrpclib created their own DateTime type, but
|
# It's not clear why xmlrpclib created their own DateTime type, but
|
||||||
# for our purposes, make it a datetime type which is explicitly
|
# for our purposes, make it a datetime type which is explicitly
|
||||||
# handled
|
# handled
|
||||||
if isinstance(value, xmlrpclib.DateTime):
|
if isinstance(value, xmlrpclib.DateTime):
|
||||||
value = datetime.datetime(*tuple(value.timetuple())[:6])
|
value = datetime.datetime(*tuple(value.timetuple())[:6])
|
||||||
|
|
||||||
if isinstance(value, (list, tuple)):
|
if convert_datetime and isinstance(value, datetime.datetime):
|
||||||
return [recursive(v) for v in value]
|
|
||||||
elif isinstance(value, dict):
|
|
||||||
return dict((k, recursive(v)) for k, v in value.iteritems())
|
|
||||||
elif convert_datetime and isinstance(value, datetime.datetime):
|
|
||||||
return timeutils.strtime(value)
|
return timeutils.strtime(value)
|
||||||
elif hasattr(value, 'iteritems'):
|
elif hasattr(value, 'iteritems'):
|
||||||
return recursive(dict(value.iteritems()), level=level + 1)
|
return recursive(dict(value.iteritems()), level=level + 1)
|
||||||
@ -112,6 +136,8 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
|
|||||||
# Ignore class member vars.
|
# Ignore class member vars.
|
||||||
return recursive(value.__dict__, level=level + 1)
|
return recursive(value.__dict__, level=level + 1)
|
||||||
else:
|
else:
|
||||||
|
if any(test(value) for test in _nasty_type_tests):
|
||||||
|
return unicode(value)
|
||||||
return value
|
return value
|
||||||
except TypeError:
|
except TypeError:
|
||||||
# Class objects are tricky since they may define something like
|
# Class objects are tricky since they may define something like
|
||||||
|
@ -112,9 +112,9 @@ generic_log_opts = [
|
|||||||
|
|
||||||
log_opts = [
|
log_opts = [
|
||||||
cfg.StrOpt('logging_context_format_string',
|
cfg.StrOpt('logging_context_format_string',
|
||||||
default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
|
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
|
||||||
'[%(request_id)s %(user)s %(tenant)s] %(instance)s'
|
'%(name)s [%(request_id)s %(user)s %(tenant)s] '
|
||||||
'%(message)s',
|
'%(instance)s%(message)s',
|
||||||
help='format string to use for log messages with context'),
|
help='format string to use for log messages with context'),
|
||||||
cfg.StrOpt('logging_default_format_string',
|
cfg.StrOpt('logging_default_format_string',
|
||||||
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
|
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
|
||||||
@ -432,14 +432,11 @@ def _setup_logging_from_conf():
|
|||||||
else:
|
else:
|
||||||
log_root.setLevel(logging.WARNING)
|
log_root.setLevel(logging.WARNING)
|
||||||
|
|
||||||
level = logging.NOTSET
|
|
||||||
for pair in CONF.default_log_levels:
|
for pair in CONF.default_log_levels:
|
||||||
mod, _sep, level_name = pair.partition('=')
|
mod, _sep, level_name = pair.partition('=')
|
||||||
level = logging.getLevelName(level_name)
|
level = logging.getLevelName(level_name)
|
||||||
logger = logging.getLogger(mod)
|
logger = logging.getLogger(mod)
|
||||||
logger.setLevel(level)
|
logger.setLevel(level)
|
||||||
for handler in log_root.handlers:
|
|
||||||
logger.addHandler(handler)
|
|
||||||
|
|
||||||
_loggers = {}
|
_loggers = {}
|
||||||
|
|
||||||
|
@ -19,7 +19,8 @@
|
|||||||
Network-related utilities and helper functions.
|
Network-related utilities and helper functions.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
from quantum.openstack.common import log as logging
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -30,7 +30,6 @@ LOG = logging.getLogger(__name__)
|
|||||||
notifier_opts = [
|
notifier_opts = [
|
||||||
cfg.MultiStrOpt('notification_driver',
|
cfg.MultiStrOpt('notification_driver',
|
||||||
default=[],
|
default=[],
|
||||||
deprecated_name='list_notifier_drivers',
|
|
||||||
help='Driver or drivers to handle sending notifications'),
|
help='Driver or drivers to handle sending notifications'),
|
||||||
cfg.StrOpt('default_notification_level',
|
cfg.StrOpt('default_notification_level',
|
||||||
default='INFO',
|
default='INFO',
|
||||||
|
@ -13,26 +13,72 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
import time
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
from quantum.openstack.common.gettextutils import _
|
from quantum.openstack.common.gettextutils import _
|
||||||
from quantum.openstack.common import log as logging
|
from quantum.openstack.common import log as logging
|
||||||
|
from quantum.openstack.common import timeutils
|
||||||
|
|
||||||
|
|
||||||
|
periodic_opts = [
|
||||||
|
cfg.BoolOpt('run_external_periodic_tasks',
|
||||||
|
default=True,
|
||||||
|
help=('Some periodic tasks can be run in a separate process. '
|
||||||
|
'Should we run them here?')),
|
||||||
|
]
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
CONF.register_opts(periodic_opts)
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
DEFAULT_INTERVAL = 60.0
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidPeriodicTaskArg(Exception):
|
||||||
|
message = _("Unexpected argument for periodic task creation: %(arg)s.")
|
||||||
|
|
||||||
|
|
||||||
def periodic_task(*args, **kwargs):
|
def periodic_task(*args, **kwargs):
|
||||||
"""Decorator to indicate that a method is a periodic task.
|
"""Decorator to indicate that a method is a periodic task.
|
||||||
|
|
||||||
This decorator can be used in two ways:
|
This decorator can be used in two ways:
|
||||||
|
|
||||||
1. Without arguments '@periodic_task', this will be run on every tick
|
1. Without arguments '@periodic_task', this will be run on every cycle
|
||||||
of the periodic scheduler.
|
of the periodic scheduler.
|
||||||
|
|
||||||
2. With arguments, @periodic_task(ticks_between_runs=N), this will be
|
2. With arguments:
|
||||||
run on every N ticks of the periodic scheduler.
|
@periodic_task(spacing=N [, run_immediately=[True|False]])
|
||||||
|
this will be run on approximately every N seconds. If this number is
|
||||||
|
negative the periodic task will be disabled. If the run_immediately
|
||||||
|
argument is provided and has a value of 'True', the first run of the
|
||||||
|
task will be shortly after task scheduler starts. If
|
||||||
|
run_immediately is omitted or set to 'False', the first time the
|
||||||
|
task runs will be approximately N seconds after the task scheduler
|
||||||
|
starts.
|
||||||
"""
|
"""
|
||||||
def decorator(f):
|
def decorator(f):
|
||||||
|
# Test for old style invocation
|
||||||
|
if 'ticks_between_runs' in kwargs:
|
||||||
|
raise InvalidPeriodicTaskArg(arg='ticks_between_runs')
|
||||||
|
|
||||||
|
# Control if run at all
|
||||||
f._periodic_task = True
|
f._periodic_task = True
|
||||||
f._ticks_between_runs = kwargs.pop('ticks_between_runs', 0)
|
f._periodic_external_ok = kwargs.pop('external_process_ok', False)
|
||||||
|
if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
|
||||||
|
f._periodic_enabled = False
|
||||||
|
else:
|
||||||
|
f._periodic_enabled = kwargs.pop('enabled', True)
|
||||||
|
|
||||||
|
# Control frequency
|
||||||
|
f._periodic_spacing = kwargs.pop('spacing', 0)
|
||||||
|
f._periodic_immediate = kwargs.pop('run_immediately', False)
|
||||||
|
if f._periodic_immediate:
|
||||||
|
f._periodic_last_run = None
|
||||||
|
else:
|
||||||
|
f._periodic_last_run = timeutils.utcnow()
|
||||||
return f
|
return f
|
||||||
|
|
||||||
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
|
# NOTE(sirp): The `if` is necessary to allow the decorator to be used with
|
||||||
@ -59,7 +105,7 @@ class _PeriodicTasksMeta(type):
|
|||||||
super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
|
super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
|
||||||
|
|
||||||
# NOTE(sirp): if the attribute is not present then we must be the base
|
# NOTE(sirp): if the attribute is not present then we must be the base
|
||||||
# class, so, go ahead and initialize it. If the attribute is present,
|
# class, so, go ahead an initialize it. If the attribute is present,
|
||||||
# then we're a subclass so make a copy of it so we don't step on our
|
# then we're a subclass so make a copy of it so we don't step on our
|
||||||
# parent's toes.
|
# parent's toes.
|
||||||
try:
|
try:
|
||||||
@ -68,20 +114,39 @@ class _PeriodicTasksMeta(type):
|
|||||||
cls._periodic_tasks = []
|
cls._periodic_tasks = []
|
||||||
|
|
||||||
try:
|
try:
|
||||||
cls._ticks_to_skip = cls._ticks_to_skip.copy()
|
cls._periodic_last_run = cls._periodic_last_run.copy()
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
cls._ticks_to_skip = {}
|
cls._periodic_last_run = {}
|
||||||
|
|
||||||
|
try:
|
||||||
|
cls._periodic_spacing = cls._periodic_spacing.copy()
|
||||||
|
except AttributeError:
|
||||||
|
cls._periodic_spacing = {}
|
||||||
|
|
||||||
# This uses __dict__ instead of
|
|
||||||
# inspect.getmembers(cls, inspect.ismethod) so only the methods of the
|
|
||||||
# current class are added when this class is scanned, and base classes
|
|
||||||
# are not added redundantly.
|
|
||||||
for value in cls.__dict__.values():
|
for value in cls.__dict__.values():
|
||||||
if getattr(value, '_periodic_task', False):
|
if getattr(value, '_periodic_task', False):
|
||||||
task = value
|
task = value
|
||||||
name = task.__name__
|
name = task.__name__
|
||||||
|
|
||||||
|
if task._periodic_spacing < 0:
|
||||||
|
LOG.info(_('Skipping periodic task %(task)s because '
|
||||||
|
'its interval is negative'),
|
||||||
|
{'task': name})
|
||||||
|
continue
|
||||||
|
if not task._periodic_enabled:
|
||||||
|
LOG.info(_('Skipping periodic task %(task)s because '
|
||||||
|
'it is disabled'),
|
||||||
|
{'task': name})
|
||||||
|
continue
|
||||||
|
|
||||||
|
# A periodic spacing of zero indicates that this task should
|
||||||
|
# be run every pass
|
||||||
|
if task._periodic_spacing == 0:
|
||||||
|
task._periodic_spacing = None
|
||||||
|
|
||||||
cls._periodic_tasks.append((name, task))
|
cls._periodic_tasks.append((name, task))
|
||||||
cls._ticks_to_skip[name] = task._ticks_between_runs
|
cls._periodic_spacing[name] = task._periodic_spacing
|
||||||
|
cls._periodic_last_run[name] = task._periodic_last_run
|
||||||
|
|
||||||
|
|
||||||
class PeriodicTasks(object):
|
class PeriodicTasks(object):
|
||||||
@ -89,27 +154,34 @@ class PeriodicTasks(object):
|
|||||||
|
|
||||||
def run_periodic_tasks(self, context, raise_on_error=False):
|
def run_periodic_tasks(self, context, raise_on_error=False):
|
||||||
"""Tasks to be run at a periodic interval."""
|
"""Tasks to be run at a periodic interval."""
|
||||||
|
idle_for = DEFAULT_INTERVAL
|
||||||
for task_name, task in self._periodic_tasks:
|
for task_name, task in self._periodic_tasks:
|
||||||
full_task_name = '.'.join([self.__class__.__name__, task_name])
|
full_task_name = '.'.join([self.__class__.__name__, task_name])
|
||||||
|
|
||||||
ticks_to_skip = self._ticks_to_skip[task_name]
|
now = timeutils.utcnow()
|
||||||
if ticks_to_skip > 0:
|
spacing = self._periodic_spacing[task_name]
|
||||||
LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
|
last_run = self._periodic_last_run[task_name]
|
||||||
" ticks left until next run"),
|
|
||||||
dict(full_task_name=full_task_name,
|
# If a periodic task is _nearly_ due, then we'll run it early
|
||||||
ticks_to_skip=ticks_to_skip))
|
if spacing is not None and last_run is not None:
|
||||||
self._ticks_to_skip[task_name] -= 1
|
due = last_run + datetime.timedelta(seconds=spacing)
|
||||||
|
if not timeutils.is_soon(due, 0.2):
|
||||||
|
idle_for = min(idle_for, timeutils.delta_seconds(now, due))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
self._ticks_to_skip[task_name] = task._ticks_between_runs
|
if spacing is not None:
|
||||||
LOG.debug(_("Running periodic task %(full_task_name)s"),
|
idle_for = min(idle_for, spacing)
|
||||||
dict(full_task_name=full_task_name))
|
|
||||||
|
LOG.debug(_("Running periodic task %(full_task_name)s"), locals())
|
||||||
|
self._periodic_last_run[task_name] = timeutils.utcnow()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
task(self, context)
|
task(self, context)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if raise_on_error:
|
if raise_on_error:
|
||||||
raise
|
raise
|
||||||
LOG.exception(_("Error during %(full_task_name)s:"
|
LOG.exception(_("Error during %(full_task_name)s: %(e)s"),
|
||||||
" %(e)s"),
|
locals())
|
||||||
dict(e=e, full_task_name=full_task_name))
|
time.sleep(0)
|
||||||
|
|
||||||
|
return idle_for
|
||||||
|
@ -57,7 +57,6 @@ as it allows particular rules to be explicitly disabled.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
import logging
|
|
||||||
import re
|
import re
|
||||||
import urllib
|
import urllib
|
||||||
|
|
||||||
@ -65,6 +64,7 @@ import urllib2
|
|||||||
|
|
||||||
from quantum.openstack.common.gettextutils import _
|
from quantum.openstack.common.gettextutils import _
|
||||||
from quantum.openstack.common import jsonutils
|
from quantum.openstack.common import jsonutils
|
||||||
|
from quantum.openstack.common import log as logging
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
@ -19,7 +19,6 @@
|
|||||||
System-level utilities and helper functions.
|
System-level utilities and helper functions.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
|
||||||
import random
|
import random
|
||||||
import shlex
|
import shlex
|
||||||
|
|
||||||
@ -27,6 +26,7 @@ from eventlet.green import subprocess
|
|||||||
from eventlet import greenthread
|
from eventlet import greenthread
|
||||||
|
|
||||||
from quantum.openstack.common.gettextutils import _
|
from quantum.openstack.common.gettextutils import _
|
||||||
|
from quantum.openstack.common import log as logging
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
@ -26,13 +26,13 @@ For some wrappers that add message versioning to rpc, see:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import inspect
|
import inspect
|
||||||
import logging
|
|
||||||
|
|
||||||
from oslo.config import cfg
|
from oslo.config import cfg
|
||||||
|
|
||||||
from quantum.openstack.common.gettextutils import _
|
from quantum.openstack.common.gettextutils import _
|
||||||
from quantum.openstack.common import importutils
|
from quantum.openstack.common import importutils
|
||||||
from quantum.openstack.common import local
|
from quantum.openstack.common import local
|
||||||
|
from quantum.openstack.common import log as logging
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
@ -408,15 +408,17 @@ class ProxyCallback(_ThreadPoolWithWait):
|
|||||||
ctxt = unpack_context(self.conf, message_data)
|
ctxt = unpack_context(self.conf, message_data)
|
||||||
method = message_data.get('method')
|
method = message_data.get('method')
|
||||||
args = message_data.get('args', {})
|
args = message_data.get('args', {})
|
||||||
version = message_data.get('version', None)
|
version = message_data.get('version')
|
||||||
|
namespace = message_data.get('namespace')
|
||||||
if not method:
|
if not method:
|
||||||
LOG.warn(_('no method for message: %s') % message_data)
|
LOG.warn(_('no method for message: %s') % message_data)
|
||||||
ctxt.reply(_('No method for message: %s') % message_data,
|
ctxt.reply(_('No method for message: %s') % message_data,
|
||||||
connection_pool=self.connection_pool)
|
connection_pool=self.connection_pool)
|
||||||
return
|
return
|
||||||
self.pool.spawn_n(self._process_data, ctxt, version, method, args)
|
self.pool.spawn_n(self._process_data, ctxt, version, method,
|
||||||
|
namespace, args)
|
||||||
|
|
||||||
def _process_data(self, ctxt, version, method, args):
|
def _process_data(self, ctxt, version, method, namespace, args):
|
||||||
"""Process a message in a new thread.
|
"""Process a message in a new thread.
|
||||||
|
|
||||||
If the proxy object we have has a dispatch method
|
If the proxy object we have has a dispatch method
|
||||||
@ -427,7 +429,8 @@ class ProxyCallback(_ThreadPoolWithWait):
|
|||||||
"""
|
"""
|
||||||
ctxt.update_store()
|
ctxt.update_store()
|
||||||
try:
|
try:
|
||||||
rval = self.proxy.dispatch(ctxt, version, method, **args)
|
rval = self.proxy.dispatch(ctxt, version, method, namespace,
|
||||||
|
**args)
|
||||||
# Check if the result was a generator
|
# Check if the result was a generator
|
||||||
if inspect.isgenerator(rval):
|
if inspect.isgenerator(rval):
|
||||||
for x in rval:
|
for x in rval:
|
||||||
|
@ -339,7 +339,7 @@ def deserialize_remote_exception(conf, data):
|
|||||||
if not issubclass(klass, Exception):
|
if not issubclass(klass, Exception):
|
||||||
raise TypeError("Can only deserialize Exceptions")
|
raise TypeError("Can only deserialize Exceptions")
|
||||||
|
|
||||||
failure = klass(**failure.get('kwargs', {}))
|
failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
|
||||||
except (AttributeError, TypeError, ImportError):
|
except (AttributeError, TypeError, ImportError):
|
||||||
return RemoteError(name, failure.get('message'), trace)
|
return RemoteError(name, failure.get('message'), trace)
|
||||||
|
|
||||||
|
@ -103,13 +103,16 @@ class RpcDispatcher(object):
|
|||||||
self.callbacks = callbacks
|
self.callbacks = callbacks
|
||||||
super(RpcDispatcher, self).__init__()
|
super(RpcDispatcher, self).__init__()
|
||||||
|
|
||||||
def dispatch(self, ctxt, version, method, **kwargs):
|
def dispatch(self, ctxt, version, method, namespace, **kwargs):
|
||||||
"""Dispatch a message based on a requested version.
|
"""Dispatch a message based on a requested version.
|
||||||
|
|
||||||
:param ctxt: The request context
|
:param ctxt: The request context
|
||||||
:param version: The requested API version from the incoming message
|
:param version: The requested API version from the incoming message
|
||||||
:param method: The method requested to be called by the incoming
|
:param method: The method requested to be called by the incoming
|
||||||
message.
|
message.
|
||||||
|
:param namespace: The namespace for the requested method. If None,
|
||||||
|
the dispatcher will look for a method on a callback
|
||||||
|
object with no namespace set.
|
||||||
:param kwargs: A dict of keyword arguments to be passed to the method.
|
:param kwargs: A dict of keyword arguments to be passed to the method.
|
||||||
|
|
||||||
:returns: Whatever is returned by the underlying method that gets
|
:returns: Whatever is returned by the underlying method that gets
|
||||||
@ -120,13 +123,25 @@ class RpcDispatcher(object):
|
|||||||
|
|
||||||
had_compatible = False
|
had_compatible = False
|
||||||
for proxyobj in self.callbacks:
|
for proxyobj in self.callbacks:
|
||||||
if hasattr(proxyobj, 'RPC_API_VERSION'):
|
# Check for namespace compatibility
|
||||||
|
try:
|
||||||
|
cb_namespace = proxyobj.RPC_API_NAMESPACE
|
||||||
|
except AttributeError:
|
||||||
|
cb_namespace = None
|
||||||
|
|
||||||
|
if namespace != cb_namespace:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Check for version compatibility
|
||||||
|
try:
|
||||||
rpc_api_version = proxyobj.RPC_API_VERSION
|
rpc_api_version = proxyobj.RPC_API_VERSION
|
||||||
else:
|
except AttributeError:
|
||||||
rpc_api_version = '1.0'
|
rpc_api_version = '1.0'
|
||||||
|
|
||||||
is_compatible = rpc_common.version_is_compatible(rpc_api_version,
|
is_compatible = rpc_common.version_is_compatible(rpc_api_version,
|
||||||
version)
|
version)
|
||||||
had_compatible = had_compatible or is_compatible
|
had_compatible = had_compatible or is_compatible
|
||||||
|
|
||||||
if not hasattr(proxyobj, method):
|
if not hasattr(proxyobj, method):
|
||||||
continue
|
continue
|
||||||
if is_compatible:
|
if is_compatible:
|
||||||
|
@ -57,13 +57,14 @@ class Consumer(object):
|
|||||||
self.topic = topic
|
self.topic = topic
|
||||||
self.proxy = proxy
|
self.proxy = proxy
|
||||||
|
|
||||||
def call(self, context, version, method, args, timeout):
|
def call(self, context, version, method, namespace, args, timeout):
|
||||||
done = eventlet.event.Event()
|
done = eventlet.event.Event()
|
||||||
|
|
||||||
def _inner():
|
def _inner():
|
||||||
ctxt = RpcContext.from_dict(context.to_dict())
|
ctxt = RpcContext.from_dict(context.to_dict())
|
||||||
try:
|
try:
|
||||||
rval = self.proxy.dispatch(context, version, method, **args)
|
rval = self.proxy.dispatch(context, version, method,
|
||||||
|
namespace, **args)
|
||||||
res = []
|
res = []
|
||||||
# Caller might have called ctxt.reply() manually
|
# Caller might have called ctxt.reply() manually
|
||||||
for (reply, failure) in ctxt._response:
|
for (reply, failure) in ctxt._response:
|
||||||
@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None):
|
|||||||
return
|
return
|
||||||
args = msg.get('args', {})
|
args = msg.get('args', {})
|
||||||
version = msg.get('version', None)
|
version = msg.get('version', None)
|
||||||
|
namespace = msg.get('namespace', None)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
consumer = CONSUMERS[topic][0]
|
consumer = CONSUMERS[topic][0]
|
||||||
except (KeyError, IndexError):
|
except (KeyError, IndexError):
|
||||||
return iter([None])
|
return iter([None])
|
||||||
else:
|
else:
|
||||||
return consumer.call(context, version, method, args, timeout)
|
return consumer.call(context, version, method, namespace, args,
|
||||||
|
timeout)
|
||||||
|
|
||||||
|
|
||||||
def call(conf, context, topic, msg, timeout=None):
|
def call(conf, context, topic, msg, timeout=None):
|
||||||
@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg):
|
|||||||
return
|
return
|
||||||
args = msg.get('args', {})
|
args = msg.get('args', {})
|
||||||
version = msg.get('version', None)
|
version = msg.get('version', None)
|
||||||
|
namespace = msg.get('namespace', None)
|
||||||
|
|
||||||
for consumer in CONSUMERS.get(topic, []):
|
for consumer in CONSUMERS.get(topic, []):
|
||||||
try:
|
try:
|
||||||
consumer.call(context, version, method, args, None)
|
consumer.call(context, version, method, namespace, args, None)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
@ -40,8 +40,8 @@ qpid_opts = [
|
|||||||
cfg.StrOpt('qpid_hostname',
|
cfg.StrOpt('qpid_hostname',
|
||||||
default='localhost',
|
default='localhost',
|
||||||
help='Qpid broker hostname'),
|
help='Qpid broker hostname'),
|
||||||
cfg.StrOpt('qpid_port',
|
cfg.IntOpt('qpid_port',
|
||||||
default='5672',
|
default=5672,
|
||||||
help='Qpid broker port'),
|
help='Qpid broker port'),
|
||||||
cfg.ListOpt('qpid_hosts',
|
cfg.ListOpt('qpid_hosts',
|
||||||
default=['$qpid_hostname:$qpid_port'],
|
default=['$qpid_hostname:$qpid_port'],
|
||||||
@ -320,7 +320,7 @@ class Connection(object):
|
|||||||
# Reconnection is done by self.reconnect()
|
# Reconnection is done by self.reconnect()
|
||||||
self.connection.reconnect = False
|
self.connection.reconnect = False
|
||||||
self.connection.heartbeat = self.conf.qpid_heartbeat
|
self.connection.heartbeat = self.conf.qpid_heartbeat
|
||||||
self.connection.protocol = self.conf.qpid_protocol
|
self.connection.transport = self.conf.qpid_protocol
|
||||||
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
|
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
|
||||||
|
|
||||||
def _register_consumer(self, consumer):
|
def _register_consumer(self, consumer):
|
||||||
|
@ -276,7 +276,8 @@ class InternalContext(object):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
result = proxy.dispatch(
|
result = proxy.dispatch(
|
||||||
ctx, data['version'], data['method'], **data['args'])
|
ctx, data['version'], data['method'],
|
||||||
|
data.get('namespace'), **data['args'])
|
||||||
return ConsumerBase.normalize_reply(result, ctx.replies)
|
return ConsumerBase.normalize_reply(result, ctx.replies)
|
||||||
except greenlet.GreenletExit:
|
except greenlet.GreenletExit:
|
||||||
# ignore these since they are just from shutdowns
|
# ignore these since they are just from shutdowns
|
||||||
@ -351,7 +352,7 @@ class ConsumerBase(object):
|
|||||||
return
|
return
|
||||||
|
|
||||||
proxy.dispatch(ctx, data['version'],
|
proxy.dispatch(ctx, data['version'],
|
||||||
data['method'], **data['args'])
|
data['method'], data.get('namespace'), **data['args'])
|
||||||
|
|
||||||
|
|
||||||
class ZmqBaseReactor(ConsumerBase):
|
class ZmqBaseReactor(ConsumerBase):
|
||||||
|
@ -35,10 +35,10 @@ matchmaker_opts = [
|
|||||||
default='/etc/nova/matchmaker_ring.json',
|
default='/etc/nova/matchmaker_ring.json',
|
||||||
help='Matchmaker ring file (JSON)'),
|
help='Matchmaker ring file (JSON)'),
|
||||||
cfg.IntOpt('matchmaker_heartbeat_freq',
|
cfg.IntOpt('matchmaker_heartbeat_freq',
|
||||||
default='300',
|
default=300,
|
||||||
help='Heartbeat frequency'),
|
help='Heartbeat frequency'),
|
||||||
cfg.IntOpt('matchmaker_heartbeat_ttl',
|
cfg.IntOpt('matchmaker_heartbeat_ttl',
|
||||||
default='600',
|
default=600,
|
||||||
help='Heartbeat time-to-live.'),
|
help='Heartbeat time-to-live.'),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -58,9 +58,13 @@ class RpcProxy(object):
|
|||||||
"""Return the topic to use for a message."""
|
"""Return the topic to use for a message."""
|
||||||
return topic if topic else self.topic
|
return topic if topic else self.topic
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def make_namespaced_msg(method, namespace, **kwargs):
|
||||||
|
return {'method': method, 'namespace': namespace, 'args': kwargs}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def make_msg(method, **kwargs):
|
def make_msg(method, **kwargs):
|
||||||
return {'method': method, 'args': kwargs}
|
return RpcProxy.make_namespaced_msg(method, None, **kwargs)
|
||||||
|
|
||||||
def call(self, context, msg, topic=None, version=None, timeout=None):
|
def call(self, context, msg, topic=None, version=None, timeout=None):
|
||||||
"""rpc.call() a remote method.
|
"""rpc.call() a remote method.
|
||||||
|
41
quantum/openstack/common/rpc/zmq_receiver.py
Executable file
41
quantum/openstack/common/rpc/zmq_receiver.py
Executable file
@ -0,0 +1,41 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2011 OpenStack Foundation
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import eventlet
|
||||||
|
eventlet.monkey_patch()
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
|
from quantum.openstack.common import log as logging
|
||||||
|
from quantum.openstack.common import rpc
|
||||||
|
from quantum.openstack.common.rpc import impl_zmq
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
CONF.register_opts(rpc.rpc_opts)
|
||||||
|
CONF.register_opts(impl_zmq.zmq_opts)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
CONF(sys.argv[1:], project='oslo')
|
||||||
|
logging.setup("oslo")
|
||||||
|
|
||||||
|
with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
|
||||||
|
reactor.consume_in_thread()
|
||||||
|
reactor.wait()
|
@ -145,7 +145,7 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
|
|||||||
self.needs_resync = False
|
self.needs_resync = False
|
||||||
self.sync_state()
|
self.sync_state()
|
||||||
|
|
||||||
@periodic_task.periodic_task(ticks_between_runs=6)
|
@periodic_task.periodic_task(spacing=6)
|
||||||
def collect_stats(self, context):
|
def collect_stats(self, context):
|
||||||
for pool_id in self.cache.get_pool_ids():
|
for pool_id in self.cache.get_pool_ids():
|
||||||
try:
|
try:
|
||||||
|
@ -519,8 +519,8 @@ class SecurityGroupServerRpcApiTestCase(base.BaseTestCase):
|
|||||||
[call(None,
|
[call(None,
|
||||||
{'args':
|
{'args':
|
||||||
{'devices': ['fake_device']},
|
{'devices': ['fake_device']},
|
||||||
'method':
|
'method': 'security_group_rules_for_devices',
|
||||||
'security_group_rules_for_devices'},
|
'namespace': None},
|
||||||
version=sg_rpc.SG_RPC_VERSION,
|
version=sg_rpc.SG_RPC_VERSION,
|
||||||
topic='fake_topic')])
|
topic='fake_topic')])
|
||||||
|
|
||||||
@ -544,7 +544,8 @@ class SecurityGroupAgentRpcApiTestCase(base.BaseTestCase):
|
|||||||
[call(None,
|
[call(None,
|
||||||
{'args':
|
{'args':
|
||||||
{'security_groups': ['fake_sgid']},
|
{'security_groups': ['fake_sgid']},
|
||||||
'method': 'security_groups_rule_updated'},
|
'method': 'security_groups_rule_updated',
|
||||||
|
'namespace': None},
|
||||||
version=sg_rpc.SG_RPC_VERSION,
|
version=sg_rpc.SG_RPC_VERSION,
|
||||||
topic='fake-security_group-update')])
|
topic='fake-security_group-update')])
|
||||||
|
|
||||||
@ -555,7 +556,8 @@ class SecurityGroupAgentRpcApiTestCase(base.BaseTestCase):
|
|||||||
[call(None,
|
[call(None,
|
||||||
{'args':
|
{'args':
|
||||||
{'security_groups': ['fake_sgid']},
|
{'security_groups': ['fake_sgid']},
|
||||||
'method': 'security_groups_member_updated'},
|
'method': 'security_groups_member_updated',
|
||||||
|
'namespace': None},
|
||||||
version=sg_rpc.SG_RPC_VERSION,
|
version=sg_rpc.SG_RPC_VERSION,
|
||||||
topic='fake-security_group-update')])
|
topic='fake-security_group-update')])
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user