diff --git a/doc/source/conf.py b/doc/source/conf.py
index 449454c42f..abaf6c251b 100644
--- a/doc/source/conf.py
+++ b/doc/source/conf.py
@@ -77,9 +77,9 @@ copyright = u'2011-present, OpenStack, LLC.'
 #
 # Version info
 from quantum.version import version_info as quantum_version
-release = quantum_version.version_string_with_vcs()
+release = quantum_version.release_string()
 # The short X.Y version.
-version = quantum_version.canonical_version_string()
+version = quantum_version.version_string()
 
 # The language for content autogenerated by Sphinx. Refer to documentation
 # for a list of supported languages.
diff --git a/openstack-common.conf b/openstack-common.conf
index 7ca64ce4ea..0d738376ed 100644
--- a/openstack-common.conf
+++ b/openstack-common.conf
@@ -1,5 +1,5 @@
 [DEFAULT]
 # The list of modules to copy from openstack-common
-modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,install_venv_common,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,periodic_task,policy,rpc,service,setup,threadgroup,timeutils,uuidutils,version
+modules=cfg,context,eventlet_backdoor,exception,excutils,fileutils,gettextutils,importutils,iniparser,install_venv_common,jsonutils,local,lockutils,log,loopingcall,network_utils,notifier,periodic_task,policy,processutils,rpc,service,setup,threadgroup,timeutils,uuidutils,version
 # The base module to hold the copy of openstack.common
 base=quantum
diff --git a/quantum/common/config.py b/quantum/common/config.py
index f70bfa4b74..7fde3f9edc 100644
--- a/quantum/common/config.py
+++ b/quantum/common/config.py
@@ -84,7 +84,7 @@ rpc.set_defaults(control_exchange='quantum')
 
 def parse(args):
     cfg.CONF(args=args, project='quantum',
-             version='%%prog %s' % quantum_version.version_string_with_vcs())
+             version='%%prog %s' % quantum_version.release_string())
 
     # Validate that the base_mac is of the correct format
     msg = attributes._validate_regex(cfg.CONF.base_mac,
diff --git a/quantum/openstack/common/cfg.py b/quantum/openstack/common/cfg.py
index 56d169e1ab..9eaaf1ac32 100644
--- a/quantum/openstack/common/cfg.py
+++ b/quantum/openstack/common/cfg.py
@@ -1643,7 +1643,7 @@ class ConfigOpts(collections.Mapping):
         """
         self._args = args
 
-        for opt, group in self._all_cli_opts():
+        for opt, group in sorted(self._all_cli_opts()):
             opt._add_to_cli(self._oparser, group)
 
         return vars(self._oparser.parse_args(args))
diff --git a/quantum/openstack/common/exception.py b/quantum/openstack/common/exception.py
index 20634b31bb..09c4d00a4f 100644
--- a/quantum/openstack/common/exception.py
+++ b/quantum/openstack/common/exception.py
@@ -23,6 +23,8 @@ import logging
 
 from quantum.openstack.common.gettextutils import _
 
+_FATAL_EXCEPTION_FORMAT_ERRORS = False
+
 
 class Error(Exception):
     def __init__(self, message=None):
@@ -121,9 +123,12 @@ class OpenstackException(Exception):
         try:
             self._error_string = self.message % kwargs
 
-        except Exception:
-            # at least get the core message out if something happened
-            self._error_string = self.message
+        except Exception as e:
+            if _FATAL_EXCEPTION_FORMAT_ERRORS:
+                raise e
+            else:
+                # at least get the core message out if something happened
+                self._error_string = self.message
 
     def __str__(self):
         return self._error_string
diff --git a/quantum/openstack/common/jsonutils.py b/quantum/openstack/common/jsonutils.py
index 72f39fa1b9..3fb6ff8af7 100644
--- a/quantum/openstack/common/jsonutils.py
+++ b/quantum/openstack/common/jsonutils.py
@@ -34,15 +34,21 @@ This module provides a few things:
 
 
 import datetime
+import functools
 import inspect
 import itertools
 import json
+import logging
 import xmlrpclib
 
+from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import timeutils
 
+LOG = logging.getLogger(__name__)
 
-def to_primitive(value, convert_instances=False, level=0):
+
+def to_primitive(value, convert_instances=False, convert_datetime=True,
+                 level=0, max_depth=3):
     """Convert a complex object into primitives.
 
     Handy for JSON serialization. We can optionally handle instances,
@@ -78,12 +84,19 @@ def to_primitive(value, convert_instances=False, level=0):
     if getattr(value, '__module__', None) == 'mox':
         return 'mock'
 
-    if level > 3:
+    if level > max_depth:
+        LOG.error(_('Max serialization depth exceeded on object: %d %s'),
+                  level, value)
         return '?'
 
     # The try block may not be necessary after the class check above,
     # but just in case ...
    try:
+        recursive = functools.partial(to_primitive,
+                                      convert_instances=convert_instances,
+                                      convert_datetime=convert_datetime,
+                                      level=level,
+                                      max_depth=max_depth)
         # It's not clear why xmlrpclib created their own DateTime type, but
         # for our purposes, make it a datetime type which is explicitly
         # handled
@@ -91,33 +104,19 @@ def to_primitive(value, convert_instances=False, level=0):
             value = datetime.datetime(*tuple(value.timetuple())[:6])
 
         if isinstance(value, (list, tuple)):
-            o = []
-            for v in value:
-                o.append(to_primitive(v, convert_instances=convert_instances,
-                                      level=level))
-            return o
+            return [recursive(v) for v in value]
         elif isinstance(value, dict):
-            o = {}
-            for k, v in value.iteritems():
-                o[k] = to_primitive(v, convert_instances=convert_instances,
-                                    level=level)
-            return o
-        elif isinstance(value, datetime.datetime):
+            return dict((k, recursive(v)) for k, v in value.iteritems())
+        elif convert_datetime and isinstance(value, datetime.datetime):
             return timeutils.strtime(value)
         elif hasattr(value, 'iteritems'):
-            return to_primitive(dict(value.iteritems()),
-                                convert_instances=convert_instances,
-                                level=level + 1)
+            return recursive(dict(value.iteritems()), level=level + 1)
         elif hasattr(value, '__iter__'):
-            return to_primitive(list(value),
-                                convert_instances=convert_instances,
-                                level=level)
+            return recursive(list(value))
         elif convert_instances and hasattr(value, '__dict__'):
             # Likely an instance of something. Watch for cycles.
             # Ignore class member vars.
-            return to_primitive(value.__dict__,
-                                convert_instances=convert_instances,
-                                level=level + 1)
+            return recursive(value.__dict__, level=level + 1)
         else:
             return value
     except TypeError:
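A quick usage sketch for the reworked jsonutils helper above (illustrative only, not part of the patch, and assuming the module is consumed from its synced quantum.openstack.common location): to_primitive() now accepts convert_datetime and max_depth keywords, where max_depth bounds how deeply instance and mapping conversion may recurse before giving up with '?'.

    import datetime
    from quantum.openstack.common import jsonutils

    nested = {'created_at': datetime.datetime.utcnow(), 'tags': ('a', 'b')}

    # Default behaviour: datetimes become strings (via timeutils.strtime)
    # and tuples become lists, so the result is json.dumps()-friendly.
    jsonutils.to_primitive(nested)

    # convert_datetime=False leaves the datetime object untouched for
    # callers that want to apply their own formatting.
    jsonutils.to_primitive(nested, convert_datetime=False)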
diff --git a/quantum/openstack/common/local.py b/quantum/openstack/common/local.py
index 19d962732c..8bdc837a91 100644
--- a/quantum/openstack/common/local.py
+++ b/quantum/openstack/common/local.py
@@ -26,6 +26,9 @@ class WeakLocal(corolocal.local):
     def __getattribute__(self, attr):
         rval = corolocal.local.__getattribute__(self, attr)
         if rval:
+            # NOTE(mikal): this bit is confusing. What is stored is a weak
+            # reference, not the value itself. We therefore need to lookup
+            # the weak reference and return the inner value here.
             rval = rval()
         return rval
 
@@ -34,4 +37,12 @@ class WeakLocal(corolocal.local):
         return corolocal.local.__setattr__(self, attr, value)
 
 
+# NOTE(mikal): the name "store" should be deprecated in the future
 store = WeakLocal()
+
+# A "weak" store uses weak references and allows an object to fall out of scope
+# when it falls out of scope in the code that uses the thread local storage. A
+# "strong" store will hold a reference to the object so that it never falls out
+# of scope.
+weak_store = WeakLocal()
+strong_store = corolocal.local
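The distinction drawn by the new comments in local.py can be illustrated as follows (a hypothetical sketch, not part of the patch; RequestContext is a made-up payload class). Note that strong_store is bound to the corolocal.local class itself rather than an instance, so attributes set on it behave as shared class-level state; instantiating it (corolocal.local()) would give strictly per-greenthread storage.

    from quantum.openstack.common import local

    class RequestContext(object):  # hypothetical payload object
        pass

    # weak_store keeps only a weak reference: the object can be garbage
    # collected as soon as the code that stored it drops its own reference.
    local.weak_store.context = RequestContext()

    # strong_store keeps a normal reference for the life of the thread;
    # the lockutils change below uses it to track currently held locks.
    if not hasattr(local.strong_store, 'locks_held'):
        local.strong_store.locks_held = []
    local.strong_store.locks_held.append('example-lock')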
diff --git a/quantum/openstack/common/lockutils.py b/quantum/openstack/common/lockutils.py
index b9500f7bdb..750a47bc72 100644
--- a/quantum/openstack/common/lockutils.py
+++ b/quantum/openstack/common/lockutils.py
@@ -29,6 +29,7 @@ from eventlet import semaphore
 from quantum.openstack.common import cfg
 from quantum.openstack.common import fileutils
 from quantum.openstack.common.gettextutils import _
+from quantum.openstack.common import local
 from quantum.openstack.common import log as logging
 
 
@@ -39,9 +40,8 @@ util_opts = [
     cfg.BoolOpt('disable_process_locking', default=False,
                 help='Whether to disable inter-process locks'),
     cfg.StrOpt('lock_path',
-               default=os.path.abspath(os.path.join(os.path.dirname(__file__),
-                                                    '../')),
-               help='Directory to use for lock files')
+               help=('Directory to use for lock files. Default to a '
+                     'temp directory'))
 ]
 
 
@@ -140,7 +140,7 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
         def foo(self, *args):
            ...
 
-    ensures that only one thread will execute the bar method at a time.
+    ensures that only one thread will execute the foo method at a time.
 
     Different methods can share the same lock::
 
@@ -184,54 +184,66 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
                 LOG.debug(_('Got semaphore "%(lock)s" for method '
                             '"%(method)s"...'), {'lock': name,
                                                  'method': f.__name__})
-                if external and not CONF.disable_process_locking:
-                    LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
-                                'method "%(method)s"...'),
-                              {'lock': name, 'method': f.__name__})
-                    cleanup_dir = False
-                    # We need a copy of lock_path because it is non-local
-                    local_lock_path = lock_path
-                    if not local_lock_path:
-                        local_lock_path = CONF.lock_path
+                # NOTE(mikal): I know this looks odd
+                if not hasattr(local.strong_store, 'locks_held'):
+                    local.strong_store.locks_held = []
+                local.strong_store.locks_held.append(name)
 
-                    if not local_lock_path:
-                        cleanup_dir = True
-                        local_lock_path = tempfile.mkdtemp()
+                try:
+                    if external and not CONF.disable_process_locking:
+                        LOG.debug(_('Attempting to grab file lock "%(lock)s" '
+                                    'for method "%(method)s"...'),
+                                  {'lock': name, 'method': f.__name__})
+                        cleanup_dir = False
 
-                    if not os.path.exists(local_lock_path):
-                        cleanup_dir = True
-                        fileutils.ensure_tree(local_lock_path)
+                        # We need a copy of lock_path because it is non-local
+                        local_lock_path = lock_path
+                        if not local_lock_path:
+                            local_lock_path = CONF.lock_path
 
-                    # NOTE(mikal): the lock name cannot contain directory
-                    # separators
-                    safe_name = name.replace(os.sep, '_')
-                    lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
-                    lock_file_path = os.path.join(local_lock_path,
-                                                  lock_file_name)
+                        if not local_lock_path:
+                            cleanup_dir = True
+                            local_lock_path = tempfile.mkdtemp()
 
-                    try:
-                        lock = InterProcessLock(lock_file_path)
-                        with lock:
-                            LOG.debug(_('Got file lock "%(lock)s" at %(path)s '
-                                        'for method "%(method)s"...'),
+                        if not os.path.exists(local_lock_path):
+                            cleanup_dir = True
+                            fileutils.ensure_tree(local_lock_path)
+
+                        # NOTE(mikal): the lock name cannot contain directory
+                        # separators
+                        safe_name = name.replace(os.sep, '_')
+                        lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
+                        lock_file_path = os.path.join(local_lock_path,
+                                                      lock_file_name)
+
+                        try:
+                            lock = InterProcessLock(lock_file_path)
+                            with lock:
+                                LOG.debug(_('Got file lock "%(lock)s" at '
+                                            '%(path)s for method '
+                                            '"%(method)s"...'),
+                                          {'lock': name,
+                                           'path': lock_file_path,
+                                           'method': f.__name__})
+                                retval = f(*args, **kwargs)
+                        finally:
+                            LOG.debug(_('Released file lock "%(lock)s" at '
+                                        '%(path)s for method "%(method)s"...'),
                                       {'lock': name,
                                        'path': lock_file_path,
                                        'method': f.__name__})
-                            retval = f(*args, **kwargs)
-                    finally:
-                        LOG.debug(_('Released file lock "%(lock)s" at %(path)s'
-                                    ' for method "%(method)s"...'),
-                                  {'lock': name,
-                                   'path': lock_file_path,
-                                   'method': f.__name__})
-                        # NOTE(vish): This removes the tempdir if we needed
-                        #             to create one. This is used to cleanup
-                        #             the locks left behind by unit tests.
-                        if cleanup_dir:
-                            shutil.rmtree(local_lock_path)
-                else:
-                    retval = f(*args, **kwargs)
+                            # NOTE(vish): This removes the tempdir if we needed
+                            #             to create one. This is used to
+                            #             cleanup the locks left behind by unit
+                            #             tests.
+                            if cleanup_dir:
+                                shutil.rmtree(local_lock_path)
+                    else:
+                        retval = f(*args, **kwargs)
+
+                finally:
+                    local.strong_store.locks_held.remove(name)
 
             return retval
         return inner
diff --git a/quantum/openstack/common/notifier/list_notifier.py b/quantum/openstack/common/notifier/list_notifier.py
deleted file mode 100644
index baa4fab31f..0000000000
--- a/quantum/openstack/common/notifier/list_notifier.py
+++ /dev/null
@@ -1,118 +0,0 @@
-# Copyright 2011 OpenStack LLC.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from quantum.openstack.common import cfg
-from quantum.openstack.common.gettextutils import _
-from quantum.openstack.common import importutils
-from quantum.openstack.common import log as logging
-
-
-list_notifier_drivers_opt = cfg.MultiStrOpt(
-    'list_notifier_drivers',
-    default=['quantum.openstack.common.notifier.no_op_notifier'],
-    help='List of drivers to send notifications')
-
-CONF = cfg.CONF
-CONF.register_opt(list_notifier_drivers_opt)
-
-LOG = logging.getLogger(__name__)
-
-drivers = None
-
-
-class ImportFailureNotifier(object):
-    """Noisily re-raises some exception over-and-over when notify is called."""
-
-    def __init__(self, exception):
-        self.exception = exception
-
-    def notify(self, context, message):
-        raise self.exception
-
-
-def _get_drivers():
-    """Instantiates and returns drivers based on the flag values."""
-    global drivers
-    if drivers is None:
-        drivers = []
-        for notification_driver in CONF.list_notifier_drivers:
-            try:
-                drivers.append(importutils.import_module(notification_driver))
-            except ImportError as e:
-                drivers.append(ImportFailureNotifier(e))
-    return drivers
-
-
-def add_driver(notification_driver):
-    """Add a notification driver at runtime."""
-    # Make sure the driver list is initialized.
-    _get_drivers()
-    if isinstance(notification_driver, basestring):
-        # Load and add
-        try:
-            drivers.append(importutils.import_module(notification_driver))
-        except ImportError as e:
-            drivers.append(ImportFailureNotifier(e))
-    else:
-        # Driver is already loaded; just add the object.
-        drivers.append(notification_driver)
-
-
-def _object_name(obj):
-    name = []
-    if hasattr(obj, '__module__'):
-        name.append(obj.__module__)
-    if hasattr(obj, '__name__'):
-        name.append(obj.__name__)
-    else:
-        name.append(obj.__class__.__name__)
-    return '.'.join(name)
-
-
-def remove_driver(notification_driver):
-    """Remove a notification driver at runtime."""
-    # Make sure the driver list is initialized.
-    _get_drivers()
-    removed = False
-    if notification_driver in drivers:
-        # We're removing an object. Easy.
-        drivers.remove(notification_driver)
-        removed = True
-    else:
-        # We're removing a driver by name. Search for it.
-        for driver in drivers:
-            if _object_name(driver) == notification_driver:
-                drivers.remove(driver)
-                removed = True
-
-    if not removed:
-        raise ValueError("Cannot remove; %s is not in list" %
-                         notification_driver)
-
-
-def notify(context, message):
-    """Passes notification to multiple notifiers in a list."""
-    for driver in _get_drivers():
-        try:
-            driver.notify(context, message)
-        except Exception as e:
-            LOG.exception(_("Problem '%(e)s' attempting to send to "
-                            "notification driver %(driver)s."), locals())
-
-
-def _reset_drivers():
-    """Used by unit tests to reset the drivers."""
-    global drivers
-    drivers = None
diff --git a/quantum/openstack/common/processutils.py b/quantum/openstack/common/processutils.py
new file mode 100644
index 0000000000..12ef558a42
--- /dev/null
+++ b/quantum/openstack/common/processutils.py
@@ -0,0 +1,135 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+System-level utilities and helper functions.
+"""
+
+import logging
+import random
+import shlex
+
+from eventlet.green import subprocess
+from eventlet import greenthread
+
+from quantum.openstack.common.gettextutils import _
+
+
+LOG = logging.getLogger(__name__)
+
+
+class UnknownArgumentError(Exception):
+    def __init__(self, message=None):
+        super(UnknownArgumentError, self).__init__(message)
+
+
+class ProcessExecutionError(Exception):
+    def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
+                 description=None):
+        if description is None:
+            description = "Unexpected error while running command."
+        if exit_code is None:
+            exit_code = '-'
+        message = ("%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r"
+                   % (description, cmd, exit_code, stdout, stderr))
+        super(ProcessExecutionError, self).__init__(message)
+
+
+def execute(*cmd, **kwargs):
+    """
+    Helper method to shell out and execute a command through subprocess with
+    optional retry.
+
+    :param cmd:             Passed to subprocess.Popen.
+    :type cmd:              string
+    :param process_input:   Send to opened process.
+    :type proces_input:     string
+    :param check_exit_code: Defaults to 0. Will raise
+                            :class:`ProcessExecutionError`
+                            if the command exits without returning this value
+                            as a returncode
+    :type check_exit_code:  int
+    :param delay_on_retry:  True | False. Defaults to True. If set to True,
+                            wait a short amount of time before retrying.
+    :type delay_on_retry:   boolean
+    :param attempts:        How many times to retry cmd.
+    :type attempts:         int
+    :param run_as_root:     True | False. Defaults to False. If set to True,
+                            the command is prefixed by the command specified
+                            in the root_helper kwarg.
+    :type run_as_root:      boolean
+    :param root_helper:     command to prefix all cmd's with
+    :type root_helper:      string
+    :returns:               (stdout, stderr) from process execution
+    :raises:                :class:`UnknownArgumentError` on
+                            receiving unknown arguments
+    :raises:                :class:`ProcessExecutionError`
+    """
+
+    process_input = kwargs.pop('process_input', None)
+    check_exit_code = kwargs.pop('check_exit_code', 0)
+    delay_on_retry = kwargs.pop('delay_on_retry', True)
+    attempts = kwargs.pop('attempts', 1)
+    run_as_root = kwargs.pop('run_as_root', False)
+    root_helper = kwargs.pop('root_helper', '')
+    if len(kwargs):
+        raise UnknownArgumentError(_('Got unknown keyword args '
+                                     'to utils.execute: %r') % kwargs)
+    if run_as_root:
+        cmd = shlex.split(root_helper) + list(cmd)
+    cmd = map(str, cmd)
+
+    while attempts > 0:
+        attempts -= 1
+        try:
+            LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
+            _PIPE = subprocess.PIPE  # pylint: disable=E1101
+            obj = subprocess.Popen(cmd,
+                                   stdin=_PIPE,
+                                   stdout=_PIPE,
+                                   stderr=_PIPE,
+                                   close_fds=True)
+            result = None
+            if process_input is not None:
+                result = obj.communicate(process_input)
+            else:
+                result = obj.communicate()
+            obj.stdin.close()  # pylint: disable=E1101
+            _returncode = obj.returncode  # pylint: disable=E1101
+            if _returncode:
+                LOG.debug(_('Result was %s') % _returncode)
+                if (isinstance(check_exit_code, int) and
+                        not isinstance(check_exit_code, bool) and
+                        _returncode != check_exit_code):
+                    (stdout, stderr) = result
+                    raise ProcessExecutionError(exit_code=_returncode,
+                                                stdout=stdout,
+                                                stderr=stderr,
+                                                cmd=' '.join(cmd))
+            return result
+        except ProcessExecutionError:
+            if not attempts:
+                raise
+            else:
+                LOG.debug(_('%r failed. Retrying.'), cmd)
+                if delay_on_retry:
+                    greenthread.sleep(random.randint(20, 200) / 100.0)
+        finally:
+            # NOTE(termie): this appears to be necessary to let the subprocess
+            #               call clean something up in between calls, without
+            #               it two execute calls in a row hangs the second one
+            greenthread.sleep(0)
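For reference, a minimal sketch of how the new processutils.execute() helper is typically driven (illustrative only, not part of the patch; the commands and the 'sudo' root helper are placeholders):

    from quantum.openstack.common import processutils

    # Returns (stdout, stderr); raises ProcessExecutionError when the
    # exit code differs from check_exit_code (0 by default).
    stdout, stderr = processutils.execute('ip', 'link', 'show')

    # Retry up to three times on failure, prefixing the command with the
    # configured root helper and sleeping briefly between attempts.
    processutils.execute('ovs-vsctl', 'show',
                         run_as_root=True, root_helper='sudo',
                         attempts=3, delay_on_retry=True)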
diff --git a/quantum/openstack/common/rpc/__init__.py b/quantum/openstack/common/rpc/__init__.py
index 8ce6456d5b..9d0fede6ff 100644
--- a/quantum/openstack/common/rpc/__init__.py
+++ b/quantum/openstack/common/rpc/__init__.py
@@ -25,8 +25,16 @@ For some wrappers that add message versioning to rpc, see:
     rpc.proxy
 
 """
+import inspect
+import logging
+
 from quantum.openstack.common import cfg
+from quantum.openstack.common.gettextutils import _
 from quantum.openstack.common import importutils
+from quantum.openstack.common import local
+
+
+LOG = logging.getLogger(__name__)
 
 
 rpc_opts = [
@@ -62,7 +70,8 @@ rpc_opts = [
                help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
 ]
 
-cfg.CONF.register_opts(rpc_opts)
+CONF = cfg.CONF
+CONF.register_opts(rpc_opts)
 
 
 def set_defaults(control_exchange):
@@ -83,10 +92,27 @@ def create_connection(new=True):
 
     :returns: An instance of openstack.common.rpc.common.Connection
     """
-    return _get_impl().create_connection(cfg.CONF, new=new)
+    return _get_impl().create_connection(CONF, new=new)
 
 
-def call(context, topic, msg, timeout=None):
+def _check_for_lock():
+    if not CONF.debug:
+        return None
+
+    if ((hasattr(local.strong_store, 'locks_held')
+         and local.strong_store.locks_held)):
+        stack = ' :: '.join([frame[3] for frame in inspect.stack()])
+        LOG.warn(_('A RPC is being made while holding a lock. The locks '
+                   'currently held are %(locks)s. This is probably a bug. '
+                   'Please report it. Include the following: [%(stack)s].'),
+                 {'locks': local.strong_store.locks_held,
+                  'stack': stack})
+        return True
+
+    return False
+
+
+def call(context, topic, msg, timeout=None, check_for_lock=False):
     """Invoke a remote method that returns something.
 
     :param context: Information that identifies the user that has made this
@@ -100,13 +126,17 @@ def call(context, topic, msg, timeout=None):
               "args" : dict_of_kwargs }
     :param timeout: int, number of seconds to use for a response timeout.
                     If set, this overrides the rpc_response_timeout option.
+    :param check_for_lock: if True, a warning is emitted if a RPC call is made
+                           with a lock held.
 
     :returns: A dict from the remote method.
 
     :raises: openstack.common.rpc.common.Timeout if a complete response
              is not received before the timeout is reached.
     """
-    return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
+    if check_for_lock:
+        _check_for_lock()
+    return _get_impl().call(CONF, context, topic, msg, timeout)
 
 
 def cast(context, topic, msg):
@@ -124,7 +154,7 @@ def cast(context, topic, msg):
 
     :returns: None
     """
-    return _get_impl().cast(cfg.CONF, context, topic, msg)
+    return _get_impl().cast(CONF, context, topic, msg)
 
 
 def fanout_cast(context, topic, msg):
@@ -145,10 +175,10 @@ def fanout_cast(context, topic, msg):
 
     :returns: None
     """
-    return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
+    return _get_impl().fanout_cast(CONF, context, topic, msg)
 
 
-def multicall(context, topic, msg, timeout=None):
+def multicall(context, topic, msg, timeout=None, check_for_lock=False):
     """Invoke a remote method and get back an iterator.
 
     In this case, the remote method will be returning multiple values in
@@ -166,6 +196,8 @@ def multicall(context, topic, msg, timeout=None):
              "args" : dict_of_kwargs }
    :param timeout: int, number of seconds to use for a response timeout.
                    If set, this overrides the rpc_response_timeout option.
+    :param check_for_lock: if True, a warning is emitted if a RPC call is made
+                           with a lock held.
 
    :returns: An iterator. The iterator will yield a tuple (N, X) where N is
              an index that starts at 0 and increases by one for each value
@@ -175,7 +207,9 @@ def multicall(context, topic, msg, timeout=None):
    :raises: openstack.common.rpc.common.Timeout if a complete response
             is not received before the timeout is reached.
    """
-    return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
+    if check_for_lock:
+        _check_for_lock()
+    return _get_impl().multicall(CONF, context, topic, msg, timeout)
 
 
 def notify(context, topic, msg, envelope=False):
@@ -217,7 +251,7 @@ def cast_to_server(context, server_params, topic, msg):
 
     :returns: None
     """
-    return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
+    return _get_impl().cast_to_server(CONF, context, server_params, topic,
                                       msg)
 
 
@@ -233,7 +267,7 @@ def fanout_cast_to_server(context, server_params, topic, msg):
 
     :returns: None
     """
-    return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
+    return _get_impl().fanout_cast_to_server(CONF, context, server_params,
                                              topic, msg)
 
 
@@ -263,10 +297,10 @@ def _get_impl():
     global _RPCIMPL
     if _RPCIMPL is None:
         try:
-            _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
+            _RPCIMPL = importutils.import_module(CONF.rpc_backend)
         except ImportError:
             # For backwards compatibility with older nova config.
-            impl = cfg.CONF.rpc_backend.replace('nova.rpc',
-                                                'nova.openstack.common.rpc')
+            impl = CONF.rpc_backend.replace('nova.rpc',
+                                            'nova.openstack.common.rpc')
             _RPCIMPL = importutils.import_module(impl)
     return _RPCIMPL
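A sketch of how a caller opts in to the new lock check (illustrative, not part of the patch; the topic, method and arguments are placeholders, and context is assumed to be an existing request context). With debug logging enabled, the call below logs a warning naming the held locks and the current call stack if the calling greenthread still holds a lockutils semaphore:

    from quantum.openstack.common import rpc

    msg = {'method': 'get_device_details',
           'args': {'device': 'tap0'}}

    # check_for_lock=True runs _check_for_lock() before the blocking
    # call is dispatched to the configured rpc backend.
    result = rpc.call(context, 'q-plugin', msg, timeout=30,
                      check_for_lock=True)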
diff --git a/quantum/openstack/common/rpc/amqp.py b/quantum/openstack/common/rpc/amqp.py
index 42fce2dd2c..66ffd528f7 100644
--- a/quantum/openstack/common/rpc/amqp.py
+++ b/quantum/openstack/common/rpc/amqp.py
@@ -368,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
     conn = ConnectionContext(conf, connection_pool)
     wait_msg = MulticallWaiter(conf, conn, timeout)
     conn.declare_direct_consumer(msg_id, wait_msg)
-    conn.topic_send(topic, rpc_common.serialize_msg(msg))
+    conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
 
     return wait_msg
 
diff --git a/quantum/openstack/common/rpc/common.py b/quantum/openstack/common/rpc/common.py
index ca248eca9d..4be74d651e 100644
--- a/quantum/openstack/common/rpc/common.py
+++ b/quantum/openstack/common/rpc/common.py
@@ -289,7 +289,7 @@ def deserialize_remote_exception(conf, data):
 
     # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
     # order to prevent arbitrary code execution.
-    if not module in conf.allowed_rpc_exception_modules:
+    if module not in conf.allowed_rpc_exception_modules:
         return RemoteError(name, failure.get('message'), trace)
 
     try:
diff --git a/quantum/openstack/common/rpc/impl_kombu.py b/quantum/openstack/common/rpc/impl_kombu.py
index 9e2620ffd7..70c2b87ae2 100644
--- a/quantum/openstack/common/rpc/impl_kombu.py
+++ b/quantum/openstack/common/rpc/impl_kombu.py
@@ -66,7 +66,8 @@ kombu_opts = [
                help='the RabbitMQ userid'),
     cfg.StrOpt('rabbit_password',
                default='guest',
-               help='the RabbitMQ password'),
+               help='the RabbitMQ password',
+               secret=True),
     cfg.StrOpt('rabbit_virtual_host',
                default='/',
                help='the RabbitMQ virtual host'),
@@ -302,9 +303,15 @@ class Publisher(object):
                                   channel=channel,
                                   routing_key=self.routing_key)
 
-    def send(self, msg):
+    def send(self, msg, timeout=None):
         """Send a message"""
-        self.producer.publish(msg)
+        if timeout:
+            #
+            # AMQP TTL is in milliseconds when set in the header.
+            #
+            self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
+        else:
+            self.producer.publish(msg)
 
 
 class DirectPublisher(Publisher):
@@ -653,7 +660,7 @@ class Connection(object):
         for proxy_cb in self.proxy_callbacks:
             proxy_cb.wait()
 
-    def publisher_send(self, cls, topic, msg, **kwargs):
+    def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
         """Send to a publisher based on the publisher class"""
 
         def _error_callback(exc):
@@ -663,7 +670,7 @@ class Connection(object):
 
         def _publish():
             publisher = cls(self.conf, self.channel, topic, **kwargs)
-            publisher.send(msg)
+            publisher.send(msg, timeout)
 
         self.ensure(_error_callback, _publish)
 
@@ -691,9 +698,9 @@ class Connection(object):
         """Send a 'direct' message"""
         self.publisher_send(DirectPublisher, msg_id, msg)
 
-    def topic_send(self, topic, msg):
+    def topic_send(self, topic, msg, timeout=None):
         """Send a 'topic' message"""
-        self.publisher_send(TopicPublisher, topic, msg)
+        self.publisher_send(TopicPublisher, topic, msg, timeout)
 
     def fanout_send(self, topic, msg):
         """Send a 'fanout' message"""
@@ -701,7 +708,7 @@ class Connection(object):
 
     def notify_send(self, topic, msg, **kwargs):
         """Send a notify message on a topic"""
-        self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
+        self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
 
     def consume(self, limit=None):
         """Consume from all queues/consumers"""
diff --git a/quantum/openstack/common/rpc/impl_qpid.py b/quantum/openstack/common/rpc/impl_qpid.py
index 7743b7205c..70b11d2147 100644
--- a/quantum/openstack/common/rpc/impl_qpid.py
+++ b/quantum/openstack/common/rpc/impl_qpid.py
@@ -51,7 +51,8 @@ qpid_opts = [
                help='Username for qpid connection'),
     cfg.StrOpt('qpid_password',
                default='',
-               help='Password for qpid connection'),
+               help='Password for qpid connection',
+               secret=True),
     cfg.StrOpt('qpid_sasl_mechanisms',
                default='',
                help='Space separated list of SASL mechanisms to use for auth'),
@@ -486,9 +487,20 @@ class Connection(object):
         """Send a 'direct' message"""
         self.publisher_send(DirectPublisher, msg_id, msg)
 
-    def topic_send(self, topic, msg):
+    def topic_send(self, topic, msg, timeout=None):
         """Send a 'topic' message"""
-        self.publisher_send(TopicPublisher, topic, msg)
+        #
+        # We want to create a message with attributes, e.g. a TTL. We
+        # don't really need to keep 'msg' in its JSON format any longer
+        # so let's create an actual qpid message here and get some
+        # value-add on the go.
+        #
+        # WARNING: Request timeout happens to be in the same units as
+        # qpid's TTL (seconds). If this changes in the future, then this
+        # will need to be altered accordingly.
+        #
+        qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
+        self.publisher_send(TopicPublisher, topic, qpid_message)
 
     def fanout_send(self, topic, msg):
         """Send a 'fanout' message"""
diff --git a/quantum/openstack/common/rpc/impl_zmq.py b/quantum/openstack/common/rpc/impl_zmq.py
index a3e4888634..532708ab85 100644
--- a/quantum/openstack/common/rpc/impl_zmq.py
+++ b/quantum/openstack/common/rpc/impl_zmq.py
@@ -17,7 +17,6 @@
 import os
 import pprint
 import socket
-import string
 import sys
 import types
 import uuid
@@ -90,7 +89,7 @@ def _serialize(data):
     Error if a developer passes us bad data.
     """
     try:
-        return str(jsonutils.dumps(data, ensure_ascii=True))
+        return jsonutils.dumps(data, ensure_ascii=True)
     except TypeError:
         LOG.error(_("JSON serialization failed."))
         raise
@@ -218,10 +217,11 @@ class ZmqClient(object):
         self.outq = ZmqSocket(addr, socket_type, bind=bind)
 
     def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+        msg_id = msg_id or 0
+
         if serialize:
             data = rpc_common.serialize_msg(data, force_envelope)
-        self.outq.send([str(msg_id), str(topic), str('cast'),
-                        _serialize(data)])
+        self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
 
     def close(self):
         self.outq.close()
@@ -295,13 +295,13 @@ class InternalContext(object):
                 ctx.replies)
 
             LOG.debug(_("Sending reply"))
-            cast(CONF, ctx, topic, {
+            _multi_send(_cast, ctx, topic, {
                 'method': '-process_reply',
                 'args': {
-                    'msg_id': msg_id,
+                    'msg_id': msg_id,  # Include for Folsom compat.
                     'response': response
                 }
-            })
+            }, _msg_id=msg_id)
 
 
 class ConsumerBase(object):
@@ -321,21 +322,22 @@ class ConsumerBase(object):
         return [result]
 
     def process(self, style, target, proxy, ctx, data):
+        data.setdefault('version', None)
+        data.setdefault('args', {})
+
         # Method starting with - are
         # processed internally. (non-valid method name)
-        method = data['method']
+        method = data.get('method')
+        if not method:
+            LOG.error(_("RPC message did not include method."))
+            return
 
         # Internal method
         # uses internal context for safety.
-        if data['method'][0] == '-':
-            # For reply / process_reply
-            method = method[1:]
-            if method == 'reply':
-                self.private_ctx.reply(ctx, proxy, **data['args'])
+        if method == '-reply':
+            self.private_ctx.reply(ctx, proxy, **data['args'])
             return
 
-        data.setdefault('version', None)
-        data.setdefault('args', {})
         proxy.dispatch(ctx, data['version'],
                        data['method'], **data['args'])
 
@@ -436,20 +437,12 @@ class ZmqProxy(ZmqBaseReactor):
 
         LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
 
-        # Handle zmq_replies magic
-        if topic.startswith('fanout~'):
+        if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
             sock_type = zmq.PUB
-        elif topic.startswith('zmq_replies'):
-            sock_type = zmq.PUB
-            inside = rpc_common.deserialize_msg(_deserialize(in_msg))
-            msg_id = inside[-1]['args']['msg_id']
-            response = inside[-1]['args']['response']
-            LOG.debug(_("->response->%s"), response)
-            data = [str(msg_id), _serialize(response)]
         else:
             sock_type = zmq.PUSH
 
-        if not topic in self.topic_proxy:
+        if topic not in self.topic_proxy:
             def publisher(waiter):
                 LOG.info(_("Creating proxy for topic: %s"), topic)
 
@@ -600,8 +593,8 @@ class Connection(rpc_common.Connection):
         self.reactor.consume_in_thread()
 
 
-def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
-          force_envelope=False):
+def _cast(addr, context, topic, msg, timeout=None, serialize=True,
+          force_envelope=False, _msg_id=None):
     timeout_cast = timeout or CONF.rpc_cast_timeout
     payload = [RpcContext.marshal(context), msg]
 
@@ -610,7 +603,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
         conn = ZmqClient(addr)
 
         # assumes cast can't return an exception
-        conn.cast(msg_id, topic, payload, serialize, force_envelope)
+        conn.cast(_msg_id, topic, payload, serialize, force_envelope)
     except zmq.ZMQError:
         raise RPCException("Cast failed. ZMQ Socket Exception")
     finally:
@@ -618,7 +611,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
             conn.close()
 
 
-def _call(addr, context, msg_id, topic, msg, timeout=None,
+def _call(addr, context, topic, msg, timeout=None,
           serialize=True, force_envelope=False):
     # timeout_response is how long we wait for a response
     timeout = timeout or CONF.rpc_response_timeout
@@ -654,7 +647,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
         )
 
         LOG.debug(_("Sending cast"))
-        _cast(addr, context, msg_id, topic, payload,
+        _cast(addr, context, topic, payload,
               serialize=serialize, force_envelope=force_envelope)
 
         LOG.debug(_("Cast sent; Waiting reply"))
@@ -662,10 +655,12 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
         msg = msg_waiter.recv()
         LOG.debug(_("Received message: %s"), msg)
         LOG.debug(_("Unpacking response"))
-        responses = _deserialize(msg[-1])
+        responses = _deserialize(msg[-1])[-1]['args']['response']
 
         # ZMQError trumps the Timeout error.
     except zmq.ZMQError:
         raise RPCException("ZMQ Socket Error")
+    except (IndexError, KeyError):
+        raise RPCException(_("RPC Message Invalid."))
     finally:
         if 'msg_waiter' in vars():
             msg_waiter.close()
@@ -682,7 +677,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
 
 
 def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
-                force_envelope=False):
+                force_envelope=False, _msg_id=None):
     """
     Wraps the sending of messages,
    dispatches to the matchmaker and sends
@@ -708,10 +703,10 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
 
         if method.__name__ == '_cast':
             eventlet.spawn_n(method, _addr, context,
-                             _topic, _topic, msg, timeout, serialize,
-                             force_envelope)
+                             _topic, msg, timeout, serialize,
+                             force_envelope, _msg_id)
             return
-        return method(_addr, context, _topic, _topic, msg, timeout,
+        return method(_addr, context, _topic, msg, timeout,
                       serialize, force_envelope)
 
 
@@ -777,21 +772,9 @@ def _get_ctxt():
     return ZMQ_CTX
 
 
-def _get_matchmaker():
+def _get_matchmaker(*args, **kwargs):
     global matchmaker
     if not matchmaker:
-        # rpc_zmq_matchmaker should be set to a 'module.Class'
-        mm_path = CONF.rpc_zmq_matchmaker.split('.')
-        mm_module = '.'.join(mm_path[:-1])
-        mm_class = mm_path[-1]
-
-        # Only initialize a class.
-        if mm_path[-1][0] not in string.ascii_uppercase:
-            LOG.error(_("Matchmaker could not be loaded.\n"
-                        "rpc_zmq_matchmaker is not a class."))
-            raise RPCException(_("Error loading Matchmaker."))
-
-        mm_impl = importutils.import_module(mm_module)
-        mm_constructor = getattr(mm_impl, mm_class)
-        matchmaker = mm_constructor()
+        matchmaker = importutils.import_object(
+            CONF.rpc_zmq_matchmaker, *args, **kwargs)
     return matchmaker
diff --git a/quantum/openstack/common/rpc/matchmaker.py b/quantum/openstack/common/rpc/matchmaker.py
index 3182d37ed5..2d97ac5e69 100644
--- a/quantum/openstack/common/rpc/matchmaker.py
+++ b/quantum/openstack/common/rpc/matchmaker.py
@@ -201,24 +201,25 @@ class FanoutRingExchange(RingExchange):
 
 class LocalhostExchange(Exchange):
     """Exchange where all direct topics are local."""
-    def __init__(self):
+    def __init__(self, host='localhost'):
+        self.host = host
         super(Exchange, self).__init__()
 
     def run(self, key):
-        return [(key.split('.')[0] + '.localhost', 'localhost')]
+        return [('.'.join((key.split('.')[0], self.host)), self.host)]
 
 
 class DirectExchange(Exchange):
     """
     Exchange where all topic keys are split, sending to second half.
-    i.e. "compute.host" sends a message to "compute" running on "host"
+    i.e. "compute.host" sends a message to "compute.host" running on "host"
     """
     def __init__(self):
         super(Exchange, self).__init__()
 
     def run(self, key):
-        b, e = key.split('.', 1)
-        return [(b, e)]
+        e = key.split('.', 1)[1]
+        return [(key, e)]
 
 
 class MatchMakerRing(MatchMakerBase):
@@ -237,11 +238,11 @@ class MatchMakerLocalhost(MatchMakerBase):
     Match Maker where all bare topics resolve to localhost.
     Useful for testing.
     """
-    def __init__(self):
+    def __init__(self, host='localhost'):
         super(MatchMakerLocalhost, self).__init__()
-        self.add_binding(FanoutBinding(), LocalhostExchange())
+        self.add_binding(FanoutBinding(), LocalhostExchange(host))
         self.add_binding(DirectBinding(), DirectExchange())
-        self.add_binding(TopicBinding(), LocalhostExchange())
+        self.add_binding(TopicBinding(), LocalhostExchange(host))
 
 
 class MatchMakerStub(MatchMakerBase):
diff --git a/quantum/openstack/common/service.py b/quantum/openstack/common/service.py
index e51b8f7d27..d702506627 100644
--- a/quantum/openstack/common/service.py
+++ b/quantum/openstack/common/service.py
@@ -51,7 +51,7 @@ class Launcher(object):
         :returns: None
 
         """
-        self._services = threadgroup.ThreadGroup('launcher')
+        self._services = threadgroup.ThreadGroup()
         eventlet_backdoor.initialize_if_enabled()
 
     @staticmethod
@@ -310,7 +310,7 @@ class Service(object):
     """Service object for binaries running on hosts."""
 
     def __init__(self, threads=1000):
-        self.tg = threadgroup.ThreadGroup('service', threads)
+        self.tg = threadgroup.ThreadGroup(threads)
 
     def start(self):
         pass
diff --git a/quantum/openstack/common/setup.py b/quantum/openstack/common/setup.py
index fb187fff46..35680b3048 100644
--- a/quantum/openstack/common/setup.py
+++ b/quantum/openstack/common/setup.py
@@ -274,7 +274,7 @@ def _get_revno():
     return len(revlist.splitlines())
 
 
-def get_version_from_git(pre_version):
+def _get_version_from_git(pre_version):
     """Return a version which is equal to the tag that's on the current
     revision if there is one, or tag plus number of additional revisions
     if the current revision has no tag."""
@@ -294,7 +294,7 @@ def get_version_from_git(pre_version):
     return None
 
 
-def get_version_from_pkg_info(package_name):
+def _get_version_from_pkg_info(package_name):
     """Get the version from PKG-INFO file if we can."""
     try:
         pkg_info_file = open('PKG-INFO', 'r')
@@ -325,10 +325,10 @@ def get_version(package_name, pre_version=None):
     version = os.environ.get("OSLO_PACKAGE_VERSION", None)
     if version:
         return version
-    version = get_version_from_pkg_info(package_name)
+    version = _get_version_from_pkg_info(package_name)
     if version:
         return version
-    version = get_version_from_git(pre_version)
+    version = _get_version_from_git(pre_version)
     if version:
         return version
     raise Exception("Versioning for this project requires either an sdist"
diff --git a/quantum/openstack/common/threadgroup.py b/quantum/openstack/common/threadgroup.py
index d1e12715e8..ecabe0d05a 100644
--- a/quantum/openstack/common/threadgroup.py
+++ b/quantum/openstack/common/threadgroup.py
@@ -38,8 +38,7 @@ class Thread(object):
     :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
     it has done so it can be removed from the threads list.
     """
-    def __init__(self, name, thread, group):
-        self.name = name
+    def __init__(self, thread, group):
         self.thread = thread
         self.thread.link(_thread_done, group=group, thread=self)
 
@@ -57,8 +56,7 @@ class ThreadGroup(object):
       when need be).
    * provide an easy API to add timers.
    """
-    def __init__(self, name, thread_pool_size=10):
-        self.name = name
+    def __init__(self, thread_pool_size=10):
         self.pool = greenpool.GreenPool(thread_pool_size)
         self.threads = []
         self.timers = []
@@ -72,7 +70,7 @@ class ThreadGroup(object):
 
     def add_thread(self, callback, *args, **kwargs):
         gt = self.pool.spawn(callback, *args, **kwargs)
-        th = Thread(callback.__name__, gt, self)
+        th = Thread(gt, self)
         self.threads.append(th)
 
     def thread_done(self, thread):
diff --git a/quantum/openstack/common/timeutils.py b/quantum/openstack/common/timeutils.py
index 0f346087f7..e2c2740573 100644
--- a/quantum/openstack/common/timeutils.py
+++ b/quantum/openstack/common/timeutils.py
@@ -98,6 +98,11 @@ def utcnow():
     return datetime.datetime.utcnow()
 
 
+def iso8601_from_timestamp(timestamp):
+    """Returns a iso8601 formated date from timestamp"""
+    return isotime(datetime.datetime.utcfromtimestamp(timestamp))
+
+
 utcnow.override_time = None
 
 
@@ -162,3 +167,16 @@ def delta_seconds(before, after):
     except AttributeError:
         return ((delta.days * 24 * 3600) + delta.seconds +
                 float(delta.microseconds) / (10 ** 6))
+
+
+def is_soon(dt, window):
+    """
+    Determines if time is going to happen in the next window seconds.
+
+    :params dt: the time
+    :params window: minimum seconds to remain to consider the time not soon
+
+    :return: True if expiration is within the given duration
+    """
+    soon = (utcnow() + datetime.timedelta(seconds=window))
+    return normalize_time(dt) <= soon
diff --git a/quantum/openstack/common/version.py b/quantum/openstack/common/version.py
index 2b6e2a23a3..3a38c584a3 100644
--- a/quantum/openstack/common/version.py
+++ b/quantum/openstack/common/version.py
@@ -33,6 +33,14 @@ class VersionInfo(object):
         self.version = None
         self._cached_version = None
 
+    def __str__(self):
+        """Make the VersionInfo object behave like a string."""
+        return self.version_string()
+
+    def __repr__(self):
+        """Include the name."""
+        return "VersionInfo(%s:%s)" % (self.package, self.version_string())
+
     def _get_version_from_pkg_resources(self):
         """Get the version of the package from the pkg_resources record
         associated with the package."""
@@ -41,11 +49,11 @@ class VersionInfo(object):
             provider = pkg_resources.get_provider(requirement)
             return provider.version
         except pkg_resources.DistributionNotFound:
-            # The most likely cause for this is running tests in a tree with
+            # The most likely cause for this is running tests in a tree
             # produced from a tarball where the package itself has not been
-            # installed into anything. Check for a PKG-INFO file.
+            # installed into anything. Revert to setup-time logic.
             from quantum.openstack.common import setup
-            return setup.get_version_from_pkg_info(self.package)
+            return setup.get_version(self.package)
 
     def release_string(self):
         """Return the full version of the package including suffixes indicating
diff --git a/tools/install_venv_common.py b/tools/install_venv_common.py
index 2de8e1c829..8dfc4cf012 100644
--- a/tools/install_venv_common.py
+++ b/tools/install_venv_common.py
@@ -25,6 +25,7 @@ import os
 import subprocess
 import sys
 
+
 possible_topdir = os.getcwd()
 if os.path.exists(os.path.join(possible_topdir, "quantum", "__init__.py")):