diff --git a/quantum/openstack/common/rpc/impl_qpid.py b/quantum/openstack/common/rpc/impl_qpid.py index 16f21a4e58..7743b7205c 100644 --- a/quantum/openstack/common/rpc/impl_qpid.py +++ b/quantum/openstack/common/rpc/impl_qpid.py @@ -22,16 +22,18 @@ import uuid import eventlet import greenlet -import qpid.messaging -import qpid.messaging.exceptions from quantum.openstack.common import cfg from quantum.openstack.common.gettextutils import _ +from quantum.openstack.common import importutils from quantum.openstack.common import jsonutils from quantum.openstack.common import log as logging from quantum.openstack.common.rpc import amqp as rpc_amqp from quantum.openstack.common.rpc import common as rpc_common +qpid_messaging = importutils.try_import("qpid.messaging") +qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") + LOG = logging.getLogger(__name__) qpid_opts = [ @@ -275,6 +277,9 @@ class Connection(object): pool = None def __init__(self, conf, server_params=None): + if not qpid_messaging: + raise ImportError("Failed to import qpid.messaging") + self.session = None self.consumers = {} self.consumer_thread = None @@ -303,7 +308,7 @@ class Connection(object): def connection_create(self, broker): # Create the connection - this does not open the connection - self.connection = qpid.messaging.Connection(broker) + self.connection = qpid_messaging.Connection(broker) # Check if flags are set and if so set them for the connection # before we call open @@ -328,7 +333,7 @@ class Connection(object): if self.connection.opened(): try: self.connection.close() - except qpid.messaging.exceptions.ConnectionError: + except qpid_exceptions.ConnectionError: pass attempt = 0 @@ -340,7 +345,7 @@ class Connection(object): try: self.connection_create(broker) self.connection.open() - except qpid.messaging.exceptions.ConnectionError, e: + except qpid_exceptions.ConnectionError, e: msg_dict = dict(e=e, delay=delay) msg = _("Unable to connect to AMQP server: %(e)s. " "Sleeping %(delay)s seconds") % msg_dict @@ -367,8 +372,8 @@ class Connection(object): while True: try: return method(*args, **kwargs) - except (qpid.messaging.exceptions.Empty, - qpid.messaging.exceptions.ConnectionError), e: + except (qpid_exceptions.Empty, + qpid_exceptions.ConnectionError), e: if error_callback: error_callback(e) self.reconnect() @@ -408,7 +413,7 @@ class Connection(object): """Return an iterator that will consume from all queues/consumers""" def _error_callback(exc): - if isinstance(exc, qpid.messaging.exceptions.Empty): + if isinstance(exc, qpid_exceptions.Empty): LOG.exception(_('Timed out waiting for RPC response: %s') % str(exc)) raise rpc_common.Timeout() diff --git a/quantum/openstack/common/rpc/impl_zmq.py b/quantum/openstack/common/rpc/impl_zmq.py index eef873a437..afa0bc8e1c 100644 --- a/quantum/openstack/common/rpc/impl_zmq.py +++ b/quantum/openstack/common/rpc/impl_zmq.py @@ -23,7 +23,6 @@ import types import uuid import eventlet -from eventlet.green import zmq import greenlet from quantum.openstack.common import cfg @@ -33,6 +32,7 @@ from quantum.openstack.common import jsonutils from quantum.openstack.common import processutils as utils from quantum.openstack.common.rpc import common as rpc_common +zmq = importutils.try_import('eventlet.green.zmq') # for convenience, are not modified. pformat = pprint.pformat @@ -76,9 +76,9 @@ zmq_opts = [ ] -# These globals are defined in register_opts(conf), -# a mandatory initialization call -CONF = None +CONF = cfg.CONF +CONF.register_opts(zmq_opts) + ZMQ_CTX = None # ZeroMQ Context, must be global. matchmaker = None # memoized matchmaker object @@ -113,7 +113,7 @@ class ZmqSocket(object): """ def __init__(self, addr, zmq_type, bind=True, subscribe=None): - self.sock = ZMQ_CTX.socket(zmq_type) + self.sock = _get_ctxt().socket(zmq_type) self.addr = addr self.type = zmq_type self.subscriptions = [] @@ -187,11 +187,15 @@ class ZmqSocket(object): pass self.subscriptions = [] - # Linger -1 prevents lost/dropped messages try: - self.sock.close(linger=-1) + # Default is to linger + self.sock.close() except Exception: - pass + # While this is a bad thing to happen, + # it would be much worse if some of the code calling this + # were to fail. For now, lets log, and later evaluate + # if we can safely raise here. + LOG.error("ZeroMQ socket could not be closed.") self.sock = None def recv(self): @@ -208,7 +212,9 @@ class ZmqSocket(object): class ZmqClient(object): """Client for ZMQ sockets.""" - def __init__(self, addr, socket_type=zmq.PUSH, bind=False): + def __init__(self, addr, socket_type=None, bind=False): + if socket_type is None: + socket_type = zmq.PUSH self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): @@ -685,7 +691,7 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, conf = CONF LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) - queues = matchmaker.queues(topic) + queues = _get_matchmaker().queues(topic) LOG.debug(_("Sending message(s) to: %s"), queues) # Don't stack if we have no matchmaker results @@ -753,32 +759,29 @@ def notify(conf, context, topic, msg, **kwargs): def cleanup(): """Clean up resources in use by implementation.""" global ZMQ_CTX - global matchmaker - matchmaker = None - ZMQ_CTX.term() + if ZMQ_CTX: + ZMQ_CTX.term() ZMQ_CTX = None - -def register_opts(conf): - """Registration of options for this driver.""" - #NOTE(ewindisch): ZMQ_CTX and matchmaker - # are initialized here as this is as good - # an initialization method as any. - - # We memoize through these globals - global ZMQ_CTX global matchmaker - global CONF + matchmaker = None - if not CONF: - conf.register_opts(zmq_opts) - CONF = conf - # Don't re-set, if this method is called twice. + +def _get_ctxt(): + if not zmq: + raise ImportError("Failed to import eventlet.green.zmq") + + global ZMQ_CTX if not ZMQ_CTX: - ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts) + ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts) + return ZMQ_CTX + + +def _get_matchmaker(): + global matchmaker if not matchmaker: # rpc_zmq_matchmaker should be set to a 'module.Class' - mm_path = conf.rpc_zmq_matchmaker.split('.') + mm_path = CONF.rpc_zmq_matchmaker.split('.') mm_module = '.'.join(mm_path[:-1]) mm_class = mm_path[-1] @@ -791,6 +794,4 @@ def register_opts(conf): mm_impl = importutils.import_module(mm_module) mm_constructor = getattr(mm_impl, mm_class) matchmaker = mm_constructor() - - -register_opts(cfg.CONF) + return matchmaker