Synchronize code from oslo

Use commit eaab5fae2502198e9fa57d0d90a7204a2bd83b16:
Merge "sort options to make --help output prettier"
(Wed Feb 13 12:52:14 2013 +0000)

Add processutils to quantum since impl_zmq depends on them.

Drop notifier.list_notifier that is not present in oslo.

Change-Id: I91d9ec05481b8c24da9fbee1ad4706ff56a3b7aa
Fixes: bug #1116290
This commit is contained in:
Alessio Ababilov 2013-02-05 16:01:36 +02:00
parent 7e892502f1
commit 868295095b
23 changed files with 396 additions and 290 deletions

View File

@ -77,9 +77,9 @@ copyright = u'2011-present, OpenStack, LLC.'
# #
# Version info # Version info
from quantum.version import version_info as quantum_version from quantum.version import version_info as quantum_version
release = quantum_version.version_string_with_vcs() release = quantum_version.release_string()
# The short X.Y version. # The short X.Y version.
version = quantum_version.canonical_version_string() version = quantum_version.version_string()
# The language for content autogenerated by Sphinx. Refer to documentation # The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages. # for a list of supported languages.

View File

@ -1,5 +1,5 @@
[DEFAULT] [DEFAULT]
# The list of modules to copy from openstack-common # The list of modules to copy from openstack-common
modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,install_venv_common,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,periodic_task,policy,rpc,service,setup,threadgroup,timeutils,uuidutils,version modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,install_venv_common,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,periodic_task,policy,processutils,rpc,service,setup,threadgroup,timeutils,uuidutils,version
# The base module to hold the copy of openstack.common # The base module to hold the copy of openstack.common
base=quantum base=quantum

View File

@ -84,7 +84,7 @@ rpc.set_defaults(control_exchange='quantum')
def parse(args): def parse(args):
cfg.CONF(args=args, project='quantum', cfg.CONF(args=args, project='quantum',
version='%%prog %s' % quantum_version.version_string_with_vcs()) version='%%prog %s' % quantum_version.release_string())
# Validate that the base_mac is of the correct format # Validate that the base_mac is of the correct format
msg = attributes._validate_regex(cfg.CONF.base_mac, msg = attributes._validate_regex(cfg.CONF.base_mac,

View File

@ -1643,7 +1643,7 @@ class ConfigOpts(collections.Mapping):
""" """
self._args = args self._args = args
for opt, group in self._all_cli_opts(): for opt, group in sorted(self._all_cli_opts()):
opt._add_to_cli(self._oparser, group) opt._add_to_cli(self._oparser, group)
return vars(self._oparser.parse_args(args)) return vars(self._oparser.parse_args(args))

View File

@ -23,6 +23,8 @@ import logging
from quantum.openstack.common.gettextutils import _ from quantum.openstack.common.gettextutils import _
_FATAL_EXCEPTION_FORMAT_ERRORS = False
class Error(Exception): class Error(Exception):
def __init__(self, message=None): def __init__(self, message=None):
@ -121,9 +123,12 @@ class OpenstackException(Exception):
try: try:
self._error_string = self.message % kwargs self._error_string = self.message % kwargs
except Exception: except Exception as e:
# at least get the core message out if something happened if _FATAL_EXCEPTION_FORMAT_ERRORS:
self._error_string = self.message raise e
else:
# at least get the core message out if something happened
self._error_string = self.message
def __str__(self): def __str__(self):
return self._error_string return self._error_string

View File

@ -34,15 +34,21 @@ This module provides a few things:
import datetime import datetime
import functools
import inspect import inspect
import itertools import itertools
import json import json
import logging
import xmlrpclib import xmlrpclib
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import timeutils from quantum.openstack.common import timeutils
LOG = logging.getLogger(__name__)
def to_primitive(value, convert_instances=False, level=0):
def to_primitive(value, convert_instances=False, convert_datetime=True,
level=0, max_depth=3):
"""Convert a complex object into primitives. """Convert a complex object into primitives.
Handy for JSON serialization. We can optionally handle instances, Handy for JSON serialization. We can optionally handle instances,
@ -78,12 +84,19 @@ def to_primitive(value, convert_instances=False, level=0):
if getattr(value, '__module__', None) == 'mox': if getattr(value, '__module__', None) == 'mox':
return 'mock' return 'mock'
if level > 3: if level > max_depth:
LOG.error(_('Max serialization depth exceeded on object: %d %s'),
level, value)
return '?' return '?'
# The try block may not be necessary after the class check above, # The try block may not be necessary after the class check above,
# but just in case ... # but just in case ...
try: try:
recursive = functools.partial(to_primitive,
convert_instances=convert_instances,
convert_datetime=convert_datetime,
level=level,
max_depth=max_depth)
# It's not clear why xmlrpclib created their own DateTime type, but # It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly # for our purposes, make it a datetime type which is explicitly
# handled # handled
@ -91,33 +104,19 @@ def to_primitive(value, convert_instances=False, level=0):
value = datetime.datetime(*tuple(value.timetuple())[:6]) value = datetime.datetime(*tuple(value.timetuple())[:6])
if isinstance(value, (list, tuple)): if isinstance(value, (list, tuple)):
o = [] return [recursive(v) for v in value]
for v in value:
o.append(to_primitive(v, convert_instances=convert_instances,
level=level))
return o
elif isinstance(value, dict): elif isinstance(value, dict):
o = {} return dict((k, recursive(v)) for k, v in value.iteritems())
for k, v in value.iteritems(): elif convert_datetime and isinstance(value, datetime.datetime):
o[k] = to_primitive(v, convert_instances=convert_instances,
level=level)
return o
elif isinstance(value, datetime.datetime):
return timeutils.strtime(value) return timeutils.strtime(value)
elif hasattr(value, 'iteritems'): elif hasattr(value, 'iteritems'):
return to_primitive(dict(value.iteritems()), return recursive(dict(value.iteritems()), level=level + 1)
convert_instances=convert_instances,
level=level + 1)
elif hasattr(value, '__iter__'): elif hasattr(value, '__iter__'):
return to_primitive(list(value), return recursive(list(value))
convert_instances=convert_instances,
level=level)
elif convert_instances and hasattr(value, '__dict__'): elif convert_instances and hasattr(value, '__dict__'):
# Likely an instance of something. Watch for cycles. # Likely an instance of something. Watch for cycles.
# Ignore class member vars. # Ignore class member vars.
return to_primitive(value.__dict__, return recursive(value.__dict__, level=level + 1)
convert_instances=convert_instances,
level=level + 1)
else: else:
return value return value
except TypeError: except TypeError:

View File

@ -26,6 +26,9 @@ class WeakLocal(corolocal.local):
def __getattribute__(self, attr): def __getattribute__(self, attr):
rval = corolocal.local.__getattribute__(self, attr) rval = corolocal.local.__getattribute__(self, attr)
if rval: if rval:
# NOTE(mikal): this bit is confusing. What is stored is a weak
# reference, not the value itself. We therefore need to lookup
# the weak reference and return the inner value here.
rval = rval() rval = rval()
return rval return rval
@ -34,4 +37,12 @@ class WeakLocal(corolocal.local):
return corolocal.local.__setattr__(self, attr, value) return corolocal.local.__setattr__(self, attr, value)
# NOTE(mikal): the name "store" should be deprecated in the future
store = WeakLocal() store = WeakLocal()
# A "weak" store uses weak references and allows an object to fall out of scope
# when it falls out of scope in the code that uses the thread local storage. A
# "strong" store will hold a reference to the object so that it never falls out
# of scope.
weak_store = WeakLocal()
strong_store = corolocal.local

View File

@ -29,6 +29,7 @@ from eventlet import semaphore
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common import fileutils from quantum.openstack.common import fileutils
from quantum.openstack.common.gettextutils import _ from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import local
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
@ -39,9 +40,8 @@ util_opts = [
cfg.BoolOpt('disable_process_locking', default=False, cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'), help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path', cfg.StrOpt('lock_path',
default=os.path.abspath(os.path.join(os.path.dirname(__file__), help=('Directory to use for lock files. Default to a '
'../')), 'temp directory'))
help='Directory to use for lock files')
] ]
@ -140,7 +140,7 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
def foo(self, *args): def foo(self, *args):
... ...
ensures that only one thread will execute the bar method at a time. ensures that only one thread will execute the foo method at a time.
Different methods can share the same lock:: Different methods can share the same lock::
@ -184,54 +184,66 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
LOG.debug(_('Got semaphore "%(lock)s" for method ' LOG.debug(_('Got semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name, '"%(method)s"...'), {'lock': name,
'method': f.__name__}) 'method': f.__name__})
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
'method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
cleanup_dir = False
# We need a copy of lock_path because it is non-local # NOTE(mikal): I know this looks odd
local_lock_path = lock_path if not hasattr(local.strong_store, 'locks_held'):
if not local_lock_path: local.strong_store.locks_held = []
local_lock_path = CONF.lock_path local.strong_store.locks_held.append(name)
if not local_lock_path: try:
cleanup_dir = True if external and not CONF.disable_process_locking:
local_lock_path = tempfile.mkdtemp() LOG.debug(_('Attempting to grab file lock "%(lock)s" '
'for method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
cleanup_dir = False
if not os.path.exists(local_lock_path): # We need a copy of lock_path because it is non-local
cleanup_dir = True local_lock_path = lock_path
fileutils.ensure_tree(local_lock_path) if not local_lock_path:
local_lock_path = CONF.lock_path
# NOTE(mikal): the lock name cannot contain directory if not local_lock_path:
# separators cleanup_dir = True
safe_name = name.replace(os.sep, '_') local_lock_path = tempfile.mkdtemp()
lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
lock_file_path = os.path.join(local_lock_path,
lock_file_name)
try: if not os.path.exists(local_lock_path):
lock = InterProcessLock(lock_file_path) cleanup_dir = True
with lock: fileutils.ensure_tree(local_lock_path)
LOG.debug(_('Got file lock "%(lock)s" at %(path)s '
'for method "%(method)s"...'), # NOTE(mikal): the lock name cannot contain directory
# separators
safe_name = name.replace(os.sep, '_')
lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
lock_file_path = os.path.join(local_lock_path,
lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock:
LOG.debug(_('Got file lock "%(lock)s" at '
'%(path)s for method '
'"%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
retval = f(*args, **kwargs)
finally:
LOG.debug(_('Released file lock "%(lock)s" at '
'%(path)s for method "%(method)s"...'),
{'lock': name, {'lock': name,
'path': lock_file_path, 'path': lock_file_path,
'method': f.__name__}) 'method': f.__name__})
retval = f(*args, **kwargs) # NOTE(vish): This removes the tempdir if we needed
finally: # to create one. This is used to
LOG.debug(_('Released file lock "%(lock)s" at %(path)s' # cleanup the locks left behind by unit
' for method "%(method)s"...'), # tests.
{'lock': name, if cleanup_dir:
'path': lock_file_path, shutil.rmtree(local_lock_path)
'method': f.__name__}) else:
# NOTE(vish): This removes the tempdir if we needed retval = f(*args, **kwargs)
# to create one. This is used to cleanup
# the locks left behind by unit tests. finally:
if cleanup_dir: local.strong_store.locks_held.remove(name)
shutil.rmtree(local_lock_path)
else:
retval = f(*args, **kwargs)
return retval return retval
return inner return inner

View File

@ -1,118 +0,0 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
list_notifier_drivers_opt = cfg.MultiStrOpt(
'list_notifier_drivers',
default=['quantum.openstack.common.notifier.no_op_notifier'],
help='List of drivers to send notifications')
CONF = cfg.CONF
CONF.register_opt(list_notifier_drivers_opt)
LOG = logging.getLogger(__name__)
drivers = None
class ImportFailureNotifier(object):
"""Noisily re-raises some exception over-and-over when notify is called."""
def __init__(self, exception):
self.exception = exception
def notify(self, context, message):
raise self.exception
def _get_drivers():
"""Instantiates and returns drivers based on the flag values."""
global drivers
if drivers is None:
drivers = []
for notification_driver in CONF.list_notifier_drivers:
try:
drivers.append(importutils.import_module(notification_driver))
except ImportError as e:
drivers.append(ImportFailureNotifier(e))
return drivers
def add_driver(notification_driver):
"""Add a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
if isinstance(notification_driver, basestring):
# Load and add
try:
drivers.append(importutils.import_module(notification_driver))
except ImportError as e:
drivers.append(ImportFailureNotifier(e))
else:
# Driver is already loaded; just add the object.
drivers.append(notification_driver)
def _object_name(obj):
name = []
if hasattr(obj, '__module__'):
name.append(obj.__module__)
if hasattr(obj, '__name__'):
name.append(obj.__name__)
else:
name.append(obj.__class__.__name__)
return '.'.join(name)
def remove_driver(notification_driver):
"""Remove a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
removed = False
if notification_driver in drivers:
# We're removing an object. Easy.
drivers.remove(notification_driver)
removed = True
else:
# We're removing a driver by name. Search for it.
for driver in drivers:
if _object_name(driver) == notification_driver:
drivers.remove(driver)
removed = True
if not removed:
raise ValueError("Cannot remove; %s is not in list" %
notification_driver)
def notify(context, message):
"""Passes notification to multiple notifiers in a list."""
for driver in _get_drivers():
try:
driver.notify(context, message)
except Exception as e:
LOG.exception(_("Problem '%(e)s' attempting to send to "
"notification driver %(driver)s."), locals())
def _reset_drivers():
"""Used by unit tests to reset the drivers."""
global drivers
drivers = None

View File

@ -0,0 +1,135 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
System-level utilities and helper functions.
"""
import logging
import random
import shlex
from eventlet.green import subprocess
from eventlet import greenthread
from quantum.openstack.common.gettextutils import _
LOG = logging.getLogger(__name__)
class UnknownArgumentError(Exception):
def __init__(self, message=None):
super(UnknownArgumentError, self).__init__(message)
class ProcessExecutionError(Exception):
def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
description=None):
if description is None:
description = "Unexpected error while running command."
if exit_code is None:
exit_code = '-'
message = ("%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r"
% (description, cmd, exit_code, stdout, stderr))
super(ProcessExecutionError, self).__init__(message)
def execute(*cmd, **kwargs):
"""
Helper method to shell out and execute a command through subprocess with
optional retry.
:param cmd: Passed to subprocess.Popen.
:type cmd: string
:param process_input: Send to opened process.
:type proces_input: string
:param check_exit_code: Defaults to 0. Will raise
:class:`ProcessExecutionError`
if the command exits without returning this value
as a returncode
:type check_exit_code: int
:param delay_on_retry: True | False. Defaults to True. If set to True,
wait a short amount of time before retrying.
:type delay_on_retry: boolean
:param attempts: How many times to retry cmd.
:type attempts: int
:param run_as_root: True | False. Defaults to False. If set to True,
the command is prefixed by the command specified
in the root_helper kwarg.
:type run_as_root: boolean
:param root_helper: command to prefix all cmd's with
:type root_helper: string
:returns: (stdout, stderr) from process execution
:raises: :class:`UnknownArgumentError` on
receiving unknown arguments
:raises: :class:`ProcessExecutionError`
"""
process_input = kwargs.pop('process_input', None)
check_exit_code = kwargs.pop('check_exit_code', 0)
delay_on_retry = kwargs.pop('delay_on_retry', True)
attempts = kwargs.pop('attempts', 1)
run_as_root = kwargs.pop('run_as_root', False)
root_helper = kwargs.pop('root_helper', '')
if len(kwargs):
raise UnknownArgumentError(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs)
if run_as_root:
cmd = shlex.split(root_helper) + list(cmd)
cmd = map(str, cmd)
while attempts > 0:
attempts -= 1
try:
LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
_PIPE = subprocess.PIPE # pylint: disable=E1101
obj = subprocess.Popen(cmd,
stdin=_PIPE,
stdout=_PIPE,
stderr=_PIPE,
close_fds=True)
result = None
if process_input is not None:
result = obj.communicate(process_input)
else:
result = obj.communicate()
obj.stdin.close() # pylint: disable=E1101
_returncode = obj.returncode # pylint: disable=E1101
if _returncode:
LOG.debug(_('Result was %s') % _returncode)
if (isinstance(check_exit_code, int) and
not isinstance(check_exit_code, bool) and
_returncode != check_exit_code):
(stdout, stderr) = result
raise ProcessExecutionError(exit_code=_returncode,
stdout=stdout,
stderr=stderr,
cmd=' '.join(cmd))
return result
except ProcessExecutionError:
if not attempts:
raise
else:
LOG.debug(_('%r failed. Retrying.'), cmd)
if delay_on_retry:
greenthread.sleep(random.randint(20, 200) / 100.0)
finally:
# NOTE(termie): this appears to be necessary to let the subprocess
# call clean something up in between calls, without
# it two execute calls in a row hangs the second one
greenthread.sleep(0)

View File

@ -25,8 +25,16 @@ For some wrappers that add message versioning to rpc, see:
rpc.proxy rpc.proxy
""" """
import inspect
import logging
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils from quantum.openstack.common import importutils
from quantum.openstack.common import local
LOG = logging.getLogger(__name__)
rpc_opts = [ rpc_opts = [
@ -62,7 +70,8 @@ rpc_opts = [
help='AMQP exchange to connect to if using RabbitMQ or Qpid'), help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
] ]
cfg.CONF.register_opts(rpc_opts) CONF = cfg.CONF
CONF.register_opts(rpc_opts)
def set_defaults(control_exchange): def set_defaults(control_exchange):
@ -83,10 +92,27 @@ def create_connection(new=True):
:returns: An instance of openstack.common.rpc.common.Connection :returns: An instance of openstack.common.rpc.common.Connection
""" """
return _get_impl().create_connection(cfg.CONF, new=new) return _get_impl().create_connection(CONF, new=new)
def call(context, topic, msg, timeout=None): def _check_for_lock():
if not CONF.debug:
return None
if ((hasattr(local.strong_store, 'locks_held')
and local.strong_store.locks_held)):
stack = ' :: '.join([frame[3] for frame in inspect.stack()])
LOG.warn(_('A RPC is being made while holding a lock. The locks '
'currently held are %(locks)s. This is probably a bug. '
'Please report it. Include the following: [%(stack)s].'),
{'locks': local.strong_store.locks_held,
'stack': stack})
return True
return False
def call(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method that returns something. """Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this :param context: Information that identifies the user that has made this
@ -100,13 +126,17 @@ def call(context, topic, msg, timeout=None):
"args" : dict_of_kwargs } "args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout. :param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option. If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if a RPC call is made
with a lock held.
:returns: A dict from the remote method. :returns: A dict from the remote method.
:raises: openstack.common.rpc.common.Timeout if a complete response :raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached. is not received before the timeout is reached.
""" """
return _get_impl().call(cfg.CONF, context, topic, msg, timeout) if check_for_lock:
_check_for_lock()
return _get_impl().call(CONF, context, topic, msg, timeout)
def cast(context, topic, msg): def cast(context, topic, msg):
@ -124,7 +154,7 @@ def cast(context, topic, msg):
:returns: None :returns: None
""" """
return _get_impl().cast(cfg.CONF, context, topic, msg) return _get_impl().cast(CONF, context, topic, msg)
def fanout_cast(context, topic, msg): def fanout_cast(context, topic, msg):
@ -145,10 +175,10 @@ def fanout_cast(context, topic, msg):
:returns: None :returns: None
""" """
return _get_impl().fanout_cast(cfg.CONF, context, topic, msg) return _get_impl().fanout_cast(CONF, context, topic, msg)
def multicall(context, topic, msg, timeout=None): def multicall(context, topic, msg, timeout=None, check_for_lock=False):
"""Invoke a remote method and get back an iterator. """Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in In this case, the remote method will be returning multiple values in
@ -166,6 +196,8 @@ def multicall(context, topic, msg, timeout=None):
"args" : dict_of_kwargs } "args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout. :param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option. If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if a RPC call is made
with a lock held.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is :returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value an index that starts at 0 and increases by one for each value
@ -175,7 +207,9 @@ def multicall(context, topic, msg, timeout=None):
:raises: openstack.common.rpc.common.Timeout if a complete response :raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached. is not received before the timeout is reached.
""" """
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) if check_for_lock:
_check_for_lock()
return _get_impl().multicall(CONF, context, topic, msg, timeout)
def notify(context, topic, msg, envelope=False): def notify(context, topic, msg, envelope=False):
@ -217,7 +251,7 @@ def cast_to_server(context, server_params, topic, msg):
:returns: None :returns: None
""" """
return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic, return _get_impl().cast_to_server(CONF, context, server_params, topic,
msg) msg)
@ -233,7 +267,7 @@ def fanout_cast_to_server(context, server_params, topic, msg):
:returns: None :returns: None
""" """
return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params, return _get_impl().fanout_cast_to_server(CONF, context, server_params,
topic, msg) topic, msg)
@ -263,10 +297,10 @@ def _get_impl():
global _RPCIMPL global _RPCIMPL
if _RPCIMPL is None: if _RPCIMPL is None:
try: try:
_RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend) _RPCIMPL = importutils.import_module(CONF.rpc_backend)
except ImportError: except ImportError:
# For backwards compatibility with older nova config. # For backwards compatibility with older nova config.
impl = cfg.CONF.rpc_backend.replace('nova.rpc', impl = CONF.rpc_backend.replace('nova.rpc',
'nova.openstack.common.rpc') 'nova.openstack.common.rpc')
_RPCIMPL = importutils.import_module(impl) _RPCIMPL = importutils.import_module(impl)
return _RPCIMPL return _RPCIMPL

View File

@ -368,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool) conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout) wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg) conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg)) conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg return wait_msg

View File

@ -289,7 +289,7 @@ def deserialize_remote_exception(conf, data):
# NOTE(ameade): We DO NOT want to allow just any module to be imported, in # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
# order to prevent arbitrary code execution. # order to prevent arbitrary code execution.
if not module in conf.allowed_rpc_exception_modules: if module not in conf.allowed_rpc_exception_modules:
return RemoteError(name, failure.get('message'), trace) return RemoteError(name, failure.get('message'), trace)
try: try:

View File

@ -66,7 +66,8 @@ kombu_opts = [
help='the RabbitMQ userid'), help='the RabbitMQ userid'),
cfg.StrOpt('rabbit_password', cfg.StrOpt('rabbit_password',
default='guest', default='guest',
help='the RabbitMQ password'), help='the RabbitMQ password',
secret=True),
cfg.StrOpt('rabbit_virtual_host', cfg.StrOpt('rabbit_virtual_host',
default='/', default='/',
help='the RabbitMQ virtual host'), help='the RabbitMQ virtual host'),
@ -302,9 +303,15 @@ class Publisher(object):
channel=channel, channel=channel,
routing_key=self.routing_key) routing_key=self.routing_key)
def send(self, msg): def send(self, msg, timeout=None):
"""Send a message""" """Send a message"""
self.producer.publish(msg) if timeout:
#
# AMQP TTL is in milliseconds when set in the header.
#
self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
else:
self.producer.publish(msg)
class DirectPublisher(Publisher): class DirectPublisher(Publisher):
@ -653,7 +660,7 @@ class Connection(object):
for proxy_cb in self.proxy_callbacks: for proxy_cb in self.proxy_callbacks:
proxy_cb.wait() proxy_cb.wait()
def publisher_send(self, cls, topic, msg, **kwargs): def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
"""Send to a publisher based on the publisher class""" """Send to a publisher based on the publisher class"""
def _error_callback(exc): def _error_callback(exc):
@ -663,7 +670,7 @@ class Connection(object):
def _publish(): def _publish():
publisher = cls(self.conf, self.channel, topic, **kwargs) publisher = cls(self.conf, self.channel, topic, **kwargs)
publisher.send(msg) publisher.send(msg, timeout)
self.ensure(_error_callback, _publish) self.ensure(_error_callback, _publish)
@ -691,9 +698,9 @@ class Connection(object):
"""Send a 'direct' message""" """Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg) self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg): def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message""" """Send a 'topic' message"""
self.publisher_send(TopicPublisher, topic, msg) self.publisher_send(TopicPublisher, topic, msg, timeout)
def fanout_send(self, topic, msg): def fanout_send(self, topic, msg):
"""Send a 'fanout' message""" """Send a 'fanout' message"""
@ -701,7 +708,7 @@ class Connection(object):
def notify_send(self, topic, msg, **kwargs): def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic""" """Send a notify message on a topic"""
self.publisher_send(NotifyPublisher, topic, msg, **kwargs) self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
def consume(self, limit=None): def consume(self, limit=None):
"""Consume from all queues/consumers""" """Consume from all queues/consumers"""

View File

@ -51,7 +51,8 @@ qpid_opts = [
help='Username for qpid connection'), help='Username for qpid connection'),
cfg.StrOpt('qpid_password', cfg.StrOpt('qpid_password',
default='', default='',
help='Password for qpid connection'), help='Password for qpid connection',
secret=True),
cfg.StrOpt('qpid_sasl_mechanisms', cfg.StrOpt('qpid_sasl_mechanisms',
default='', default='',
help='Space separated list of SASL mechanisms to use for auth'), help='Space separated list of SASL mechanisms to use for auth'),
@ -486,9 +487,20 @@ class Connection(object):
"""Send a 'direct' message""" """Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg) self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg): def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message""" """Send a 'topic' message"""
self.publisher_send(TopicPublisher, topic, msg) #
# We want to create a message with attributes, e.g. a TTL. We
# don't really need to keep 'msg' in its JSON format any longer
# so let's create an actual qpid message here and get some
# value-add on the go.
#
# WARNING: Request timeout happens to be in the same units as
# qpid's TTL (seconds). If this changes in the future, then this
# will need to be altered accordingly.
#
qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
self.publisher_send(TopicPublisher, topic, qpid_message)
def fanout_send(self, topic, msg): def fanout_send(self, topic, msg):
"""Send a 'fanout' message""" """Send a 'fanout' message"""

View File

@ -17,7 +17,6 @@
import os import os
import pprint import pprint
import socket import socket
import string
import sys import sys
import types import types
import uuid import uuid
@ -90,7 +89,7 @@ def _serialize(data):
Error if a developer passes us bad data. Error if a developer passes us bad data.
""" """
try: try:
return str(jsonutils.dumps(data, ensure_ascii=True)) return jsonutils.dumps(data, ensure_ascii=True)
except TypeError: except TypeError:
LOG.error(_("JSON serialization failed.")) LOG.error(_("JSON serialization failed."))
raise raise
@ -218,10 +217,11 @@ class ZmqClient(object):
self.outq = ZmqSocket(addr, socket_type, bind=bind) self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
msg_id = msg_id or 0
if serialize: if serialize:
data = rpc_common.serialize_msg(data, force_envelope) data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'), self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
_serialize(data)])
def close(self): def close(self):
self.outq.close() self.outq.close()
@ -295,13 +295,13 @@ class InternalContext(object):
ctx.replies) ctx.replies)
LOG.debug(_("Sending reply")) LOG.debug(_("Sending reply"))
cast(CONF, ctx, topic, { _multi_send(_cast, ctx, topic, {
'method': '-process_reply', 'method': '-process_reply',
'args': { 'args': {
'msg_id': msg_id, 'msg_id': msg_id, # Include for Folsom compat.
'response': response 'response': response
} }
}) }, _msg_id=msg_id)
class ConsumerBase(object): class ConsumerBase(object):
@ -321,21 +321,22 @@ class ConsumerBase(object):
return [result] return [result]
def process(self, style, target, proxy, ctx, data): def process(self, style, target, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
# Method starting with - are # Method starting with - are
# processed internally. (non-valid method name) # processed internally. (non-valid method name)
method = data['method'] method = data.get('method')
if not method:
LOG.error(_("RPC message did not include method."))
return
# Internal method # Internal method
# uses internal context for safety. # uses internal context for safety.
if data['method'][0] == '-': if method == '-reply':
# For reply / process_reply self.private_ctx.reply(ctx, proxy, **data['args'])
method = method[1:]
if method == 'reply':
self.private_ctx.reply(ctx, proxy, **data['args'])
return return
data.setdefault('version', None)
data.setdefault('args', {})
proxy.dispatch(ctx, data['version'], proxy.dispatch(ctx, data['version'],
data['method'], **data['args']) data['method'], **data['args'])
@ -436,20 +437,12 @@ class ZmqProxy(ZmqBaseReactor):
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
# Handle zmq_replies magic if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
if topic.startswith('fanout~'):
sock_type = zmq.PUB sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
data = [str(msg_id), _serialize(response)]
else: else:
sock_type = zmq.PUSH sock_type = zmq.PUSH
if not topic in self.topic_proxy: if topic not in self.topic_proxy:
def publisher(waiter): def publisher(waiter):
LOG.info(_("Creating proxy for topic: %s"), topic) LOG.info(_("Creating proxy for topic: %s"), topic)
@ -600,8 +593,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread() self.reactor.consume_in_thread()
def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, def _cast(addr, context, topic, msg, timeout=None, serialize=True,
force_envelope=False): force_envelope=False, _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg] payload = [RpcContext.marshal(context), msg]
@ -610,7 +603,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr) conn = ZmqClient(addr)
# assumes cast can't return an exception # assumes cast can't return an exception
conn.cast(msg_id, topic, payload, serialize, force_envelope) conn.cast(_msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError: except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception") raise RPCException("Cast failed. ZMQ Socket Exception")
finally: finally:
@ -618,7 +611,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn.close() conn.close()
def _call(addr, context, msg_id, topic, msg, timeout=None, def _call(addr, context, topic, msg, timeout=None,
serialize=True, force_envelope=False): serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response # timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout timeout = timeout or CONF.rpc_response_timeout
@ -654,7 +647,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
) )
LOG.debug(_("Sending cast")) LOG.debug(_("Sending cast"))
_cast(addr, context, msg_id, topic, payload, _cast(addr, context, topic, payload,
serialize=serialize, force_envelope=force_envelope) serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply")) LOG.debug(_("Cast sent; Waiting reply"))
@ -662,10 +655,12 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
msg = msg_waiter.recv() msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response")) LOG.debug(_("Unpacking response"))
responses = _deserialize(msg[-1]) responses = _deserialize(msg[-1])[-1]['args']['response']
# ZMQError trumps the Timeout error. # ZMQError trumps the Timeout error.
except zmq.ZMQError: except zmq.ZMQError:
raise RPCException("ZMQ Socket Error") raise RPCException("ZMQ Socket Error")
except (IndexError, KeyError):
raise RPCException(_("RPC Message Invalid."))
finally: finally:
if 'msg_waiter' in vars(): if 'msg_waiter' in vars():
msg_waiter.close() msg_waiter.close()
@ -682,7 +677,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
def _multi_send(method, context, topic, msg, timeout=None, serialize=True, def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False): force_envelope=False, _msg_id=None):
""" """
Wraps the sending of messages, Wraps the sending of messages,
dispatches to the matchmaker and sends dispatches to the matchmaker and sends
@ -708,10 +703,10 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast': if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context, eventlet.spawn_n(method, _addr, context,
_topic, _topic, msg, timeout, serialize, _topic, msg, timeout, serialize,
force_envelope) force_envelope, _msg_id)
return return
return method(_addr, context, _topic, _topic, msg, timeout, return method(_addr, context, _topic, msg, timeout,
serialize, force_envelope) serialize, force_envelope)
@ -777,21 +772,9 @@ def _get_ctxt():
return ZMQ_CTX return ZMQ_CTX
def _get_matchmaker(): def _get_matchmaker(*args, **kwargs):
global matchmaker global matchmaker
if not matchmaker: if not matchmaker:
# rpc_zmq_matchmaker should be set to a 'module.Class' matchmaker = importutils.import_object(
mm_path = CONF.rpc_zmq_matchmaker.split('.') CONF.rpc_zmq_matchmaker, *args, **kwargs)
mm_module = '.'.join(mm_path[:-1])
mm_class = mm_path[-1]
# Only initialize a class.
if mm_path[-1][0] not in string.ascii_uppercase:
LOG.error(_("Matchmaker could not be loaded.\n"
"rpc_zmq_matchmaker is not a class."))
raise RPCException(_("Error loading Matchmaker."))
mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor()
return matchmaker return matchmaker

View File

@ -201,24 +201,25 @@ class FanoutRingExchange(RingExchange):
class LocalhostExchange(Exchange): class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local.""" """Exchange where all direct topics are local."""
def __init__(self): def __init__(self, host='localhost'):
self.host = host
super(Exchange, self).__init__() super(Exchange, self).__init__()
def run(self, key): def run(self, key):
return [(key.split('.')[0] + '.localhost', 'localhost')] return [('.'.join((key.split('.')[0], self.host)), self.host)]
class DirectExchange(Exchange): class DirectExchange(Exchange):
""" """
Exchange where all topic keys are split, sending to second half. Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute" running on "host" i.e. "compute.host" sends a message to "compute.host" running on "host"
""" """
def __init__(self): def __init__(self):
super(Exchange, self).__init__() super(Exchange, self).__init__()
def run(self, key): def run(self, key):
b, e = key.split('.', 1) e = key.split('.', 1)[1]
return [(b, e)] return [(key, e)]
class MatchMakerRing(MatchMakerBase): class MatchMakerRing(MatchMakerBase):
@ -237,11 +238,11 @@ class MatchMakerLocalhost(MatchMakerBase):
Match Maker where all bare topics resolve to localhost. Match Maker where all bare topics resolve to localhost.
Useful for testing. Useful for testing.
""" """
def __init__(self): def __init__(self, host='localhost'):
super(MatchMakerLocalhost, self).__init__() super(MatchMakerLocalhost, self).__init__()
self.add_binding(FanoutBinding(), LocalhostExchange()) self.add_binding(FanoutBinding(), LocalhostExchange(host))
self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange()) self.add_binding(TopicBinding(), LocalhostExchange(host))
class MatchMakerStub(MatchMakerBase): class MatchMakerStub(MatchMakerBase):

View File

@ -51,7 +51,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
self._services = threadgroup.ThreadGroup('launcher') self._services = threadgroup.ThreadGroup()
eventlet_backdoor.initialize_if_enabled() eventlet_backdoor.initialize_if_enabled()
@staticmethod @staticmethod
@ -310,7 +310,7 @@ class Service(object):
"""Service object for binaries running on hosts.""" """Service object for binaries running on hosts."""
def __init__(self, threads=1000): def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup('service', threads) self.tg = threadgroup.ThreadGroup(threads)
def start(self): def start(self):
pass pass

View File

@ -274,7 +274,7 @@ def _get_revno():
return len(revlist.splitlines()) return len(revlist.splitlines())
def get_version_from_git(pre_version): def _get_version_from_git(pre_version):
"""Return a version which is equal to the tag that's on the current """Return a version which is equal to the tag that's on the current
revision if there is one, or tag plus number of additional revisions revision if there is one, or tag plus number of additional revisions
if the current revision has no tag.""" if the current revision has no tag."""
@ -294,7 +294,7 @@ def get_version_from_git(pre_version):
return None return None
def get_version_from_pkg_info(package_name): def _get_version_from_pkg_info(package_name):
"""Get the version from PKG-INFO file if we can.""" """Get the version from PKG-INFO file if we can."""
try: try:
pkg_info_file = open('PKG-INFO', 'r') pkg_info_file = open('PKG-INFO', 'r')
@ -325,10 +325,10 @@ def get_version(package_name, pre_version=None):
version = os.environ.get("OSLO_PACKAGE_VERSION", None) version = os.environ.get("OSLO_PACKAGE_VERSION", None)
if version: if version:
return version return version
version = get_version_from_pkg_info(package_name) version = _get_version_from_pkg_info(package_name)
if version: if version:
return version return version
version = get_version_from_git(pre_version) version = _get_version_from_git(pre_version)
if version: if version:
return version return version
raise Exception("Versioning for this project requires either an sdist" raise Exception("Versioning for this project requires either an sdist"

View File

@ -38,8 +38,7 @@ class Thread(object):
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it has done so it can be removed from the threads list. it has done so it can be removed from the threads list.
""" """
def __init__(self, name, thread, group): def __init__(self, thread, group):
self.name = name
self.thread = thread self.thread = thread
self.thread.link(_thread_done, group=group, thread=self) self.thread.link(_thread_done, group=group, thread=self)
@ -57,8 +56,7 @@ class ThreadGroup(object):
when need be). when need be).
* provide an easy API to add timers. * provide an easy API to add timers.
""" """
def __init__(self, name, thread_pool_size=10): def __init__(self, thread_pool_size=10):
self.name = name
self.pool = greenpool.GreenPool(thread_pool_size) self.pool = greenpool.GreenPool(thread_pool_size)
self.threads = [] self.threads = []
self.timers = [] self.timers = []
@ -72,7 +70,7 @@ class ThreadGroup(object):
def add_thread(self, callback, *args, **kwargs): def add_thread(self, callback, *args, **kwargs):
gt = self.pool.spawn(callback, *args, **kwargs) gt = self.pool.spawn(callback, *args, **kwargs)
th = Thread(callback.__name__, gt, self) th = Thread(gt, self)
self.threads.append(th) self.threads.append(th)
def thread_done(self, thread): def thread_done(self, thread):

View File

@ -98,6 +98,11 @@ def utcnow():
return datetime.datetime.utcnow() return datetime.datetime.utcnow()
def iso8601_from_timestamp(timestamp):
"""Returns a iso8601 formated date from timestamp"""
return isotime(datetime.datetime.utcfromtimestamp(timestamp))
utcnow.override_time = None utcnow.override_time = None
@ -162,3 +167,16 @@ def delta_seconds(before, after):
except AttributeError: except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds + return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6)) float(delta.microseconds) / (10 ** 6))
def is_soon(dt, window):
"""
Determines if time is going to happen in the next window seconds.
:params dt: the time
:params window: minimum seconds to remain to consider the time not soon
:return: True if expiration is within the given duration
"""
soon = (utcnow() + datetime.timedelta(seconds=window))
return normalize_time(dt) <= soon

View File

@ -33,6 +33,14 @@ class VersionInfo(object):
self.version = None self.version = None
self._cached_version = None self._cached_version = None
def __str__(self):
"""Make the VersionInfo object behave like a string."""
return self.version_string()
def __repr__(self):
"""Include the name."""
return "VersionInfo(%s:%s)" % (self.package, self.version_string())
def _get_version_from_pkg_resources(self): def _get_version_from_pkg_resources(self):
"""Get the version of the package from the pkg_resources record """Get the version of the package from the pkg_resources record
associated with the package.""" associated with the package."""
@ -41,11 +49,11 @@ class VersionInfo(object):
provider = pkg_resources.get_provider(requirement) provider = pkg_resources.get_provider(requirement)
return provider.version return provider.version
except pkg_resources.DistributionNotFound: except pkg_resources.DistributionNotFound:
# The most likely cause for this is running tests in a tree with # The most likely cause for this is running tests in a tree
# produced from a tarball where the package itself has not been # produced from a tarball where the package itself has not been
# installed into anything. Check for a PKG-INFO file. # installed into anything. Revert to setup-time logic.
from quantum.openstack.common import setup from quantum.openstack.common import setup
return setup.get_version_from_pkg_info(self.package) return setup.get_version(self.package)
def release_string(self): def release_string(self):
"""Return the full version of the package including suffixes indicating """Return the full version of the package including suffixes indicating

View File

@ -25,6 +25,7 @@ import os
import subprocess import subprocess
import sys import sys
possible_topdir = os.getcwd() possible_topdir = os.getcwd()
if os.path.exists(os.path.join(possible_topdir, "quantum", if os.path.exists(os.path.join(possible_topdir, "quantum",
"__init__.py")): "__init__.py")):