From cd92afb713e5be22a7bbfbe0d26498d1b37cdb78 Mon Sep 17 00:00:00 2001 From: Gary Kotton Date: Mon, 7 Jan 2013 13:52:21 +0000 Subject: [PATCH] Latest OSLO updates Change-Id: Ibf223203c8b34f614357fa4539d0dfa953765d6b --- quantum/common/config.py | 5 +- quantum/openstack/common/eventlet_backdoor.py | 10 +- quantum/openstack/common/exception.py | 4 +- quantum/openstack/common/excutils.py | 6 +- quantum/openstack/common/importutils.py | 2 +- quantum/openstack/common/jsonutils.py | 2 +- quantum/openstack/common/lockutils.py | 1 + quantum/openstack/common/log.py | 19 +- quantum/openstack/common/loopingcall.py | 9 +- quantum/openstack/common/notifier/api.py | 7 +- .../openstack/common/notifier/rpc_notifier.py | 2 +- .../common/notifier/rpc_notifier2.py | 51 +++++ quantum/openstack/common/periodic_task.py | 12 +- quantum/openstack/common/rpc/__init__.py | 22 +- quantum/openstack/common/rpc/amqp.py | 56 ++--- quantum/openstack/common/rpc/common.py | 191 ++++++++++++++++-- quantum/openstack/common/rpc/dispatcher.py | 28 +-- quantum/openstack/common/rpc/impl_fake.py | 11 +- quantum/openstack/common/rpc/impl_kombu.py | 28 ++- quantum/openstack/common/rpc/impl_qpid.py | 50 +++-- quantum/openstack/common/rpc/impl_zmq.py | 32 ++- quantum/openstack/common/rpc/matchmaker.py | 2 +- quantum/openstack/common/rpc/service.py | 5 + quantum/openstack/common/service.py | 42 ++-- quantum/openstack/common/setup.py | 18 +- quantum/openstack/common/threadgroup.py | 25 +-- quantum/openstack/common/timeutils.py | 33 ++- quantum/openstack/common/version.py | 25 +-- tools/pip-requires | 1 + 29 files changed, 512 insertions(+), 187 deletions(-) create mode 100644 quantum/openstack/common/notifier/rpc_notifier2.py diff --git a/quantum/common/config.py b/quantum/common/config.py index 079714de61..0293151082 100644 --- a/quantum/common/config.py +++ b/quantum/common/config.py @@ -27,6 +27,7 @@ from quantum.api.v2 import attributes from quantum.common import utils from quantum.openstack.common import cfg from quantum.openstack.common import log as logging +from quantum.openstack.common import rpc from quantum.version import version_info as quantum_version @@ -49,9 +50,6 @@ core_opts = [ cfg.IntOpt('max_subnet_host_routes', default=20), cfg.IntOpt('dhcp_lease_duration', default=120), cfg.BoolOpt('allow_overlapping_ips', default=False), - cfg.StrOpt('control_exchange', - default='quantum', - help='AMQP exchange to connect to if using RabbitMQ or Qpid'), cfg.StrOpt('host', default=utils.get_hostname()), cfg.BoolOpt('force_gateway_on_subnet', default=False, help=_("Ensure that configured gateway is on subnet")), @@ -67,6 +65,7 @@ cfg.CONF.register_cli_opts(core_cli_opts) def parse(args): + rpc.set_defaults(control_exchange='quantum') cfg.CONF(args=args, project='quantum', version='%%prog %s' % quantum_version.version_string_with_vcs()) diff --git a/quantum/openstack/common/eventlet_backdoor.py b/quantum/openstack/common/eventlet_backdoor.py index cee9bb55d5..6af0f9da49 100644 --- a/quantum/openstack/common/eventlet_backdoor.py +++ b/quantum/openstack/common/eventlet_backdoor.py @@ -46,7 +46,7 @@ def _find_objects(t): def _print_greenthreads(): - for i, gt in enumerate(find_objects(greenlet.greenlet)): + for i, gt in enumerate(_find_objects(greenlet.greenlet)): print i, gt traceback.print_stack(gt.gr_frame) print @@ -61,7 +61,7 @@ def initialize_if_enabled(): } if CONF.backdoor_port is None: - return + return None # NOTE(johannes): The standard sys.displayhook will print the value of # the last expression and set it to __builtin__._, which overwrites @@ -73,6 +73,8 @@ def initialize_if_enabled(): pprint.pprint(val) sys.displayhook = displayhook - eventlet.spawn_n(eventlet.backdoor.backdoor_server, - eventlet.listen(('localhost', CONF.backdoor_port)), + sock = eventlet.listen(('localhost', CONF.backdoor_port)) + port = sock.getsockname()[1] + eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock, locals=backdoor_locals) + return port diff --git a/quantum/openstack/common/exception.py b/quantum/openstack/common/exception.py index 4866de2fd2..20634b31bb 100644 --- a/quantum/openstack/common/exception.py +++ b/quantum/openstack/common/exception.py @@ -21,6 +21,8 @@ Exceptions common to OpenStack projects import logging +from quantum.openstack.common.gettextutils import _ + class Error(Exception): def __init__(self, message=None): @@ -97,7 +99,7 @@ def wrap_exception(f): except Exception, e: if not isinstance(e, Error): #exc_type, exc_value, exc_traceback = sys.exc_info() - logging.exception('Uncaught exception') + logging.exception(_('Uncaught exception')) #logging.error(traceback.extract_stack(exc_traceback)) raise Error(str(e)) raise diff --git a/quantum/openstack/common/excutils.py b/quantum/openstack/common/excutils.py index 5dd4830176..6f6a81a0b9 100644 --- a/quantum/openstack/common/excutils.py +++ b/quantum/openstack/common/excutils.py @@ -24,6 +24,8 @@ import logging import sys import traceback +from quantum.openstack.common.gettextutils import _ + @contextlib.contextmanager def save_and_reraise_exception(): @@ -43,7 +45,7 @@ def save_and_reraise_exception(): try: yield except Exception: - logging.error('Original exception being dropped: %s' % - (traceback.format_exception(type_, value, tb))) + logging.error(_('Original exception being dropped: %s'), + traceback.format_exception(type_, value, tb)) raise raise type_, value, tb diff --git a/quantum/openstack/common/importutils.py b/quantum/openstack/common/importutils.py index f45372b4db..2a28b455e8 100644 --- a/quantum/openstack/common/importutils.py +++ b/quantum/openstack/common/importutils.py @@ -29,7 +29,7 @@ def import_class(import_str): try: __import__(mod_str) return getattr(sys.modules[mod_str], class_str) - except (ValueError, AttributeError), exc: + except (ValueError, AttributeError): raise ImportError('Class %s cannot be found (%s)' % (class_str, traceback.format_exception(*sys.exc_info()))) diff --git a/quantum/openstack/common/jsonutils.py b/quantum/openstack/common/jsonutils.py index 9324a61fa3..72f39fa1b9 100644 --- a/quantum/openstack/common/jsonutils.py +++ b/quantum/openstack/common/jsonutils.py @@ -120,7 +120,7 @@ def to_primitive(value, convert_instances=False, level=0): level=level + 1) else: return value - except TypeError, e: + except TypeError: # Class objects are tricky since they may define something like # __iter__ defined but it isn't callable as list(). return unicode(value) diff --git a/quantum/openstack/common/lockutils.py b/quantum/openstack/common/lockutils.py index 9f4eddf57c..ead7e13c7f 100644 --- a/quantum/openstack/common/lockutils.py +++ b/quantum/openstack/common/lockutils.py @@ -28,6 +28,7 @@ from eventlet import semaphore from quantum.openstack.common import cfg from quantum.openstack.common import fileutils +from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import log as logging diff --git a/quantum/openstack/common/log.py b/quantum/openstack/common/log.py index 9a9dfc577f..0640c30711 100644 --- a/quantum/openstack/common/log.py +++ b/quantum/openstack/common/log.py @@ -49,19 +49,20 @@ from quantum.openstack.common import notifier log_opts = [ cfg.StrOpt('logging_context_format_string', - default='%(asctime)s %(levelname)s %(name)s [%(request_id)s ' - '%(user_id)s %(project_id)s] %(instance)s' + default='%(asctime)s.%(msecs)d %(levelname)s %(name)s ' + '[%(request_id)s %(user)s %(tenant)s] %(instance)s' '%(message)s', help='format string to use for log messages with context'), cfg.StrOpt('logging_default_format_string', - default='%(asctime)s %(process)d %(levelname)s %(name)s [-]' - ' %(instance)s%(message)s', + default='%(asctime)s.%(msecs)d %(process)d %(levelname)s ' + '%(name)s [-] %(instance)s%(message)s', help='format string to use for log messages without context'), cfg.StrOpt('logging_debug_format_suffix', default='%(funcName)s %(pathname)s:%(lineno)d', help='data to append to log format when level is DEBUG'), cfg.StrOpt('logging_exception_prefix', - default='%(asctime)s %(process)d TRACE %(name)s %(instance)s', + default='%(asctime)s.%(msecs)d %(process)d TRACE %(name)s ' + '%(instance)s', help='prefix each line of exception output with this format'), cfg.ListOpt('default_log_levels', default=[ @@ -174,7 +175,7 @@ class ContextAdapter(logging.LoggerAdapter): self.log(logging.AUDIT, msg, *args, **kwargs) def deprecated(self, msg, *args, **kwargs): - stdmsg = _("Deprecated Config: %s") % msg + stdmsg = _("Deprecated: %s") % msg if CONF.fatal_deprecations: self.critical(stdmsg, *args, **kwargs) raise DeprecatedConfig(msg=stdmsg) @@ -289,6 +290,12 @@ def setup(product_name): _setup_logging_from_conf(product_name) +def set_defaults(logging_context_format_string): + cfg.set_defaults(log_opts, + logging_context_format_string= + logging_context_format_string) + + def _find_facility_from_conf(): facility_names = logging.handlers.SysLogHandler.facility_names facility = getattr(logging.handlers.SysLogHandler, diff --git a/quantum/openstack/common/loopingcall.py b/quantum/openstack/common/loopingcall.py index a07216768d..efce59528e 100644 --- a/quantum/openstack/common/loopingcall.py +++ b/quantum/openstack/common/loopingcall.py @@ -24,6 +24,7 @@ from eventlet import greenthread from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import log as logging +from quantum.openstack.common import timeutils LOG = logging.getLogger(__name__) @@ -62,10 +63,16 @@ class LoopingCall(object): try: while self._running: + start = timeutils.utcnow() self.f(*self.args, **self.kw) + end = timeutils.utcnow() if not self._running: break - greenthread.sleep(interval) + delay = interval - timeutils.delta_seconds(start, end) + if delay <= 0: + LOG.warn(_('task run outlasted interval by %s sec') % + -delay) + greenthread.sleep(delay if delay > 0 else 0) except LoopingCallDone, e: self.stop() done.send(e.retvalue) diff --git a/quantum/openstack/common/notifier/api.py b/quantum/openstack/common/notifier/api.py index d3d4d0f6d0..09ab447a36 100644 --- a/quantum/openstack/common/notifier/api.py +++ b/quantum/openstack/common/notifier/api.py @@ -137,10 +137,11 @@ def notify(context, publisher_id, event_type, priority, payload): for driver in _get_drivers(): try: driver.notify(context, msg) - except Exception, e: + except Exception as e: LOG.exception(_("Problem '%(e)s' attempting to " "send to notification system. " - "Payload=%(payload)s") % locals()) + "Payload=%(payload)s") + % dict(e=e, payload=payload)) _drivers = None @@ -166,7 +167,7 @@ def add_driver(notification_driver): try: driver = importutils.import_module(notification_driver) _drivers[notification_driver] = driver - except ImportError as e: + except ImportError: LOG.exception(_("Failed to load notifier %s. " "These notifications will not be sent.") % notification_driver) diff --git a/quantum/openstack/common/notifier/rpc_notifier.py b/quantum/openstack/common/notifier/rpc_notifier.py index c1650537da..213779777b 100644 --- a/quantum/openstack/common/notifier/rpc_notifier.py +++ b/quantum/openstack/common/notifier/rpc_notifier.py @@ -41,6 +41,6 @@ def notify(context, message): topic = '%s.%s' % (topic, priority) try: rpc.notify(context, topic, message) - except Exception, e: + except Exception: LOG.exception(_("Could not send notification to %(topic)s. " "Payload=%(message)s"), locals()) diff --git a/quantum/openstack/common/notifier/rpc_notifier2.py b/quantum/openstack/common/notifier/rpc_notifier2.py new file mode 100644 index 0000000000..b0b9e66756 --- /dev/null +++ b/quantum/openstack/common/notifier/rpc_notifier2.py @@ -0,0 +1,51 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +'''messaging based notification driver, with message envelopes''' + +from quantum.openstack.common import cfg +from quantum.openstack.common import context as req_context +from quantum.openstack.common.gettextutils import _ +from quantum.openstack.common import log as logging +from quantum.openstack.common import rpc + +LOG = logging.getLogger(__name__) + +notification_topic_opt = cfg.ListOpt( + 'topics', default=['notifications', ], + help='AMQP topic(s) used for openstack notifications') + +opt_group = cfg.OptGroup(name='rpc_notifier2', + title='Options for rpc_notifier2') + +CONF = cfg.CONF +CONF.register_group(opt_group) +CONF.register_opt(notification_topic_opt, opt_group) + + +def notify(context, message): + """Sends a notification via RPC""" + if not context: + context = req_context.get_admin_context() + priority = message.get('priority', + CONF.default_notification_level) + priority = priority.lower() + for topic in CONF.rpc_notifier2.topics: + topic = '%s.%s' % (topic, priority) + try: + rpc.notify(context, topic, message, envelope=True) + except Exception: + LOG.exception(_("Could not send notification to %(topic)s. " + "Payload=%(message)s"), locals()) diff --git a/quantum/openstack/common/periodic_task.py b/quantum/openstack/common/periodic_task.py index ba2f5119a9..43d125b50d 100644 --- a/quantum/openstack/common/periodic_task.py +++ b/quantum/openstack/common/periodic_task.py @@ -95,17 +95,21 @@ class PeriodicTasks(object): ticks_to_skip = self._ticks_to_skip[task_name] if ticks_to_skip > 0: LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s" - " ticks left until next run"), locals()) + " ticks left until next run"), + dict(full_task_name=full_task_name, + ticks_to_skip=ticks_to_skip)) self._ticks_to_skip[task_name] -= 1 continue self._ticks_to_skip[task_name] = task._ticks_between_runs - LOG.debug(_("Running periodic task %(full_task_name)s"), locals()) + LOG.debug(_("Running periodic task %(full_task_name)s"), + dict(full_task_name=full_task_name)) try: task(self, context) except Exception as e: if raise_on_error: raise - LOG.exception(_("Error during %(full_task_name)s: %(e)s"), - locals()) + LOG.exception(_("Error during %(full_task_name)s:" + " %(e)s"), + dict(e=e, full_task_name=full_task_name)) diff --git a/quantum/openstack/common/rpc/__init__.py b/quantum/openstack/common/rpc/__init__.py index f26919306d..8ce6456d5b 100644 --- a/quantum/openstack/common/rpc/__init__.py +++ b/quantum/openstack/common/rpc/__init__.py @@ -50,25 +50,26 @@ rpc_opts = [ default=['quantum.openstack.common.exception', 'nova.exception', 'cinder.exception', + 'exceptions', ], help='Modules of exceptions that are permitted to be recreated' 'upon receiving exception data from an rpc call.'), cfg.BoolOpt('fake_rabbit', default=False, help='If passed, use a fake RabbitMQ provider'), - # - # The following options are not registered here, but are expected to be - # present. The project using this library must register these options with - # the configuration so that project-specific defaults may be defined. - # - #cfg.StrOpt('control_exchange', - # default='nova', - # help='AMQP exchange to connect to if using RabbitMQ or Qpid'), + cfg.StrOpt('control_exchange', + default='openstack', + help='AMQP exchange to connect to if using RabbitMQ or Qpid'), ] cfg.CONF.register_opts(rpc_opts) +def set_defaults(control_exchange): + cfg.set_defaults(rpc_opts, + control_exchange=control_exchange) + + def create_connection(new=True): """Create a connection to the message bus used for rpc. @@ -177,17 +178,18 @@ def multicall(context, topic, msg, timeout=None): return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) -def notify(context, topic, msg): +def notify(context, topic, msg, envelope=False): """Send notification event. :param context: Information that identifies the user that has made this request. :param topic: The topic to send the notification to. :param msg: This is a dict of content of event. + :param envelope: Set to True to enable message envelope for notifications. :returns: None """ - return _get_impl().notify(cfg.CONF, context, topic, msg) + return _get_impl().notify(cfg.CONF, context, topic, msg, envelope) def cleanup(): diff --git a/quantum/openstack/common/rpc/amqp.py b/quantum/openstack/common/rpc/amqp.py index e763369622..42fce2dd2c 100644 --- a/quantum/openstack/common/rpc/amqp.py +++ b/quantum/openstack/common/rpc/amqp.py @@ -26,7 +26,6 @@ AMQP, but is deprecated and predates this code. """ import inspect -import logging import sys import uuid @@ -34,10 +33,10 @@ from eventlet import greenpool from eventlet import pools from eventlet import semaphore -from quantum.openstack.common import cfg from quantum.openstack.common import excutils from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import local +from quantum.openstack.common import log as logging from quantum.openstack.common.rpc import common as rpc_common @@ -55,7 +54,7 @@ class Pool(pools.Pool): # TODO(comstud): Timeout connections not used in a while def create(self): - LOG.debug('Pool creating new connection') + LOG.debug(_('Pool creating new connection')) return self.connection_cls(self.conf) def empty(self): @@ -150,7 +149,7 @@ class ConnectionContext(rpc_common.Connection): def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, - ending=False): + ending=False, log_failure=True): """Sends a reply or an error on the channel signified by msg_id. Failure should be a sys.exc_info() tuple. @@ -158,7 +157,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, """ with ConnectionContext(conf, connection_pool) as conn: if failure: - failure = rpc_common.serialize_remote_exception(failure) + failure = rpc_common.serialize_remote_exception(failure, + log_failure) try: msg = {'result': reply, 'failure': failure} @@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, 'failure': failure} if ending: msg['ending'] = True - conn.direct_send(msg_id, msg) + conn.direct_send(msg_id, rpc_common.serialize_msg(msg)) class RpcContext(rpc_common.CommonRpcContext): @@ -185,10 +185,10 @@ class RpcContext(rpc_common.CommonRpcContext): return self.__class__(**values) def reply(self, reply=None, failure=None, ending=False, - connection_pool=None): + connection_pool=None, log_failure=True): if self.msg_id: msg_reply(self.conf, self.msg_id, connection_pool, reply, failure, - ending) + ending, log_failure) if ending: self.msg_id = None @@ -282,11 +282,21 @@ class ProxyCallback(object): ctxt.reply(rval, None, connection_pool=self.connection_pool) # This final None tells multicall that it is done. ctxt.reply(ending=True, connection_pool=self.connection_pool) - except Exception as e: - LOG.exception('Exception during message handling') + except rpc_common.ClientException as e: + LOG.debug(_('Expected exception during message handling (%s)') % + e._exc_info[1]) + ctxt.reply(None, e._exc_info, + connection_pool=self.connection_pool, + log_failure=False) + except Exception: + LOG.exception(_('Exception during message handling')) ctxt.reply(None, sys.exc_info(), connection_pool=self.connection_pool) + def wait(self): + """Wait for all callback threads to exit.""" + self.pool.waitall() + class MulticallWaiter(object): def __init__(self, conf, connection, timeout): @@ -349,7 +359,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): # that will continue to use the connection. When it's done, # connection.close() will get called which will put it back into # the pool - LOG.debug(_('Making asynchronous call on %s ...'), topic) + LOG.debug(_('Making synchronous call on %s ...'), topic) msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) LOG.debug(_('MSG_ID is %s') % (msg_id)) @@ -358,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): conn = ConnectionContext(conf, connection_pool) wait_msg = MulticallWaiter(conf, conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) return wait_msg @@ -377,7 +387,7 @@ def cast(conf, context, topic, msg, connection_pool): LOG.debug(_('Making asynchronous cast on %s...'), topic) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) def fanout_cast(conf, context, topic, msg, connection_pool): @@ -385,7 +395,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool): LOG.debug(_('Making asynchronous fanout cast...')) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.fanout_send(topic, msg) + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) def cast_to_server(conf, context, server_params, topic, msg, connection_pool): @@ -393,7 +403,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) def fanout_cast_to_server(conf, context, server_params, topic, msg, @@ -402,15 +412,18 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.fanout_send(topic, msg) + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) -def notify(conf, context, topic, msg, connection_pool): +def notify(conf, context, topic, msg, connection_pool, envelope): """Sends a notification event on a topic.""" - event_type = msg.get('event_type') - LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals()) + LOG.debug(_('Sending %(event_type)s on %(topic)s'), + dict(event_type=msg.get('event_type'), + topic=topic)) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: + if envelope: + msg = rpc_common.serialize_msg(msg, force_envelope=True) conn.notify_send(topic, msg) @@ -420,7 +433,4 @@ def cleanup(connection_pool): def get_control_exchange(conf): - try: - return conf.control_exchange - except cfg.NoSuchOptError: - return 'openstack' + return conf.control_exchange diff --git a/quantum/openstack/common/rpc/common.py b/quantum/openstack/common/rpc/common.py index cade2181cf..ca248eca9d 100644 --- a/quantum/openstack/common/rpc/common.py +++ b/quantum/openstack/common/rpc/common.py @@ -18,18 +18,61 @@ # under the License. import copy -import logging +import sys import traceback +from quantum.openstack.common import cfg from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import importutils from quantum.openstack.common import jsonutils from quantum.openstack.common import local +from quantum.openstack.common import log as logging +CONF = cfg.CONF LOG = logging.getLogger(__name__) +'''RPC Envelope Version. + +This version number applies to the top level structure of messages sent out. +It does *not* apply to the message payload, which must be versioned +independently. For example, when using rpc APIs, a version number is applied +for changes to the API being exposed over rpc. This version number is handled +in the rpc proxy and dispatcher modules. + +This version number applies to the message envelope that is used in the +serialization done inside the rpc layer. See serialize_msg() and +deserialize_msg(). + +The current message format (version 2.0) is very simple. It is: + + { + 'quantum.version': , + 'quantum.message': + } + +Message format version '1.0' is just considered to be the messages we sent +without a message envelope. + +So, the current message envelope just includes the envelope version. It may +eventually contain additional information, such as a signature for the message +payload. + +We will JSON encode the application message payload. The message envelope, +which includes the JSON encoded application message body, will be passed down +to the messaging libraries as a dict. +''' +_RPC_ENVELOPE_VERSION = '2.0' + +_VERSION_KEY = 'quantum.version' +_MESSAGE_KEY = 'quantum.message' + + +# TODO(russellb) Turn this on after Grizzly. +_SEND_RPC_ENVELOPE = False + + class RPCException(Exception): message = _("An unknown RPC related exception occurred.") @@ -40,7 +83,7 @@ class RPCException(Exception): try: message = self.message % kwargs - except Exception as e: + except Exception: # kwargs doesn't match a variable in the message # log the issue and the kwargs LOG.exception(_('Exception in string format operation')) @@ -90,6 +133,11 @@ class UnsupportedRpcVersion(RPCException): "this endpoint.") +class UnsupportedRpcEnvelopeVersion(RPCException): + message = _("Specified RPC envelope version, %(version)s, " + "not supported by this endpoint.") + + class Connection(object): """A connection, returned by rpc.create_connection(). @@ -164,8 +212,12 @@ class Connection(object): def _safe_log(log_func, msg, msg_data): """Sanitizes the msg_data field before logging.""" - SANITIZE = {'set_admin_password': ('new_pass',), - 'run_instance': ('admin_password',), } + SANITIZE = {'set_admin_password': [('args', 'new_pass')], + 'run_instance': [('args', 'admin_password')], + 'route_message': [('args', 'message', 'args', 'method_info', + 'method_kwargs', 'password'), + ('args', 'message', 'args', 'method_info', + 'method_kwargs', 'admin_password')]} has_method = 'method' in msg_data and msg_data['method'] in SANITIZE has_context_token = '_context_auth_token' in msg_data @@ -177,14 +229,16 @@ def _safe_log(log_func, msg, msg_data): msg_data = copy.deepcopy(msg_data) if has_method: - method = msg_data['method'] - if method in SANITIZE: - args_to_sanitize = SANITIZE[method] - for arg in args_to_sanitize: - try: - msg_data['args'][arg] = "" - except KeyError: - pass + for arg in SANITIZE.get(msg_data['method'], []): + try: + d = msg_data + for elem in arg[:-1]: + d = d[elem] + d[arg[-1]] = '' + except KeyError, e: + LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'), + {'item': arg, + 'err': e}) if has_context_token: msg_data['_context_auth_token'] = '' @@ -195,7 +249,7 @@ def _safe_log(log_func, msg, msg_data): return log_func(msg, msg_data) -def serialize_remote_exception(failure_info): +def serialize_remote_exception(failure_info, log_failure=True): """Prepares exception data to be sent over rpc. Failure_info should be a sys.exc_info() tuple. @@ -203,8 +257,9 @@ def serialize_remote_exception(failure_info): """ tb = traceback.format_exception(*failure_info) failure = failure_info[1] - LOG.error(_("Returning exception %s to caller"), unicode(failure)) - LOG.error(tb) + if log_failure: + LOG.error(_("Returning exception %s to caller"), unicode(failure)) + LOG.error(tb) kwargs = {} if hasattr(failure, 'kwargs'): @@ -258,7 +313,7 @@ def deserialize_remote_exception(conf, data): # we cannot necessarily change an exception message so we must override # the __str__ method. failure.__class__ = new_ex_type - except TypeError as e: + except TypeError: # NOTE(ameade): If a core exception then just add the traceback to the # first exception argument. failure.args = (message,) + failure.args[1:] @@ -309,3 +364,107 @@ class CommonRpcContext(object): context.values['read_deleted'] = read_deleted return context + + +class ClientException(Exception): + """This encapsulates some actual exception that is expected to be + hit by an RPC proxy object. Merely instantiating it records the + current exception information, which will be passed back to the + RPC client without exceptional logging.""" + def __init__(self): + self._exc_info = sys.exc_info() + + +def catch_client_exception(exceptions, func, *args, **kwargs): + try: + return func(*args, **kwargs) + except Exception, e: + if type(e) in exceptions: + raise ClientException() + else: + raise + + +def client_exceptions(*exceptions): + """Decorator for manager methods that raise expected exceptions. + Marking a Manager method with this decorator allows the declaration + of expected exceptions that the RPC layer should not consider fatal, + and not log as if they were generated in a real error scenario. Note + that this will cause listed exceptions to be wrapped in a + ClientException, which is used internally by the RPC layer.""" + def outer(func): + def inner(*args, **kwargs): + return catch_client_exception(exceptions, func, *args, **kwargs) + return inner + return outer + + +def version_is_compatible(imp_version, version): + """Determine whether versions are compatible. + + :param imp_version: The version implemented + :param version: The version requested by an incoming message. + """ + version_parts = version.split('.') + imp_version_parts = imp_version.split('.') + if int(version_parts[0]) != int(imp_version_parts[0]): # Major + return False + if int(version_parts[1]) > int(imp_version_parts[1]): # Minor + return False + return True + + +def serialize_msg(raw_msg, force_envelope=False): + if not _SEND_RPC_ENVELOPE and not force_envelope: + return raw_msg + + # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more + # information about this format. + msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION, + _MESSAGE_KEY: jsonutils.dumps(raw_msg)} + + return msg + + +def deserialize_msg(msg): + # NOTE(russellb): Hang on to your hats, this road is about to + # get a little bumpy. + # + # Robustness Principle: + # "Be strict in what you send, liberal in what you accept." + # + # At this point we have to do a bit of guessing about what it + # is we just received. Here is the set of possibilities: + # + # 1) We received a dict. This could be 2 things: + # + # a) Inspect it to see if it looks like a standard message envelope. + # If so, great! + # + # b) If it doesn't look like a standard message envelope, it could either + # be a notification, or a message from before we added a message + # envelope (referred to as version 1.0). + # Just return the message as-is. + # + # 2) It's any other non-dict type. Just return it and hope for the best. + # This case covers return values from rpc.call() from before message + # envelopes were used. (messages to call a method were always a dict) + + if not isinstance(msg, dict): + # See #2 above. + return msg + + base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY) + if not all(map(lambda key: key in msg, base_envelope_keys)): + # See #1.b above. + return msg + + # At this point we think we have the message envelope + # format we were expecting. (#1.a above) + + if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]): + raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY]) + + raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + + return raw_msg diff --git a/quantum/openstack/common/rpc/dispatcher.py b/quantum/openstack/common/rpc/dispatcher.py index 14dd180b68..de212227cb 100644 --- a/quantum/openstack/common/rpc/dispatcher.py +++ b/quantum/openstack/common/rpc/dispatcher.py @@ -41,8 +41,8 @@ server side of the API at the same time. However, as the code stands today, there can be both versioned and unversioned APIs implemented in the same code base. - -EXAMPLES: +EXAMPLES +======== Nova was the first project to use versioned rpc APIs. Consider the compute rpc API as an example. The client side is in nova/compute/rpcapi.py and the server @@ -50,12 +50,13 @@ side is in nova/compute/manager.py. Example 1) Adding a new method. +------------------------------- Adding a new method is a backwards compatible change. It should be added to nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should have a specific version specified to indicate the minimum API version that must -be implemented for the method to be supported. For example: +be implemented for the method to be supported. For example:: def get_host_uptime(self, ctxt, host): topic = _compute_topic(self.topic, ctxt, host, None) @@ -67,10 +68,11 @@ get_host_uptime() method. Example 2) Adding a new parameter. +---------------------------------- Adding a new parameter to an rpc method can be made backwards compatible. The RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped. -The implementation of the method must not expect the parameter to be present. +The implementation of the method must not expect the parameter to be present.:: def some_remote_method(self, arg1, arg2, newarg=None): # The code needs to deal with newarg=None for cases @@ -101,21 +103,6 @@ class RpcDispatcher(object): self.callbacks = callbacks super(RpcDispatcher, self).__init__() - @staticmethod - def _is_compatible(mversion, version): - """Determine whether versions are compatible. - - :param mversion: The API version implemented by a callback. - :param version: The API version requested by an incoming message. - """ - version_parts = version.split('.') - mversion_parts = mversion.split('.') - if int(version_parts[0]) != int(mversion_parts[0]): # Major - return False - if int(version_parts[1]) > int(mversion_parts[1]): # Minor - return False - return True - def dispatch(self, ctxt, version, method, **kwargs): """Dispatch a message based on a requested version. @@ -137,7 +124,8 @@ class RpcDispatcher(object): rpc_api_version = proxyobj.RPC_API_VERSION else: rpc_api_version = '1.0' - is_compatible = self._is_compatible(rpc_api_version, version) + is_compatible = rpc_common.version_is_compatible(rpc_api_version, + version) had_compatible = had_compatible or is_compatible if not hasattr(proxyobj, method): continue diff --git a/quantum/openstack/common/rpc/impl_fake.py b/quantum/openstack/common/rpc/impl_fake.py index 49124ef625..af1406615a 100644 --- a/quantum/openstack/common/rpc/impl_fake.py +++ b/quantum/openstack/common/rpc/impl_fake.py @@ -18,11 +18,15 @@ queues. Casts will block, but this is very useful for tests. """ import inspect +# NOTE(russellb): We specifically want to use json, not our own jsonutils. +# jsonutils has some extra logic to automatically convert objects to primitive +# types so that they can be serialized. We want to catch all cases where +# non-primitive types make it into this code and treat it as an error. +import json import time import eventlet -from quantum.openstack.common import jsonutils from quantum.openstack.common.rpc import common as rpc_common CONSUMERS = {} @@ -75,6 +79,8 @@ class Consumer(object): else: res.append(rval) done.send(res) + except rpc_common.ClientException as e: + done.send_exception(e._exc_info[1]) except Exception as e: done.send_exception(e) @@ -121,7 +127,7 @@ def create_connection(conf, new=True): def check_serialize(msg): """Make sure a message intended for rpc can be serialized.""" - jsonutils.dumps(msg) + json.dumps(msg) def multicall(conf, context, topic, msg, timeout=None): @@ -154,6 +160,7 @@ def call(conf, context, topic, msg, timeout=None): def cast(conf, context, topic, msg): + check_serialize(msg) try: call(conf, context, topic, msg) except Exception: diff --git a/quantum/openstack/common/rpc/impl_kombu.py b/quantum/openstack/common/rpc/impl_kombu.py index b0b292794c..9e2620ffd7 100644 --- a/quantum/openstack/common/rpc/impl_kombu.py +++ b/quantum/openstack/common/rpc/impl_kombu.py @@ -162,7 +162,8 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) try: - callback(message.payload) + msg = rpc_common.deserialize_msg(message.payload) + callback(msg) message.ack() except Exception: LOG.exception(_("Failed to process message... skipping it.")) @@ -196,7 +197,7 @@ class DirectConsumer(ConsumerBase): # Default options options = {'durable': False, 'auto_delete': True, - 'exclusive': True} + 'exclusive': False} options.update(kwargs) exchange = kombu.entity.Exchange(name=msg_id, type='direct', @@ -269,7 +270,7 @@ class FanoutConsumer(ConsumerBase): options = {'durable': False, 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': True, - 'exclusive': True} + 'exclusive': False} options.update(kwargs) exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', durable=options['durable'], @@ -316,7 +317,7 @@ class DirectPublisher(Publisher): options = {'durable': False, 'auto_delete': True, - 'exclusive': True} + 'exclusive': False} options.update(kwargs) super(DirectPublisher, self).__init__(channel, msg_id, msg_id, type='direct', **options) @@ -350,7 +351,7 @@ class FanoutPublisher(Publisher): """ options = {'durable': False, 'auto_delete': True, - 'exclusive': True} + 'exclusive': False} options.update(kwargs) super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, None, type='fanout', **options) @@ -387,6 +388,7 @@ class Connection(object): def __init__(self, conf, server_params=None): self.consumers = [] self.consumer_thread = None + self.proxy_callbacks = [] self.conf = conf self.max_retries = self.conf.rabbit_max_retries # Try forever? @@ -469,7 +471,7 @@ class Connection(object): LOG.info(_("Reconnecting to AMQP server on " "%(hostname)s:%(port)d") % params) try: - self.connection.close() + self.connection.release() except self.connection_errors: pass # Setting this in case the next statement fails, though @@ -573,12 +575,14 @@ class Connection(object): def close(self): """Close/release this connection""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.connection.release() self.connection = None def reset(self): """Reset a connection so it can be used again""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.channel.close() self.channel = self.connection.channel() # work around 'memory' transport bug in 1.1.3 @@ -644,6 +648,11 @@ class Connection(object): pass self.consumer_thread = None + def wait_on_proxy_callbacks(self): + """Wait for all proxy callback threads to exit.""" + for proxy_cb in self.proxy_callbacks: + proxy_cb.wait() + def publisher_send(self, cls, topic, msg, **kwargs): """Send to a publisher based on the publisher class""" @@ -719,6 +728,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) if fanout: self.declare_fanout_consumer(topic, proxy_cb) @@ -730,6 +740,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) self.declare_topic_consumer(topic, proxy_cb, pool_name) @@ -782,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg): rpc_amqp.get_connection_pool(conf, Connection)) -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): """Sends a notification event on a topic.""" return rpc_amqp.notify( conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection), + envelope) def cleanup(): diff --git a/quantum/openstack/common/rpc/impl_qpid.py b/quantum/openstack/common/rpc/impl_qpid.py index d8813bbfdf..16f21a4e58 100644 --- a/quantum/openstack/common/rpc/impl_qpid.py +++ b/quantum/openstack/common/rpc/impl_qpid.py @@ -17,7 +17,6 @@ import functools import itertools -import logging import time import uuid @@ -29,6 +28,7 @@ import qpid.messaging.exceptions from quantum.openstack.common import cfg from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import jsonutils +from quantum.openstack.common import log as logging from quantum.openstack.common.rpc import amqp as rpc_amqp from quantum.openstack.common.rpc import common as rpc_common @@ -41,6 +41,9 @@ qpid_opts = [ cfg.StrOpt('qpid_port', default='5672', help='Qpid broker port'), + cfg.ListOpt('qpid_hosts', + default=['$qpid_hostname:$qpid_port'], + help='Qpid HA cluster host:port pairs'), cfg.StrOpt('qpid_username', default='', help='Username for qpid connection'), @@ -121,7 +124,8 @@ class ConsumerBase(object): """Fetch the message and pass it to the callback object""" message = self.receiver.fetch() try: - self.callback(message.content) + msg = rpc_common.deserialize_msg(message.content) + self.callback(msg) except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: @@ -274,25 +278,32 @@ class Connection(object): self.session = None self.consumers = {} self.consumer_thread = None + self.proxy_callbacks = [] self.conf = conf + if server_params and 'hostname' in server_params: + # NOTE(russellb) This enables support for cast_to_server. + server_params['qpid_hosts'] = [ + '%s:%d' % (server_params['hostname'], + server_params.get('port', 5672)) + ] + params = { - 'hostname': self.conf.qpid_hostname, - 'port': self.conf.qpid_port, + 'qpid_hosts': self.conf.qpid_hosts, 'username': self.conf.qpid_username, 'password': self.conf.qpid_password, } params.update(server_params or {}) - self.broker = params['hostname'] + ":" + str(params['port']) + self.brokers = params['qpid_hosts'] self.username = params['username'] self.password = params['password'] - self.connection_create() + self.connection_create(self.brokers[0]) self.reconnect() - def connection_create(self): + def connection_create(self, broker): # Create the connection - this does not open the connection - self.connection = qpid.messaging.Connection(self.broker) + self.connection = qpid.messaging.Connection(broker) # Check if flags are set and if so set them for the connection # before we call open @@ -320,10 +331,14 @@ class Connection(object): except qpid.messaging.exceptions.ConnectionError: pass + attempt = 0 delay = 1 while True: + broker = self.brokers[attempt % len(self.brokers)] + attempt += 1 + try: - self.connection_create() + self.connection_create(broker) self.connection.open() except qpid.messaging.exceptions.ConnectionError, e: msg_dict = dict(e=e, delay=delay) @@ -333,10 +348,9 @@ class Connection(object): time.sleep(delay) delay = min(2 * delay, 60) else: + LOG.info(_('Connected to AMQP server on %s'), broker) break - LOG.info(_('Connected to AMQP server on %s'), self.broker) - self.session = self.connection.session() if self.consumers: @@ -362,12 +376,14 @@ class Connection(object): def close(self): """Close/release this connection""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.connection.close() self.connection = None def reset(self): """Reset a connection so it can be used again""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.session.close() self.session = self.connection.session() self.consumers = {} @@ -422,6 +438,11 @@ class Connection(object): pass self.consumer_thread = None + def wait_on_proxy_callbacks(self): + """Wait for all proxy callback threads to exit.""" + for proxy_cb in self.proxy_callbacks: + proxy_cb.wait() + def publisher_send(self, cls, topic, msg): """Send to a publisher based on the publisher class""" @@ -497,6 +518,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) if fanout: consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) @@ -512,6 +534,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, name=pool_name) @@ -570,10 +593,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg): rpc_amqp.get_connection_pool(conf, Connection)) -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): """Sends a notification event on a topic.""" return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection), + envelope) def cleanup(): diff --git a/quantum/openstack/common/rpc/impl_zmq.py b/quantum/openstack/common/rpc/impl_zmq.py index 02a44d21ba..efee8461cb 100644 --- a/quantum/openstack/common/rpc/impl_zmq.py +++ b/quantum/openstack/common/rpc/impl_zmq.py @@ -205,7 +205,9 @@ class ZmqClient(object): def __init__(self, addr, socket_type=zmq.PUSH, bind=False): self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data): + def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): + if serialize: + data = rpc_common.serialize_msg(data, force_envelope) self.outq.send([str(msg_id), str(topic), str('cast'), _serialize(data)]) @@ -250,7 +252,7 @@ class InternalContext(object): """Process a curried message and cast the result to topic.""" LOG.debug(_("Running func with context: %s"), ctx.to_dict()) data.setdefault('version', None) - data.setdefault('args', []) + data.setdefault('args', {}) try: result = proxy.dispatch( @@ -259,7 +261,14 @@ class InternalContext(object): except greenlet.GreenletExit: # ignore these since they are just from shutdowns pass + except rpc_common.ClientException, e: + LOG.debug(_("Expected exception during message handling (%s)") % + e._exc_info[1]) + return {'exc': + rpc_common.serialize_remote_exception(e._exc_info, + log_failure=False)} except Exception: + LOG.error(_("Exception during message handling")) return {'exc': rpc_common.serialize_remote_exception(sys.exc_info())} @@ -314,7 +323,7 @@ class ConsumerBase(object): return data.setdefault('version', None) - data.setdefault('args', []) + data.setdefault('args', {}) proxy.dispatch(ctx, data['version'], data['method'], **data['args']) @@ -426,7 +435,7 @@ class ZmqProxy(ZmqBaseReactor): sock_type = zmq.PUB elif topic.startswith('zmq_replies'): sock_type = zmq.PUB - inside = _deserialize(in_msg) + inside = rpc_common.deserialize_msg(_deserialize(in_msg)) msg_id = inside[-1]['args']['msg_id'] response = inside[-1]['args']['response'] LOG.debug(_("->response->%s"), response) @@ -473,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor): msg_id, topic, style, in_msg = data - ctx, request = _deserialize(in_msg) + ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg)) ctx = RpcContext.unmarshal(ctx) proxy = self.proxies[sock] @@ -524,7 +533,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, msg_id, topic, msg, timeout=None): +def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, + force_envelope=False): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -533,7 +543,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None): conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload) + conn.cast(msg_id, topic, payload, serialize, force_envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -602,7 +612,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None): +def _multi_send(method, context, topic, msg, timeout=None, serialize=True, + force_envelope=False): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -628,7 +639,8 @@ def _multi_send(method, context, topic, msg, timeout=None): if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout) + _topic, _topic, msg, timeout, serialize, + force_envelope) return return method(_addr, context, _topic, _topic, msg, timeout) @@ -669,6 +681,8 @@ def notify(conf, context, topic, msg, **kwargs): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. topic.replace('.', '-') + kwargs['serialize'] = kwargs.pop('envelope') + kwargs['force_envelope'] = True cast(conf, context, topic, msg, **kwargs) diff --git a/quantum/openstack/common/rpc/matchmaker.py b/quantum/openstack/common/rpc/matchmaker.py index ecb54eb8ce..3182d37ed5 100644 --- a/quantum/openstack/common/rpc/matchmaker.py +++ b/quantum/openstack/common/rpc/matchmaker.py @@ -21,10 +21,10 @@ return keys for direct exchanges, per (approximate) AMQP parlance. import contextlib import itertools import json -import logging from quantum.openstack.common import cfg from quantum.openstack.common.gettextutils import _ +from quantum.openstack.common import log as logging matchmaker_opts = [ diff --git a/quantum/openstack/common/rpc/service.py b/quantum/openstack/common/rpc/service.py index 30ffaaec4d..a239d8d2ba 100644 --- a/quantum/openstack/common/rpc/service.py +++ b/quantum/openstack/common/rpc/service.py @@ -57,6 +57,11 @@ class Service(service.Service): self.conn.create_consumer(self.topic, dispatcher, fanout=True) + # Hook to allow the manager to do other initializations after + # the rpc connection is created. + if callable(getattr(self.manager, 'initialize_service_hook', None)): + self.manager.initialize_service_hook(self) + # Consume from all consumers in a thread self.conn.consume_in_thread() diff --git a/quantum/openstack/common/service.py b/quantum/openstack/common/service.py index d967e7095b..9e34d9ff12 100644 --- a/quantum/openstack/common/service.py +++ b/quantum/openstack/common/service.py @@ -27,7 +27,7 @@ import sys import time import eventlet -import greenlet +import extras import logging as std_logging from quantum.openstack.common import cfg @@ -36,11 +36,8 @@ from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import log as logging from quantum.openstack.common import threadgroup -try: - from quantum.openstack.common import rpc -except ImportError: - rpc = None +rpc = extras.try_import('quantum.openstack.common.rpc') CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -54,7 +51,7 @@ class Launcher(object): :returns: None """ - self._services = [] + self._services = threadgroup.ThreadGroup('launcher') eventlet_backdoor.initialize_if_enabled() @staticmethod @@ -75,8 +72,7 @@ class Launcher(object): :returns: None """ - gt = eventlet.spawn(self.run_service, service) - self._services.append(gt) + self._services.add_thread(self.run_service, service) def stop(self): """Stop all services which are currently running. @@ -84,8 +80,7 @@ class Launcher(object): :returns: None """ - for service in self._services: - service.kill() + self._services.stop() def wait(self): """Waits until all services have been stopped, and then returns. @@ -93,11 +88,7 @@ class Launcher(object): :returns: None """ - for service in self._services: - try: - service.wait() - except greenlet.GreenletExit: - pass + self._services.wait() class SignalExit(SystemExit): @@ -132,9 +123,9 @@ class ServiceLauncher(Launcher): except SystemExit as exc: status = exc.code finally: - self.stop() if rpc: rpc.cleanup() + self.stop() return status @@ -252,7 +243,10 @@ class ProcessLauncher(object): def _wait_child(self): try: - pid, status = os.wait() + # Don't block if no child processes have exited + pid, status = os.waitpid(0, os.WNOHANG) + if not pid: + return None except OSError as exc: if exc.errno not in (errno.EINTR, errno.ECHILD): raise @@ -260,10 +254,12 @@ class ProcessLauncher(object): if os.WIFSIGNALED(status): sig = os.WTERMSIG(status) - LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals()) + LOG.info(_('Child %(pid)d killed by signal %(sig)d'), + dict(pid=pid, sig=sig)) else: code = os.WEXITSTATUS(status) - LOG.info(_('Child %(pid)d exited with status %(code)d'), locals()) + LOG.info(_('Child %(pid)s exited with status %(code)d'), + dict(pid=pid, code=code)) if pid not in self.children: LOG.warning(_('pid %d not in child list'), pid) @@ -282,6 +278,10 @@ class ProcessLauncher(object): while self.running: wrap = self._wait_child() if not wrap: + # Yield to other threads if no children have exited + # Sleep for a short time to avoid excessive CPU usage + # (see bug #1095346) + eventlet.greenthread.sleep(.01) continue while self.running and len(wrap.children) < wrap.workers: @@ -309,8 +309,8 @@ class ProcessLauncher(object): class Service(object): """Service object for binaries running on hosts.""" - def __init__(self): - self.tg = threadgroup.ThreadGroup('service') + def __init__(self, threads=1000): + self.tg = threadgroup.ThreadGroup('service', threads) def start(self): pass diff --git a/quantum/openstack/common/setup.py b/quantum/openstack/common/setup.py index 83eef07a7b..ec37a7f83a 100644 --- a/quantum/openstack/common/setup.py +++ b/quantum/openstack/common/setup.py @@ -117,8 +117,12 @@ def write_requirements(): def _run_shell_command(cmd): - output = subprocess.Popen(["/bin/sh", "-c", cmd], - stdout=subprocess.PIPE) + if os.name == 'nt': + output = subprocess.Popen(["cmd.exe", "/C", cmd], + stdout=subprocess.PIPE) + else: + output = subprocess.Popen(["/bin/sh", "-c", cmd], + stdout=subprocess.PIPE) out = output.communicate() if len(out) == 0: return None @@ -272,6 +276,9 @@ def get_cmdclass(): from sphinx.setup_command import BuildDoc class LocalBuildDoc(BuildDoc): + + builders = ['html', 'man'] + def generate_autoindex(self): print "**Autodocumenting from %s" % os.path.abspath(os.curdir) modules = {} @@ -307,14 +314,19 @@ def get_cmdclass(): if not os.getenv('SPHINX_DEBUG'): self.generate_autoindex() - for builder in ['html', 'man']: + for builder in self.builders: self.builder = builder self.finalize_options() self.project = self.distribution.get_name() self.version = self.distribution.get_version() self.release = self.distribution.get_version() BuildDoc.run(self) + + class LocalBuildLatex(LocalBuildDoc): + builders = ['latex'] + cmdclass['build_sphinx'] = LocalBuildDoc + cmdclass['build_sphinx_latex'] = LocalBuildLatex except ImportError: pass diff --git a/quantum/openstack/common/threadgroup.py b/quantum/openstack/common/threadgroup.py index 0aea30ff62..d1e12715e8 100644 --- a/quantum/openstack/common/threadgroup.py +++ b/quantum/openstack/common/threadgroup.py @@ -18,7 +18,6 @@ from eventlet import greenlet from eventlet import greenpool from eventlet import greenthread -from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import log as logging from quantum.openstack.common import loopingcall @@ -27,19 +26,17 @@ LOG = logging.getLogger(__name__) def _thread_done(gt, *args, **kwargs): - ''' - Callback function to be passed to GreenThread.link() when we spawn() - Calls the ThreadGroup to notify if. - ''' + """ Callback function to be passed to GreenThread.link() when we spawn() + Calls the :class:`ThreadGroup` to notify if. + + """ kwargs['group'].thread_done(kwargs['thread']) class Thread(object): - """ - Wrapper around a greenthread, that holds a reference to - the ThreadGroup. The Thread will notify the ThreadGroup - when it has done so it can be removed from the threads - list. + """ Wrapper around a greenthread, that holds a reference to the + :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when + it has done so it can be removed from the threads list. """ def __init__(self, name, thread, group): self.name = name @@ -54,11 +51,11 @@ class Thread(object): class ThreadGroup(object): - """ - The point of this class is to: - - keep track of timers and greenthreads (making it easier to stop them + """ The point of the ThreadGroup classis to: + + * keep track of timers and greenthreads (making it easier to stop them when need be). - - provide an easy API to add timers. + * provide an easy API to add timers. """ def __init__(self, name, thread_pool_size=10): self.name = name diff --git a/quantum/openstack/common/timeutils.py b/quantum/openstack/common/timeutils.py index 86004391de..0f346087f7 100644 --- a/quantum/openstack/common/timeutils.py +++ b/quantum/openstack/common/timeutils.py @@ -71,11 +71,15 @@ def normalize_time(timestamp): def is_older_than(before, seconds): """Return True if before is older than seconds.""" + if isinstance(before, basestring): + before = parse_strtime(before).replace(tzinfo=None) return utcnow() - before > datetime.timedelta(seconds=seconds) def is_newer_than(after, seconds): """Return True if after is newer than seconds.""" + if isinstance(after, basestring): + after = parse_strtime(after).replace(tzinfo=None) return after - utcnow() > datetime.timedelta(seconds=seconds) @@ -87,7 +91,10 @@ def utcnow_ts(): def utcnow(): """Overridable version of utils.utcnow.""" if utcnow.override_time: - return utcnow.override_time + try: + return utcnow.override_time.pop(0) + except AttributeError: + return utcnow.override_time return datetime.datetime.utcnow() @@ -95,14 +102,21 @@ utcnow.override_time = None def set_time_override(override_time=datetime.datetime.utcnow()): - """Override utils.utcnow to return a constant time.""" + """ + Override utils.utcnow to return a constant time or a list thereof, + one at a time. + """ utcnow.override_time = override_time def advance_time_delta(timedelta): """Advance overridden time using a datetime.timedelta.""" assert(not utcnow.override_time is None) - utcnow.override_time += timedelta + try: + for dt in utcnow.override_time: + dt += timedelta + except TypeError: + utcnow.override_time += timedelta def advance_time_seconds(seconds): @@ -135,3 +149,16 @@ def unmarshall_time(tyme): minute=tyme['minute'], second=tyme['second'], microsecond=tyme['microsecond']) + + +def delta_seconds(before, after): + """ + Compute the difference in seconds between two date, time, or + datetime objects (as a float, to microsecond resolution). + """ + delta = after - before + try: + return delta.total_seconds() + except AttributeError: + return ((delta.days * 24 * 3600) + delta.seconds + + float(delta.microseconds) / (10 ** 6)) diff --git a/quantum/openstack/common/version.py b/quantum/openstack/common/version.py index a19e422652..c04a695ff4 100644 --- a/quantum/openstack/common/version.py +++ b/quantum/openstack/common/version.py @@ -24,19 +24,6 @@ import pkg_resources import setup -class _deferred_version_string(object): - """Internal helper class which provides delayed version calculation.""" - def __init__(self, version_info, prefix): - self.version_info = version_info - self.prefix = prefix - - def __str__(self): - return "%s%s" % (self.prefix, self.version_info.version_string()) - - def __repr__(self): - return "%s%s" % (self.prefix, self.version_info.version_string()) - - class VersionInfo(object): def __init__(self, package, python_package=None, pre_version=None): @@ -57,14 +44,15 @@ class VersionInfo(object): self.python_package = python_package self.pre_version = pre_version self.version = None + self._cached_version = None def _generate_version(self): """Defer to the openstack.common.setup routines for making a version from git.""" if self.pre_version is None: - return setup.get_post_version(self.python_package) + return setup.get_post_version(self.package) else: - return setup.get_pre_version(self.python_package, self.pre_version) + return setup.get_pre_version(self.package, self.pre_version) def _newer_version(self, pending_version): """Check to see if we're working with a stale version or not. @@ -138,11 +126,14 @@ class VersionInfo(object): else: return '%s-dev' % (version_parts[0],) - def deferred_version_string(self, prefix=""): + def cached_version_string(self, prefix=""): """Generate an object which will expand in a string context to the results of version_string(). We do this so that don't call into pkg_resources every time we start up a program when passing version information into the CONF constructor, but rather only do the calculation when and if a version is requested """ - return _deferred_version_string(self, prefix) + if not self._cached_version: + self._cached_version = "%s%s" % (prefix, + self.version_string()) + return self._cached_version diff --git a/tools/pip-requires b/tools/pip-requires index ff31a51410..331b5beb9b 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -5,6 +5,7 @@ amqplib==0.6.1 anyjson>=0.2.4 argparse eventlet>=0.9.17 +extras greenlet>=0.3.1 httplib2 iso8601>=0.1.4