Update rpc and notifier libs from openstack.common

Bring in the latest versions of the rpc and notifications
libraries to fix broken imports in the current version
of master.

Change-Id: I6f545df4622eabdf2f7bf4e9cf155db20bd2c4c1
Signed-off-by: Doug Hellmann <doug.hellmann@dreamhost.com>
This commit is contained in:
Doug Hellmann 2012-10-02 12:34:30 -04:00
parent f723e1a359
commit 3399a0c821
6 changed files with 108 additions and 23 deletions

View File

@ -139,8 +139,8 @@ def notify(context, publisher_id, event_type, priority, payload):
driver.notify(context, msg)
except Exception, e:
LOG.exception(_("Problem '%(e)s' attempting to "
"send to notification system. Payload=%(payload)s") %
locals())
"send to notification system. "
"Payload=%(payload)s") % locals())
_drivers = None
@ -169,7 +169,7 @@ def add_driver(notification_driver):
except ImportError as e:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %
notification_driver)
notification_driver)
else:
# Driver is already loaded; just add the object.
_drivers[notification_driver] = notification_driver

View File

@ -49,15 +49,21 @@ rpc_opts = [
cfg.ListOpt('allowed_rpc_exception_modules',
default=['quantum.openstack.common.exception',
'nova.exception',
'cinder.exception',
],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
cfg.StrOpt('control_exchange',
default='nova',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
#
# The following options are not registered here, but are expected to be
# present. The project using this library must register these options with
# the configuration so that project-specific defaults may be defined.
#
#cfg.StrOpt('control_exchange',
# default='nova',
# help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
cfg.CONF.register_opts(rpc_opts)

View File

@ -34,6 +34,7 @@ from eventlet import greenpool
from eventlet import pools
from eventlet import semaphore
from quantum.openstack.common import cfg
from quantum.openstack.common import excutils
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import local
@ -416,3 +417,10 @@ def notify(conf, context, topic, msg, connection_pool):
def cleanup(connection_pool):
if connection_pool:
connection_pool.empty()
def get_control_exchange(conf):
try:
return conf.control_exchange
except cfg.NoSuchOptError:
return 'openstack'

View File

@ -29,11 +29,11 @@ import kombu.connection
import kombu.entity
import kombu.messaging
from openstack.common import cfg
from openstack.common.gettextutils import _
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
from openstack.common import network_utils
from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common.rpc import amqp as rpc_amqp
from quantum.openstack.common.rpc import common as rpc_common
from quantum.openstack.common import network_utils
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
@ -790,4 +790,3 @@ def notify(conf, context, topic, msg):
def cleanup():
return rpc_amqp.cleanup(Connection.pool)

View File

@ -170,7 +170,8 @@ class DirectConsumer(ConsumerBase):
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
def __init__(self, conf, session, topic, callback, name=None):
def __init__(self, conf, session, topic, callback, name=None,
exchange_name=None):
"""Init a 'topic' queue.
:param session: the amqp session to use
@ -180,9 +181,9 @@ class TopicConsumer(ConsumerBase):
:param name: optional queue name, defaults to topic
"""
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (conf.control_exchange,
topic),
"%s/%s" % (exchange_name, topic),
{}, name or topic, {})
@ -256,9 +257,9 @@ class TopicPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
super(TopicPublisher, self).__init__(
session,
"%s/%s" % (conf.control_exchange, topic))
exchange_name = rpc_amqp.get_control_exchange(conf)
super(TopicPublisher, self).__init__(session,
"%s/%s" % (exchange_name, topic))
class FanoutPublisher(Publisher):
@ -276,10 +277,10 @@ class NotifyPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
super(NotifyPublisher, self).__init__(
session,
"%s/%s" % (conf.control_exchange, topic),
{"durable": True})
exchange_name = rpc_amqp.get_control_exchange(conf)
super(NotifyPublisher, self).__init__(session,
"%s/%s" % (exchange_name, topic),
{"durable": True})
class Connection(object):
@ -464,10 +465,12 @@ class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None, queue_name=None):
def declare_topic_consumer(self, topic, callback=None, queue_name=None,
exchange_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
exchange_name=exchange_name,
),
topic, callback)

View File

@ -0,0 +1,69 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common import service
LOG = logging.getLogger(__name__)
class Service(service.Service):
"""Service object for binaries running on hosts.
A service enables rpc by listening to queues based on topic and host."""
def __init__(self, host, topic, manager=None):
super(Service, self).__init__()
self.host = host
self.topic = topic
if manager is None:
self.manager = self
else:
self.manager = manager
def start(self):
super(Service, self).start()
self.conn = rpc.create_connection(new=True)
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager])
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
def stop(self):
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.conn.close()
except Exception:
pass
super(Service, self).stop()