Latest OSLO updates

Change-Id: Ibf223203c8b34f614357fa4539d0dfa953765d6b
This commit is contained in:
Gary Kotton 2013-01-07 13:52:21 +00:00
parent ced155e44c
commit cd92afb713
29 changed files with 512 additions and 187 deletions

View File

@ -27,6 +27,7 @@ from quantum.api.v2 import attributes
from quantum.common import utils from quantum.common import utils
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.version import version_info as quantum_version from quantum.version import version_info as quantum_version
@ -49,9 +50,6 @@ core_opts = [
cfg.IntOpt('max_subnet_host_routes', default=20), cfg.IntOpt('max_subnet_host_routes', default=20),
cfg.IntOpt('dhcp_lease_duration', default=120), cfg.IntOpt('dhcp_lease_duration', default=120),
cfg.BoolOpt('allow_overlapping_ips', default=False), cfg.BoolOpt('allow_overlapping_ips', default=False),
cfg.StrOpt('control_exchange',
default='quantum',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.StrOpt('host', default=utils.get_hostname()), cfg.StrOpt('host', default=utils.get_hostname()),
cfg.BoolOpt('force_gateway_on_subnet', default=False, cfg.BoolOpt('force_gateway_on_subnet', default=False,
help=_("Ensure that configured gateway is on subnet")), help=_("Ensure that configured gateway is on subnet")),
@ -67,6 +65,7 @@ cfg.CONF.register_cli_opts(core_cli_opts)
def parse(args): def parse(args):
rpc.set_defaults(control_exchange='quantum')
cfg.CONF(args=args, project='quantum', cfg.CONF(args=args, project='quantum',
version='%%prog %s' % quantum_version.version_string_with_vcs()) version='%%prog %s' % quantum_version.version_string_with_vcs())

View File

@ -46,7 +46,7 @@ def _find_objects(t):
def _print_greenthreads(): def _print_greenthreads():
for i, gt in enumerate(find_objects(greenlet.greenlet)): for i, gt in enumerate(_find_objects(greenlet.greenlet)):
print i, gt print i, gt
traceback.print_stack(gt.gr_frame) traceback.print_stack(gt.gr_frame)
print print
@ -61,7 +61,7 @@ def initialize_if_enabled():
} }
if CONF.backdoor_port is None: if CONF.backdoor_port is None:
return return None
# NOTE(johannes): The standard sys.displayhook will print the value of # NOTE(johannes): The standard sys.displayhook will print the value of
# the last expression and set it to __builtin__._, which overwrites # the last expression and set it to __builtin__._, which overwrites
@ -73,6 +73,8 @@ def initialize_if_enabled():
pprint.pprint(val) pprint.pprint(val)
sys.displayhook = displayhook sys.displayhook = displayhook
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock = eventlet.listen(('localhost', CONF.backdoor_port))
eventlet.listen(('localhost', CONF.backdoor_port)), port = sock.getsockname()[1]
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
locals=backdoor_locals) locals=backdoor_locals)
return port

View File

@ -21,6 +21,8 @@ Exceptions common to OpenStack projects
import logging import logging
from quantum.openstack.common.gettextutils import _
class Error(Exception): class Error(Exception):
def __init__(self, message=None): def __init__(self, message=None):
@ -97,7 +99,7 @@ def wrap_exception(f):
except Exception, e: except Exception, e:
if not isinstance(e, Error): if not isinstance(e, Error):
#exc_type, exc_value, exc_traceback = sys.exc_info() #exc_type, exc_value, exc_traceback = sys.exc_info()
logging.exception('Uncaught exception') logging.exception(_('Uncaught exception'))
#logging.error(traceback.extract_stack(exc_traceback)) #logging.error(traceback.extract_stack(exc_traceback))
raise Error(str(e)) raise Error(str(e))
raise raise

View File

@ -24,6 +24,8 @@ import logging
import sys import sys
import traceback import traceback
from quantum.openstack.common.gettextutils import _
@contextlib.contextmanager @contextlib.contextmanager
def save_and_reraise_exception(): def save_and_reraise_exception():
@ -43,7 +45,7 @@ def save_and_reraise_exception():
try: try:
yield yield
except Exception: except Exception:
logging.error('Original exception being dropped: %s' % logging.error(_('Original exception being dropped: %s'),
(traceback.format_exception(type_, value, tb))) traceback.format_exception(type_, value, tb))
raise raise
raise type_, value, tb raise type_, value, tb

View File

@ -29,7 +29,7 @@ def import_class(import_str):
try: try:
__import__(mod_str) __import__(mod_str)
return getattr(sys.modules[mod_str], class_str) return getattr(sys.modules[mod_str], class_str)
except (ValueError, AttributeError), exc: except (ValueError, AttributeError):
raise ImportError('Class %s cannot be found (%s)' % raise ImportError('Class %s cannot be found (%s)' %
(class_str, (class_str,
traceback.format_exception(*sys.exc_info()))) traceback.format_exception(*sys.exc_info())))

View File

@ -120,7 +120,7 @@ def to_primitive(value, convert_instances=False, level=0):
level=level + 1) level=level + 1)
else: else:
return value return value
except TypeError, e: except TypeError:
# Class objects are tricky since they may define something like # Class objects are tricky since they may define something like
# __iter__ defined but it isn't callable as list(). # __iter__ defined but it isn't callable as list().
return unicode(value) return unicode(value)

View File

@ -28,6 +28,7 @@ from eventlet import semaphore
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common import fileutils from quantum.openstack.common import fileutils
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging

View File

@ -49,19 +49,20 @@ from quantum.openstack.common import notifier
log_opts = [ log_opts = [
cfg.StrOpt('logging_context_format_string', cfg.StrOpt('logging_context_format_string',
default='%(asctime)s %(levelname)s %(name)s [%(request_id)s ' default='%(asctime)s.%(msecs)d %(levelname)s %(name)s '
'%(user_id)s %(project_id)s] %(instance)s' '[%(request_id)s %(user)s %(tenant)s] %(instance)s'
'%(message)s', '%(message)s',
help='format string to use for log messages with context'), help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string', cfg.StrOpt('logging_default_format_string',
default='%(asctime)s %(process)d %(levelname)s %(name)s [-]' default='%(asctime)s.%(msecs)d %(process)d %(levelname)s '
' %(instance)s%(message)s', '%(name)s [-] %(instance)s%(message)s',
help='format string to use for log messages without context'), help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix', cfg.StrOpt('logging_debug_format_suffix',
default='%(funcName)s %(pathname)s:%(lineno)d', default='%(funcName)s %(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'), help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix', cfg.StrOpt('logging_exception_prefix',
default='%(asctime)s %(process)d TRACE %(name)s %(instance)s', default='%(asctime)s.%(msecs)d %(process)d TRACE %(name)s '
'%(instance)s',
help='prefix each line of exception output with this format'), help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels', cfg.ListOpt('default_log_levels',
default=[ default=[
@ -174,7 +175,7 @@ class ContextAdapter(logging.LoggerAdapter):
self.log(logging.AUDIT, msg, *args, **kwargs) self.log(logging.AUDIT, msg, *args, **kwargs)
def deprecated(self, msg, *args, **kwargs): def deprecated(self, msg, *args, **kwargs):
stdmsg = _("Deprecated Config: %s") % msg stdmsg = _("Deprecated: %s") % msg
if CONF.fatal_deprecations: if CONF.fatal_deprecations:
self.critical(stdmsg, *args, **kwargs) self.critical(stdmsg, *args, **kwargs)
raise DeprecatedConfig(msg=stdmsg) raise DeprecatedConfig(msg=stdmsg)
@ -289,6 +290,12 @@ def setup(product_name):
_setup_logging_from_conf(product_name) _setup_logging_from_conf(product_name)
def set_defaults(logging_context_format_string):
cfg.set_defaults(log_opts,
logging_context_format_string=
logging_context_format_string)
def _find_facility_from_conf(): def _find_facility_from_conf():
facility_names = logging.handlers.SysLogHandler.facility_names facility_names = logging.handlers.SysLogHandler.facility_names
facility = getattr(logging.handlers.SysLogHandler, facility = getattr(logging.handlers.SysLogHandler,

View File

@ -24,6 +24,7 @@ from eventlet import greenthread
from quantum.openstack.common.gettextutils import _ from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common import timeutils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -62,10 +63,16 @@ class LoopingCall(object):
try: try:
while self._running: while self._running:
start = timeutils.utcnow()
self.f(*self.args, **self.kw) self.f(*self.args, **self.kw)
end = timeutils.utcnow()
if not self._running: if not self._running:
break break
greenthread.sleep(interval) delay = interval - timeutils.delta_seconds(start, end)
if delay <= 0:
LOG.warn(_('task run outlasted interval by %s sec') %
-delay)
greenthread.sleep(delay if delay > 0 else 0)
except LoopingCallDone, e: except LoopingCallDone, e:
self.stop() self.stop()
done.send(e.retvalue) done.send(e.retvalue)

View File

@ -137,10 +137,11 @@ def notify(context, publisher_id, event_type, priority, payload):
for driver in _get_drivers(): for driver in _get_drivers():
try: try:
driver.notify(context, msg) driver.notify(context, msg)
except Exception, e: except Exception as e:
LOG.exception(_("Problem '%(e)s' attempting to " LOG.exception(_("Problem '%(e)s' attempting to "
"send to notification system. " "send to notification system. "
"Payload=%(payload)s") % locals()) "Payload=%(payload)s")
% dict(e=e, payload=payload))
_drivers = None _drivers = None
@ -166,7 +167,7 @@ def add_driver(notification_driver):
try: try:
driver = importutils.import_module(notification_driver) driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver _drivers[notification_driver] = driver
except ImportError as e: except ImportError:
LOG.exception(_("Failed to load notifier %s. " LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") % "These notifications will not be sent.") %
notification_driver) notification_driver)

View File

@ -41,6 +41,6 @@ def notify(context, message):
topic = '%s.%s' % (topic, priority) topic = '%s.%s' % (topic, priority)
try: try:
rpc.notify(context, topic, message) rpc.notify(context, topic, message)
except Exception, e: except Exception:
LOG.exception(_("Could not send notification to %(topic)s. " LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals()) "Payload=%(message)s"), locals())

View File

@ -0,0 +1,51 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
'''messaging based notification driver, with message envelopes'''
from quantum.openstack.common import cfg
from quantum.openstack.common import context as req_context
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
LOG = logging.getLogger(__name__)
notification_topic_opt = cfg.ListOpt(
'topics', default=['notifications', ],
help='AMQP topic(s) used for openstack notifications')
opt_group = cfg.OptGroup(name='rpc_notifier2',
title='Options for rpc_notifier2')
CONF = cfg.CONF
CONF.register_group(opt_group)
CONF.register_opt(notification_topic_opt, opt_group)
def notify(context, message):
"""Sends a notification via RPC"""
if not context:
context = req_context.get_admin_context()
priority = message.get('priority',
CONF.default_notification_level)
priority = priority.lower()
for topic in CONF.rpc_notifier2.topics:
topic = '%s.%s' % (topic, priority)
try:
rpc.notify(context, topic, message, envelope=True)
except Exception:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())

View File

@ -95,17 +95,21 @@ class PeriodicTasks(object):
ticks_to_skip = self._ticks_to_skip[task_name] ticks_to_skip = self._ticks_to_skip[task_name]
if ticks_to_skip > 0: if ticks_to_skip > 0:
LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s" LOG.debug(_("Skipping %(full_task_name)s, %(ticks_to_skip)s"
" ticks left until next run"), locals()) " ticks left until next run"),
dict(full_task_name=full_task_name,
ticks_to_skip=ticks_to_skip))
self._ticks_to_skip[task_name] -= 1 self._ticks_to_skip[task_name] -= 1
continue continue
self._ticks_to_skip[task_name] = task._ticks_between_runs self._ticks_to_skip[task_name] = task._ticks_between_runs
LOG.debug(_("Running periodic task %(full_task_name)s"), locals()) LOG.debug(_("Running periodic task %(full_task_name)s"),
dict(full_task_name=full_task_name))
try: try:
task(self, context) task(self, context)
except Exception as e: except Exception as e:
if raise_on_error: if raise_on_error:
raise raise
LOG.exception(_("Error during %(full_task_name)s: %(e)s"), LOG.exception(_("Error during %(full_task_name)s:"
locals()) " %(e)s"),
dict(e=e, full_task_name=full_task_name))

View File

@ -50,25 +50,26 @@ rpc_opts = [
default=['quantum.openstack.common.exception', default=['quantum.openstack.common.exception',
'nova.exception', 'nova.exception',
'cinder.exception', 'cinder.exception',
'exceptions',
], ],
help='Modules of exceptions that are permitted to be recreated' help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'), 'upon receiving exception data from an rpc call.'),
cfg.BoolOpt('fake_rabbit', cfg.BoolOpt('fake_rabbit',
default=False, default=False,
help='If passed, use a fake RabbitMQ provider'), help='If passed, use a fake RabbitMQ provider'),
# cfg.StrOpt('control_exchange',
# The following options are not registered here, but are expected to be default='openstack',
# present. The project using this library must register these options with help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
# the configuration so that project-specific defaults may be defined.
#
#cfg.StrOpt('control_exchange',
# default='nova',
# help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
] ]
cfg.CONF.register_opts(rpc_opts) cfg.CONF.register_opts(rpc_opts)
def set_defaults(control_exchange):
cfg.set_defaults(rpc_opts,
control_exchange=control_exchange)
def create_connection(new=True): def create_connection(new=True):
"""Create a connection to the message bus used for rpc. """Create a connection to the message bus used for rpc.
@ -177,17 +178,18 @@ def multicall(context, topic, msg, timeout=None):
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
def notify(context, topic, msg): def notify(context, topic, msg, envelope=False):
"""Send notification event. """Send notification event.
:param context: Information that identifies the user that has made this :param context: Information that identifies the user that has made this
request. request.
:param topic: The topic to send the notification to. :param topic: The topic to send the notification to.
:param msg: This is a dict of content of event. :param msg: This is a dict of content of event.
:param envelope: Set to True to enable message envelope for notifications.
:returns: None :returns: None
""" """
return _get_impl().notify(cfg.CONF, context, topic, msg) return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
def cleanup(): def cleanup():

View File

@ -26,7 +26,6 @@ AMQP, but is deprecated and predates this code.
""" """
import inspect import inspect
import logging
import sys import sys
import uuid import uuid
@ -34,10 +33,10 @@ from eventlet import greenpool
from eventlet import pools from eventlet import pools
from eventlet import semaphore from eventlet import semaphore
from quantum.openstack.common import cfg
from quantum.openstack.common import excutils from quantum.openstack.common import excutils
from quantum.openstack.common.gettextutils import _ from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import local from quantum.openstack.common import local
from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import common as rpc_common from quantum.openstack.common.rpc import common as rpc_common
@ -55,7 +54,7 @@ class Pool(pools.Pool):
# TODO(comstud): Timeout connections not used in a while # TODO(comstud): Timeout connections not used in a while
def create(self): def create(self):
LOG.debug('Pool creating new connection') LOG.debug(_('Pool creating new connection'))
return self.connection_cls(self.conf) return self.connection_cls(self.conf)
def empty(self): def empty(self):
@ -150,7 +149,7 @@ class ConnectionContext(rpc_common.Connection):
def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
ending=False): ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id. """Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple. Failure should be a sys.exc_info() tuple.
@ -158,7 +157,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
""" """
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
if failure: if failure:
failure = rpc_common.serialize_remote_exception(failure) failure = rpc_common.serialize_remote_exception(failure,
log_failure)
try: try:
msg = {'result': reply, 'failure': failure} msg = {'result': reply, 'failure': failure}
@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure} 'failure': failure}
if ending: if ending:
msg['ending'] = True msg['ending'] = True
conn.direct_send(msg_id, msg) conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext): class RpcContext(rpc_common.CommonRpcContext):
@ -185,10 +185,10 @@ class RpcContext(rpc_common.CommonRpcContext):
return self.__class__(**values) return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False, def reply(self, reply=None, failure=None, ending=False,
connection_pool=None): connection_pool=None, log_failure=True):
if self.msg_id: if self.msg_id:
msg_reply(self.conf, self.msg_id, connection_pool, reply, failure, msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
ending) ending, log_failure)
if ending: if ending:
self.msg_id = None self.msg_id = None
@ -282,11 +282,21 @@ class ProxyCallback(object):
ctxt.reply(rval, None, connection_pool=self.connection_pool) ctxt.reply(rval, None, connection_pool=self.connection_pool)
# This final None tells multicall that it is done. # This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool) ctxt.reply(ending=True, connection_pool=self.connection_pool)
except Exception as e: except rpc_common.ClientException as e:
LOG.exception('Exception during message handling') LOG.debug(_('Expected exception during message handling (%s)') %
e._exc_info[1])
ctxt.reply(None, e._exc_info,
connection_pool=self.connection_pool,
log_failure=False)
except Exception:
LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(), ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool) connection_pool=self.connection_pool)
def wait(self):
"""Wait for all callback threads to exit."""
self.pool.waitall()
class MulticallWaiter(object): class MulticallWaiter(object):
def __init__(self, conf, connection, timeout): def __init__(self, conf, connection, timeout):
@ -349,7 +359,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
# that will continue to use the connection. When it's done, # that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into # connection.close() will get called which will put it back into
# the pool # the pool
LOG.debug(_('Making asynchronous call on %s ...'), topic) LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id}) msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id)) LOG.debug(_('MSG_ID is %s') % (msg_id))
@ -358,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool) conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout) wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg) conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg) conn.topic_send(topic, rpc_common.serialize_msg(msg))
return wait_msg return wait_msg
@ -377,7 +387,7 @@ def cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous cast on %s...'), topic) LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, msg) conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool): def fanout_cast(conf, context, topic, msg, connection_pool):
@ -385,7 +395,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous fanout cast...')) LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, msg) conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool): def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
@ -393,7 +403,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False, with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn: server_params=server_params) as conn:
conn.topic_send(topic, msg) conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast_to_server(conf, context, server_params, topic, msg, def fanout_cast_to_server(conf, context, server_params, topic, msg,
@ -402,15 +412,18 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False, with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn: server_params=server_params) as conn:
conn.fanout_send(topic, msg) conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def notify(conf, context, topic, msg, connection_pool): def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
event_type = msg.get('event_type') LOG.debug(_('Sending %(event_type)s on %(topic)s'),
LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals()) dict(event_type=msg.get('event_type'),
topic=topic))
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
if envelope:
msg = rpc_common.serialize_msg(msg, force_envelope=True)
conn.notify_send(topic, msg) conn.notify_send(topic, msg)
@ -420,7 +433,4 @@ def cleanup(connection_pool):
def get_control_exchange(conf): def get_control_exchange(conf):
try: return conf.control_exchange
return conf.control_exchange
except cfg.NoSuchOptError:
return 'openstack'

View File

@ -18,18 +18,61 @@
# under the License. # under the License.
import copy import copy
import logging import sys
import traceback import traceback
from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _ from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils from quantum.openstack.common import jsonutils
from quantum.openstack.common import local from quantum.openstack.common import local
from quantum.openstack.common import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
'''RPC Envelope Version.
This version number applies to the top level structure of messages sent out.
It does *not* apply to the message payload, which must be versioned
independently. For example, when using rpc APIs, a version number is applied
for changes to the API being exposed over rpc. This version number is handled
in the rpc proxy and dispatcher modules.
This version number applies to the message envelope that is used in the
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
The current message format (version 2.0) is very simple. It is:
{
'quantum.version': <RPC Envelope Version as a String>,
'quantum.message': <Application Message Payload, JSON encoded>
}
Message format version '1.0' is just considered to be the messages we sent
without a message envelope.
So, the current message envelope just includes the envelope version. It may
eventually contain additional information, such as a signature for the message
payload.
We will JSON encode the application message payload. The message envelope,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'quantum.version'
_MESSAGE_KEY = 'quantum.message'
# TODO(russellb) Turn this on after Grizzly.
_SEND_RPC_ENVELOPE = False
class RPCException(Exception): class RPCException(Exception):
message = _("An unknown RPC related exception occurred.") message = _("An unknown RPC related exception occurred.")
@ -40,7 +83,7 @@ class RPCException(Exception):
try: try:
message = self.message % kwargs message = self.message % kwargs
except Exception as e: except Exception:
# kwargs doesn't match a variable in the message # kwargs doesn't match a variable in the message
# log the issue and the kwargs # log the issue and the kwargs
LOG.exception(_('Exception in string format operation')) LOG.exception(_('Exception in string format operation'))
@ -90,6 +133,11 @@ class UnsupportedRpcVersion(RPCException):
"this endpoint.") "this endpoint.")
class UnsupportedRpcEnvelopeVersion(RPCException):
message = _("Specified RPC envelope version, %(version)s, "
"not supported by this endpoint.")
class Connection(object): class Connection(object):
"""A connection, returned by rpc.create_connection(). """A connection, returned by rpc.create_connection().
@ -164,8 +212,12 @@ class Connection(object):
def _safe_log(log_func, msg, msg_data): def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging.""" """Sanitizes the msg_data field before logging."""
SANITIZE = {'set_admin_password': ('new_pass',), SANITIZE = {'set_admin_password': [('args', 'new_pass')],
'run_instance': ('admin_password',), } 'run_instance': [('args', 'admin_password')],
'route_message': [('args', 'message', 'args', 'method_info',
'method_kwargs', 'password'),
('args', 'message', 'args', 'method_info',
'method_kwargs', 'admin_password')]}
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data has_context_token = '_context_auth_token' in msg_data
@ -177,14 +229,16 @@ def _safe_log(log_func, msg, msg_data):
msg_data = copy.deepcopy(msg_data) msg_data = copy.deepcopy(msg_data)
if has_method: if has_method:
method = msg_data['method'] for arg in SANITIZE.get(msg_data['method'], []):
if method in SANITIZE: try:
args_to_sanitize = SANITIZE[method] d = msg_data
for arg in args_to_sanitize: for elem in arg[:-1]:
try: d = d[elem]
msg_data['args'][arg] = "<SANITIZED>" d[arg[-1]] = '<SANITIZED>'
except KeyError: except KeyError, e:
pass LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
{'item': arg,
'err': e})
if has_context_token: if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>' msg_data['_context_auth_token'] = '<SANITIZED>'
@ -195,7 +249,7 @@ def _safe_log(log_func, msg, msg_data):
return log_func(msg, msg_data) return log_func(msg, msg_data)
def serialize_remote_exception(failure_info): def serialize_remote_exception(failure_info, log_failure=True):
"""Prepares exception data to be sent over rpc. """Prepares exception data to be sent over rpc.
Failure_info should be a sys.exc_info() tuple. Failure_info should be a sys.exc_info() tuple.
@ -203,8 +257,9 @@ def serialize_remote_exception(failure_info):
""" """
tb = traceback.format_exception(*failure_info) tb = traceback.format_exception(*failure_info)
failure = failure_info[1] failure = failure_info[1]
LOG.error(_("Returning exception %s to caller"), unicode(failure)) if log_failure:
LOG.error(tb) LOG.error(_("Returning exception %s to caller"), unicode(failure))
LOG.error(tb)
kwargs = {} kwargs = {}
if hasattr(failure, 'kwargs'): if hasattr(failure, 'kwargs'):
@ -258,7 +313,7 @@ def deserialize_remote_exception(conf, data):
# we cannot necessarily change an exception message so we must override # we cannot necessarily change an exception message so we must override
# the __str__ method. # the __str__ method.
failure.__class__ = new_ex_type failure.__class__ = new_ex_type
except TypeError as e: except TypeError:
# NOTE(ameade): If a core exception then just add the traceback to the # NOTE(ameade): If a core exception then just add the traceback to the
# first exception argument. # first exception argument.
failure.args = (message,) + failure.args[1:] failure.args = (message,) + failure.args[1:]
@ -309,3 +364,107 @@ class CommonRpcContext(object):
context.values['read_deleted'] = read_deleted context.values['read_deleted'] = read_deleted
return context return context
class ClientException(Exception):
"""This encapsulates some actual exception that is expected to be
hit by an RPC proxy object. Merely instantiating it records the
current exception information, which will be passed back to the
RPC client without exceptional logging."""
def __init__(self):
self._exc_info = sys.exc_info()
def catch_client_exception(exceptions, func, *args, **kwargs):
try:
return func(*args, **kwargs)
except Exception, e:
if type(e) in exceptions:
raise ClientException()
else:
raise
def client_exceptions(*exceptions):
"""Decorator for manager methods that raise expected exceptions.
Marking a Manager method with this decorator allows the declaration
of expected exceptions that the RPC layer should not consider fatal,
and not log as if they were generated in a real error scenario. Note
that this will cause listed exceptions to be wrapped in a
ClientException, which is used internally by the RPC layer."""
def outer(func):
def inner(*args, **kwargs):
return catch_client_exception(exceptions, func, *args, **kwargs)
return inner
return outer
def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible.
:param imp_version: The version implemented
:param version: The version requested by an incoming message.
"""
version_parts = version.split('.')
imp_version_parts = imp_version.split('.')
if int(version_parts[0]) != int(imp_version_parts[0]): # Major
return False
if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
return False
return True
def serialize_msg(raw_msg, force_envelope=False):
if not _SEND_RPC_ENVELOPE and not force_envelope:
return raw_msg
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
_MESSAGE_KEY: jsonutils.dumps(raw_msg)}
return msg
def deserialize_msg(msg):
# NOTE(russellb): Hang on to your hats, this road is about to
# get a little bumpy.
#
# Robustness Principle:
# "Be strict in what you send, liberal in what you accept."
#
# At this point we have to do a bit of guessing about what it
# is we just received. Here is the set of possibilities:
#
# 1) We received a dict. This could be 2 things:
#
# a) Inspect it to see if it looks like a standard message envelope.
# If so, great!
#
# b) If it doesn't look like a standard message envelope, it could either
# be a notification, or a message from before we added a message
# envelope (referred to as version 1.0).
# Just return the message as-is.
#
# 2) It's any other non-dict type. Just return it and hope for the best.
# This case covers return values from rpc.call() from before message
# envelopes were used. (messages to call a method were always a dict)
if not isinstance(msg, dict):
# See #2 above.
return msg
base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
if not all(map(lambda key: key in msg, base_envelope_keys)):
# See #1.b above.
return msg
# At this point we think we have the message envelope
# format we were expecting. (#1.a above)
if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
return raw_msg

View File

@ -41,8 +41,8 @@ server side of the API at the same time. However, as the code stands today,
there can be both versioned and unversioned APIs implemented in the same code there can be both versioned and unversioned APIs implemented in the same code
base. base.
EXAMPLES
EXAMPLES: ========
Nova was the first project to use versioned rpc APIs. Consider the compute rpc Nova was the first project to use versioned rpc APIs. Consider the compute rpc
API as an example. The client side is in nova/compute/rpcapi.py and the server API as an example. The client side is in nova/compute/rpcapi.py and the server
@ -50,12 +50,13 @@ side is in nova/compute/manager.py.
Example 1) Adding a new method. Example 1) Adding a new method.
-------------------------------
Adding a new method is a backwards compatible change. It should be added to Adding a new method is a backwards compatible change. It should be added to
nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should
have a specific version specified to indicate the minimum API version that must have a specific version specified to indicate the minimum API version that must
be implemented for the method to be supported. For example: be implemented for the method to be supported. For example::
def get_host_uptime(self, ctxt, host): def get_host_uptime(self, ctxt, host):
topic = _compute_topic(self.topic, ctxt, host, None) topic = _compute_topic(self.topic, ctxt, host, None)
@ -67,10 +68,11 @@ get_host_uptime() method.
Example 2) Adding a new parameter. Example 2) Adding a new parameter.
----------------------------------
Adding a new parameter to an rpc method can be made backwards compatible. The Adding a new parameter to an rpc method can be made backwards compatible. The
RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped. RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
The implementation of the method must not expect the parameter to be present. The implementation of the method must not expect the parameter to be present.::
def some_remote_method(self, arg1, arg2, newarg=None): def some_remote_method(self, arg1, arg2, newarg=None):
# The code needs to deal with newarg=None for cases # The code needs to deal with newarg=None for cases
@ -101,21 +103,6 @@ class RpcDispatcher(object):
self.callbacks = callbacks self.callbacks = callbacks
super(RpcDispatcher, self).__init__() super(RpcDispatcher, self).__init__()
@staticmethod
def _is_compatible(mversion, version):
"""Determine whether versions are compatible.
:param mversion: The API version implemented by a callback.
:param version: The API version requested by an incoming message.
"""
version_parts = version.split('.')
mversion_parts = mversion.split('.')
if int(version_parts[0]) != int(mversion_parts[0]): # Major
return False
if int(version_parts[1]) > int(mversion_parts[1]): # Minor
return False
return True
def dispatch(self, ctxt, version, method, **kwargs): def dispatch(self, ctxt, version, method, **kwargs):
"""Dispatch a message based on a requested version. """Dispatch a message based on a requested version.
@ -137,7 +124,8 @@ class RpcDispatcher(object):
rpc_api_version = proxyobj.RPC_API_VERSION rpc_api_version = proxyobj.RPC_API_VERSION
else: else:
rpc_api_version = '1.0' rpc_api_version = '1.0'
is_compatible = self._is_compatible(rpc_api_version, version) is_compatible = rpc_common.version_is_compatible(rpc_api_version,
version)
had_compatible = had_compatible or is_compatible had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method): if not hasattr(proxyobj, method):
continue continue

View File

@ -18,11 +18,15 @@ queues. Casts will block, but this is very useful for tests.
""" """
import inspect import inspect
# NOTE(russellb): We specifically want to use json, not our own jsonutils.
# jsonutils has some extra logic to automatically convert objects to primitive
# types so that they can be serialized. We want to catch all cases where
# non-primitive types make it into this code and treat it as an error.
import json
import time import time
import eventlet import eventlet
from quantum.openstack.common import jsonutils
from quantum.openstack.common.rpc import common as rpc_common from quantum.openstack.common.rpc import common as rpc_common
CONSUMERS = {} CONSUMERS = {}
@ -75,6 +79,8 @@ class Consumer(object):
else: else:
res.append(rval) res.append(rval)
done.send(res) done.send(res)
except rpc_common.ClientException as e:
done.send_exception(e._exc_info[1])
except Exception as e: except Exception as e:
done.send_exception(e) done.send_exception(e)
@ -121,7 +127,7 @@ def create_connection(conf, new=True):
def check_serialize(msg): def check_serialize(msg):
"""Make sure a message intended for rpc can be serialized.""" """Make sure a message intended for rpc can be serialized."""
jsonutils.dumps(msg) json.dumps(msg)
def multicall(conf, context, topic, msg, timeout=None): def multicall(conf, context, topic, msg, timeout=None):
@ -154,6 +160,7 @@ def call(conf, context, topic, msg, timeout=None):
def cast(conf, context, topic, msg): def cast(conf, context, topic, msg):
check_serialize(msg)
try: try:
call(conf, context, topic, msg) call(conf, context, topic, msg)
except Exception: except Exception:

View File

@ -162,7 +162,8 @@ class ConsumerBase(object):
def _callback(raw_message): def _callback(raw_message):
message = self.channel.message_to_python(raw_message) message = self.channel.message_to_python(raw_message)
try: try:
callback(message.payload) msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
message.ack() message.ack()
except Exception: except Exception:
LOG.exception(_("Failed to process message... skipping it.")) LOG.exception(_("Failed to process message... skipping it."))
@ -196,7 +197,7 @@ class DirectConsumer(ConsumerBase):
# Default options # Default options
options = {'durable': False, options = {'durable': False,
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange = kombu.entity.Exchange(name=msg_id, exchange = kombu.entity.Exchange(name=msg_id,
type='direct', type='direct',
@ -269,7 +270,7 @@ class FanoutConsumer(ConsumerBase):
options = {'durable': False, options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf), 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
durable=options['durable'], durable=options['durable'],
@ -316,7 +317,7 @@ class DirectPublisher(Publisher):
options = {'durable': False, options = {'durable': False,
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
super(DirectPublisher, self).__init__(channel, msg_id, msg_id, super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
type='direct', **options) type='direct', **options)
@ -350,7 +351,7 @@ class FanoutPublisher(Publisher):
""" """
options = {'durable': False, options = {'durable': False,
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
None, type='fanout', **options) None, type='fanout', **options)
@ -387,6 +388,7 @@ class Connection(object):
def __init__(self, conf, server_params=None): def __init__(self, conf, server_params=None):
self.consumers = [] self.consumers = []
self.consumer_thread = None self.consumer_thread = None
self.proxy_callbacks = []
self.conf = conf self.conf = conf
self.max_retries = self.conf.rabbit_max_retries self.max_retries = self.conf.rabbit_max_retries
# Try forever? # Try forever?
@ -469,7 +471,7 @@ class Connection(object):
LOG.info(_("Reconnecting to AMQP server on " LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % params) "%(hostname)s:%(port)d") % params)
try: try:
self.connection.close() self.connection.release()
except self.connection_errors: except self.connection_errors:
pass pass
# Setting this in case the next statement fails, though # Setting this in case the next statement fails, though
@ -573,12 +575,14 @@ class Connection(object):
def close(self): def close(self):
"""Close/release this connection""" """Close/release this connection"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.release() self.connection.release()
self.connection = None self.connection = None
def reset(self): def reset(self):
"""Reset a connection so it can be used again""" """Reset a connection so it can be used again"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.channel.close() self.channel.close()
self.channel = self.connection.channel() self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3 # work around 'memory' transport bug in 1.1.3
@ -644,6 +648,11 @@ class Connection(object):
pass pass
self.consumer_thread = None self.consumer_thread = None
def wait_on_proxy_callbacks(self):
"""Wait for all proxy callback threads to exit."""
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg, **kwargs): def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class""" """Send to a publisher based on the publisher class"""
@ -719,6 +728,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout: if fanout:
self.declare_fanout_consumer(topic, proxy_cb) self.declare_fanout_consumer(topic, proxy_cb)
@ -730,6 +740,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name) self.declare_topic_consumer(topic, proxy_cb, pool_name)
@ -782,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg): def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify( return rpc_amqp.notify(
conf, context, topic, msg, conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection),
envelope)
def cleanup(): def cleanup():

View File

@ -17,7 +17,6 @@
import functools import functools
import itertools import itertools
import logging
import time import time
import uuid import uuid
@ -29,6 +28,7 @@ import qpid.messaging.exceptions
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _ from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import jsonutils from quantum.openstack.common import jsonutils
from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import amqp as rpc_amqp from quantum.openstack.common.rpc import amqp as rpc_amqp
from quantum.openstack.common.rpc import common as rpc_common from quantum.openstack.common.rpc import common as rpc_common
@ -41,6 +41,9 @@ qpid_opts = [
cfg.StrOpt('qpid_port', cfg.StrOpt('qpid_port',
default='5672', default='5672',
help='Qpid broker port'), help='Qpid broker port'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
help='Qpid HA cluster host:port pairs'),
cfg.StrOpt('qpid_username', cfg.StrOpt('qpid_username',
default='', default='',
help='Username for qpid connection'), help='Username for qpid connection'),
@ -121,7 +124,8 @@ class ConsumerBase(object):
"""Fetch the message and pass it to the callback object""" """Fetch the message and pass it to the callback object"""
message = self.receiver.fetch() message = self.receiver.fetch()
try: try:
self.callback(message.content) msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception: except Exception:
LOG.exception(_("Failed to process message... skipping it.")) LOG.exception(_("Failed to process message... skipping it."))
finally: finally:
@ -274,25 +278,32 @@ class Connection(object):
self.session = None self.session = None
self.consumers = {} self.consumers = {}
self.consumer_thread = None self.consumer_thread = None
self.proxy_callbacks = []
self.conf = conf self.conf = conf
if server_params and 'hostname' in server_params:
# NOTE(russellb) This enables support for cast_to_server.
server_params['qpid_hosts'] = [
'%s:%d' % (server_params['hostname'],
server_params.get('port', 5672))
]
params = { params = {
'hostname': self.conf.qpid_hostname, 'qpid_hosts': self.conf.qpid_hosts,
'port': self.conf.qpid_port,
'username': self.conf.qpid_username, 'username': self.conf.qpid_username,
'password': self.conf.qpid_password, 'password': self.conf.qpid_password,
} }
params.update(server_params or {}) params.update(server_params or {})
self.broker = params['hostname'] + ":" + str(params['port']) self.brokers = params['qpid_hosts']
self.username = params['username'] self.username = params['username']
self.password = params['password'] self.password = params['password']
self.connection_create() self.connection_create(self.brokers[0])
self.reconnect() self.reconnect()
def connection_create(self): def connection_create(self, broker):
# Create the connection - this does not open the connection # Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(self.broker) self.connection = qpid.messaging.Connection(broker)
# Check if flags are set and if so set them for the connection # Check if flags are set and if so set them for the connection
# before we call open # before we call open
@ -320,10 +331,14 @@ class Connection(object):
except qpid.messaging.exceptions.ConnectionError: except qpid.messaging.exceptions.ConnectionError:
pass pass
attempt = 0
delay = 1 delay = 1
while True: while True:
broker = self.brokers[attempt % len(self.brokers)]
attempt += 1
try: try:
self.connection_create() self.connection_create(broker)
self.connection.open() self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e: except qpid.messaging.exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay) msg_dict = dict(e=e, delay=delay)
@ -333,10 +348,9 @@ class Connection(object):
time.sleep(delay) time.sleep(delay)
delay = min(2 * delay, 60) delay = min(2 * delay, 60)
else: else:
LOG.info(_('Connected to AMQP server on %s'), broker)
break break
LOG.info(_('Connected to AMQP server on %s'), self.broker)
self.session = self.connection.session() self.session = self.connection.session()
if self.consumers: if self.consumers:
@ -362,12 +376,14 @@ class Connection(object):
def close(self): def close(self):
"""Close/release this connection""" """Close/release this connection"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.close() self.connection.close()
self.connection = None self.connection = None
def reset(self): def reset(self):
"""Reset a connection so it can be used again""" """Reset a connection so it can be used again"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.session.close() self.session.close()
self.session = self.connection.session() self.session = self.connection.session()
self.consumers = {} self.consumers = {}
@ -422,6 +438,11 @@ class Connection(object):
pass pass
self.consumer_thread = None self.consumer_thread = None
def wait_on_proxy_callbacks(self):
"""Wait for all proxy callback threads to exit."""
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg): def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class""" """Send to a publisher based on the publisher class"""
@ -497,6 +518,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout: if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@ -512,6 +534,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name) name=pool_name)
@ -570,10 +593,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg): def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg, return rpc_amqp.notify(conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection),
envelope)
def cleanup(): def cleanup():

View File

@ -205,7 +205,9 @@ class ZmqClient(object):
def __init__(self, addr, socket_type=zmq.PUSH, bind=False): def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
self.outq = ZmqSocket(addr, socket_type, bind=bind) self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data): def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'), self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)]) _serialize(data)])
@ -250,7 +252,7 @@ class InternalContext(object):
"""Process a curried message and cast the result to topic.""" """Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict()) LOG.debug(_("Running func with context: %s"), ctx.to_dict())
data.setdefault('version', None) data.setdefault('version', None)
data.setdefault('args', []) data.setdefault('args', {})
try: try:
result = proxy.dispatch( result = proxy.dispatch(
@ -259,7 +261,14 @@ class InternalContext(object):
except greenlet.GreenletExit: except greenlet.GreenletExit:
# ignore these since they are just from shutdowns # ignore these since they are just from shutdowns
pass pass
except rpc_common.ClientException, e:
LOG.debug(_("Expected exception during message handling (%s)") %
e._exc_info[1])
return {'exc':
rpc_common.serialize_remote_exception(e._exc_info,
log_failure=False)}
except Exception: except Exception:
LOG.error(_("Exception during message handling"))
return {'exc': return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())} rpc_common.serialize_remote_exception(sys.exc_info())}
@ -314,7 +323,7 @@ class ConsumerBase(object):
return return
data.setdefault('version', None) data.setdefault('version', None)
data.setdefault('args', []) data.setdefault('args', {})
proxy.dispatch(ctx, data['version'], proxy.dispatch(ctx, data['version'],
data['method'], **data['args']) data['method'], **data['args'])
@ -426,7 +435,7 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUB sock_type = zmq.PUB
elif topic.startswith('zmq_replies'): elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB sock_type = zmq.PUB
inside = _deserialize(in_msg) inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id'] msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response'] response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response) LOG.debug(_("->response->%s"), response)
@ -473,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data msg_id, topic, style, in_msg = data
ctx, request = _deserialize(in_msg) ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx) ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock] proxy = self.proxies[sock]
@ -524,7 +533,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread() self.reactor.consume_in_thread()
def _cast(addr, context, msg_id, topic, msg, timeout=None): def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
force_envelope=False):
timeout_cast = timeout or CONF.rpc_cast_timeout timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg] payload = [RpcContext.marshal(context), msg]
@ -533,7 +543,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn = ZmqClient(addr) conn = ZmqClient(addr)
# assumes cast can't return an exception # assumes cast can't return an exception
conn.cast(msg_id, topic, payload) conn.cast(msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError: except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception") raise RPCException("Cast failed. ZMQ Socket Exception")
finally: finally:
@ -602,7 +612,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
return responses[-1] return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None): def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False):
""" """
Wraps the sending of messages, Wraps the sending of messages,
dispatches to the matchmaker and sends dispatches to the matchmaker and sends
@ -628,7 +639,8 @@ def _multi_send(method, context, topic, msg, timeout=None):
if method.__name__ == '_cast': if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context, eventlet.spawn_n(method, _addr, context,
_topic, _topic, msg, timeout) _topic, _topic, msg, timeout, serialize,
force_envelope)
return return
return method(_addr, context, _topic, _topic, msg, timeout) return method(_addr, context, _topic, _topic, msg, timeout)
@ -669,6 +681,8 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not # NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions. # work with our assumptions.
topic.replace('.', '-') topic.replace('.', '-')
kwargs['serialize'] = kwargs.pop('envelope')
kwargs['force_envelope'] = True
cast(conf, context, topic, msg, **kwargs) cast(conf, context, topic, msg, **kwargs)

View File

@ -21,10 +21,10 @@ return keys for direct exchanges, per (approximate) AMQP parlance.
import contextlib import contextlib
import itertools import itertools
import json import json
import logging
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _ from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
matchmaker_opts = [ matchmaker_opts = [

View File

@ -57,6 +57,11 @@ class Service(service.Service):
self.conn.create_consumer(self.topic, dispatcher, fanout=True) self.conn.create_consumer(self.topic, dispatcher, fanout=True)
# Hook to allow the manager to do other initializations after
# the rpc connection is created.
if callable(getattr(self.manager, 'initialize_service_hook', None)):
self.manager.initialize_service_hook(self)
# Consume from all consumers in a thread # Consume from all consumers in a thread
self.conn.consume_in_thread() self.conn.consume_in_thread()

View File

@ -27,7 +27,7 @@ import sys
import time import time
import eventlet import eventlet
import greenlet import extras
import logging as std_logging import logging as std_logging
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
@ -36,11 +36,8 @@ from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common import threadgroup from quantum.openstack.common import threadgroup
try:
from quantum.openstack.common import rpc
except ImportError:
rpc = None
rpc = extras.try_import('quantum.openstack.common.rpc')
CONF = cfg.CONF CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -54,7 +51,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
self._services = [] self._services = threadgroup.ThreadGroup('launcher')
eventlet_backdoor.initialize_if_enabled() eventlet_backdoor.initialize_if_enabled()
@staticmethod @staticmethod
@ -75,8 +72,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
gt = eventlet.spawn(self.run_service, service) self._services.add_thread(self.run_service, service)
self._services.append(gt)
def stop(self): def stop(self):
"""Stop all services which are currently running. """Stop all services which are currently running.
@ -84,8 +80,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
for service in self._services: self._services.stop()
service.kill()
def wait(self): def wait(self):
"""Waits until all services have been stopped, and then returns. """Waits until all services have been stopped, and then returns.
@ -93,11 +88,7 @@ class Launcher(object):
:returns: None :returns: None
""" """
for service in self._services: self._services.wait()
try:
service.wait()
except greenlet.GreenletExit:
pass
class SignalExit(SystemExit): class SignalExit(SystemExit):
@ -132,9 +123,9 @@ class ServiceLauncher(Launcher):
except SystemExit as exc: except SystemExit as exc:
status = exc.code status = exc.code
finally: finally:
self.stop()
if rpc: if rpc:
rpc.cleanup() rpc.cleanup()
self.stop()
return status return status
@ -252,7 +243,10 @@ class ProcessLauncher(object):
def _wait_child(self): def _wait_child(self):
try: try:
pid, status = os.wait() # Don't block if no child processes have exited
pid, status = os.waitpid(0, os.WNOHANG)
if not pid:
return None
except OSError as exc: except OSError as exc:
if exc.errno not in (errno.EINTR, errno.ECHILD): if exc.errno not in (errno.EINTR, errno.ECHILD):
raise raise
@ -260,10 +254,12 @@ class ProcessLauncher(object):
if os.WIFSIGNALED(status): if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status) sig = os.WTERMSIG(status)
LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals()) LOG.info(_('Child %(pid)d killed by signal %(sig)d'),
dict(pid=pid, sig=sig))
else: else:
code = os.WEXITSTATUS(status) code = os.WEXITSTATUS(status)
LOG.info(_('Child %(pid)d exited with status %(code)d'), locals()) LOG.info(_('Child %(pid)s exited with status %(code)d'),
dict(pid=pid, code=code))
if pid not in self.children: if pid not in self.children:
LOG.warning(_('pid %d not in child list'), pid) LOG.warning(_('pid %d not in child list'), pid)
@ -282,6 +278,10 @@ class ProcessLauncher(object):
while self.running: while self.running:
wrap = self._wait_child() wrap = self._wait_child()
if not wrap: if not wrap:
# Yield to other threads if no children have exited
# Sleep for a short time to avoid excessive CPU usage
# (see bug #1095346)
eventlet.greenthread.sleep(.01)
continue continue
while self.running and len(wrap.children) < wrap.workers: while self.running and len(wrap.children) < wrap.workers:
@ -309,8 +309,8 @@ class ProcessLauncher(object):
class Service(object): class Service(object):
"""Service object for binaries running on hosts.""" """Service object for binaries running on hosts."""
def __init__(self): def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup('service') self.tg = threadgroup.ThreadGroup('service', threads)
def start(self): def start(self):
pass pass

View File

@ -117,8 +117,12 @@ def write_requirements():
def _run_shell_command(cmd): def _run_shell_command(cmd):
output = subprocess.Popen(["/bin/sh", "-c", cmd], if os.name == 'nt':
stdout=subprocess.PIPE) output = subprocess.Popen(["cmd.exe", "/C", cmd],
stdout=subprocess.PIPE)
else:
output = subprocess.Popen(["/bin/sh", "-c", cmd],
stdout=subprocess.PIPE)
out = output.communicate() out = output.communicate()
if len(out) == 0: if len(out) == 0:
return None return None
@ -272,6 +276,9 @@ def get_cmdclass():
from sphinx.setup_command import BuildDoc from sphinx.setup_command import BuildDoc
class LocalBuildDoc(BuildDoc): class LocalBuildDoc(BuildDoc):
builders = ['html', 'man']
def generate_autoindex(self): def generate_autoindex(self):
print "**Autodocumenting from %s" % os.path.abspath(os.curdir) print "**Autodocumenting from %s" % os.path.abspath(os.curdir)
modules = {} modules = {}
@ -307,14 +314,19 @@ def get_cmdclass():
if not os.getenv('SPHINX_DEBUG'): if not os.getenv('SPHINX_DEBUG'):
self.generate_autoindex() self.generate_autoindex()
for builder in ['html', 'man']: for builder in self.builders:
self.builder = builder self.builder = builder
self.finalize_options() self.finalize_options()
self.project = self.distribution.get_name() self.project = self.distribution.get_name()
self.version = self.distribution.get_version() self.version = self.distribution.get_version()
self.release = self.distribution.get_version() self.release = self.distribution.get_version()
BuildDoc.run(self) BuildDoc.run(self)
class LocalBuildLatex(LocalBuildDoc):
builders = ['latex']
cmdclass['build_sphinx'] = LocalBuildDoc cmdclass['build_sphinx'] = LocalBuildDoc
cmdclass['build_sphinx_latex'] = LocalBuildLatex
except ImportError: except ImportError:
pass pass

View File

@ -18,7 +18,6 @@ from eventlet import greenlet
from eventlet import greenpool from eventlet import greenpool
from eventlet import greenthread from eventlet import greenthread
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common import loopingcall from quantum.openstack.common import loopingcall
@ -27,19 +26,17 @@ LOG = logging.getLogger(__name__)
def _thread_done(gt, *args, **kwargs): def _thread_done(gt, *args, **kwargs):
''' """ Callback function to be passed to GreenThread.link() when we spawn()
Callback function to be passed to GreenThread.link() when we spawn() Calls the :class:`ThreadGroup` to notify if.
Calls the ThreadGroup to notify if.
''' """
kwargs['group'].thread_done(kwargs['thread']) kwargs['group'].thread_done(kwargs['thread'])
class Thread(object): class Thread(object):
""" """ Wrapper around a greenthread, that holds a reference to the
Wrapper around a greenthread, that holds a reference to :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
the ThreadGroup. The Thread will notify the ThreadGroup it has done so it can be removed from the threads list.
when it has done so it can be removed from the threads
list.
""" """
def __init__(self, name, thread, group): def __init__(self, name, thread, group):
self.name = name self.name = name
@ -54,11 +51,11 @@ class Thread(object):
class ThreadGroup(object): class ThreadGroup(object):
""" """ The point of the ThreadGroup classis to:
The point of this class is to:
- keep track of timers and greenthreads (making it easier to stop them * keep track of timers and greenthreads (making it easier to stop them
when need be). when need be).
- provide an easy API to add timers. * provide an easy API to add timers.
""" """
def __init__(self, name, thread_pool_size=10): def __init__(self, name, thread_pool_size=10):
self.name = name self.name = name

View File

@ -71,11 +71,15 @@ def normalize_time(timestamp):
def is_older_than(before, seconds): def is_older_than(before, seconds):
"""Return True if before is older than seconds.""" """Return True if before is older than seconds."""
if isinstance(before, basestring):
before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds) return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds): def is_newer_than(after, seconds):
"""Return True if after is newer than seconds.""" """Return True if after is newer than seconds."""
if isinstance(after, basestring):
after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds) return after - utcnow() > datetime.timedelta(seconds=seconds)
@ -87,7 +91,10 @@ def utcnow_ts():
def utcnow(): def utcnow():
"""Overridable version of utils.utcnow.""" """Overridable version of utils.utcnow."""
if utcnow.override_time: if utcnow.override_time:
return utcnow.override_time try:
return utcnow.override_time.pop(0)
except AttributeError:
return utcnow.override_time
return datetime.datetime.utcnow() return datetime.datetime.utcnow()
@ -95,14 +102,21 @@ utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()): def set_time_override(override_time=datetime.datetime.utcnow()):
"""Override utils.utcnow to return a constant time.""" """
Override utils.utcnow to return a constant time or a list thereof,
one at a time.
"""
utcnow.override_time = override_time utcnow.override_time = override_time
def advance_time_delta(timedelta): def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta.""" """Advance overridden time using a datetime.timedelta."""
assert(not utcnow.override_time is None) assert(not utcnow.override_time is None)
utcnow.override_time += timedelta try:
for dt in utcnow.override_time:
dt += timedelta
except TypeError:
utcnow.override_time += timedelta
def advance_time_seconds(seconds): def advance_time_seconds(seconds):
@ -135,3 +149,16 @@ def unmarshall_time(tyme):
minute=tyme['minute'], minute=tyme['minute'],
second=tyme['second'], second=tyme['second'],
microsecond=tyme['microsecond']) microsecond=tyme['microsecond'])
def delta_seconds(before, after):
"""
Compute the difference in seconds between two date, time, or
datetime objects (as a float, to microsecond resolution).
"""
delta = after - before
try:
return delta.total_seconds()
except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6))

View File

@ -24,19 +24,6 @@ import pkg_resources
import setup import setup
class _deferred_version_string(object):
"""Internal helper class which provides delayed version calculation."""
def __init__(self, version_info, prefix):
self.version_info = version_info
self.prefix = prefix
def __str__(self):
return "%s%s" % (self.prefix, self.version_info.version_string())
def __repr__(self):
return "%s%s" % (self.prefix, self.version_info.version_string())
class VersionInfo(object): class VersionInfo(object):
def __init__(self, package, python_package=None, pre_version=None): def __init__(self, package, python_package=None, pre_version=None):
@ -57,14 +44,15 @@ class VersionInfo(object):
self.python_package = python_package self.python_package = python_package
self.pre_version = pre_version self.pre_version = pre_version
self.version = None self.version = None
self._cached_version = None
def _generate_version(self): def _generate_version(self):
"""Defer to the openstack.common.setup routines for making a """Defer to the openstack.common.setup routines for making a
version from git.""" version from git."""
if self.pre_version is None: if self.pre_version is None:
return setup.get_post_version(self.python_package) return setup.get_post_version(self.package)
else: else:
return setup.get_pre_version(self.python_package, self.pre_version) return setup.get_pre_version(self.package, self.pre_version)
def _newer_version(self, pending_version): def _newer_version(self, pending_version):
"""Check to see if we're working with a stale version or not. """Check to see if we're working with a stale version or not.
@ -138,11 +126,14 @@ class VersionInfo(object):
else: else:
return '%s-dev' % (version_parts[0],) return '%s-dev' % (version_parts[0],)
def deferred_version_string(self, prefix=""): def cached_version_string(self, prefix=""):
"""Generate an object which will expand in a string context to """Generate an object which will expand in a string context to
the results of version_string(). We do this so that don't the results of version_string(). We do this so that don't
call into pkg_resources every time we start up a program when call into pkg_resources every time we start up a program when
passing version information into the CONF constructor, but passing version information into the CONF constructor, but
rather only do the calculation when and if a version is requested rather only do the calculation when and if a version is requested
""" """
return _deferred_version_string(self, prefix) if not self._cached_version:
self._cached_version = "%s%s" % (prefix,
self.version_string())
return self._cached_version

View File

@ -5,6 +5,7 @@ amqplib==0.6.1
anyjson>=0.2.4 anyjson>=0.2.4
argparse argparse
eventlet>=0.9.17 eventlet>=0.9.17
extras
greenlet>=0.3.1 greenlet>=0.3.1
httplib2 httplib2
iso8601>=0.1.4 iso8601>=0.1.4