Update Oslo rpc module

Updates rpc/impl_qpid and rpc/impl_zmq

Change-Id: Ie1b2a9d9dc3228528c501ef92d3db2133c3fe127
This commit is contained in:
Zhongyue Luo 2013-01-17 13:00:46 +08:00
parent eef903f64b
commit dae07713fe
2 changed files with 46 additions and 40 deletions

View File

@ -22,16 +22,18 @@ import uuid
import eventlet import eventlet
import greenlet import greenlet
import qpid.messaging
import qpid.messaging.exceptions
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _ from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils from quantum.openstack.common import jsonutils
from quantum.openstack.common import log as logging from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import amqp as rpc_amqp from quantum.openstack.common.rpc import amqp as rpc_amqp
from quantum.openstack.common.rpc import common as rpc_common from quantum.openstack.common.rpc import common as rpc_common
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
qpid_opts = [ qpid_opts = [
@ -275,6 +277,9 @@ class Connection(object):
pool = None pool = None
def __init__(self, conf, server_params=None): def __init__(self, conf, server_params=None):
if not qpid_messaging:
raise ImportError("Failed to import qpid.messaging")
self.session = None self.session = None
self.consumers = {} self.consumers = {}
self.consumer_thread = None self.consumer_thread = None
@ -303,7 +308,7 @@ class Connection(object):
def connection_create(self, broker): def connection_create(self, broker):
# Create the connection - this does not open the connection # Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(broker) self.connection = qpid_messaging.Connection(broker)
# Check if flags are set and if so set them for the connection # Check if flags are set and if so set them for the connection
# before we call open # before we call open
@ -328,7 +333,7 @@ class Connection(object):
if self.connection.opened(): if self.connection.opened():
try: try:
self.connection.close() self.connection.close()
except qpid.messaging.exceptions.ConnectionError: except qpid_exceptions.ConnectionError:
pass pass
attempt = 0 attempt = 0
@ -340,7 +345,7 @@ class Connection(object):
try: try:
self.connection_create(broker) self.connection_create(broker)
self.connection.open() self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e: except qpid_exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay) msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. " msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict "Sleeping %(delay)s seconds") % msg_dict
@ -367,8 +372,8 @@ class Connection(object):
while True: while True:
try: try:
return method(*args, **kwargs) return method(*args, **kwargs)
except (qpid.messaging.exceptions.Empty, except (qpid_exceptions.Empty,
qpid.messaging.exceptions.ConnectionError), e: qpid_exceptions.ConnectionError), e:
if error_callback: if error_callback:
error_callback(e) error_callback(e)
self.reconnect() self.reconnect()
@ -408,7 +413,7 @@ class Connection(object):
"""Return an iterator that will consume from all queues/consumers""" """Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc): def _error_callback(exc):
if isinstance(exc, qpid.messaging.exceptions.Empty): if isinstance(exc, qpid_exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') % LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc)) str(exc))
raise rpc_common.Timeout() raise rpc_common.Timeout()

View File

@ -23,7 +23,6 @@ import types
import uuid import uuid
import eventlet import eventlet
from eventlet.green import zmq
import greenlet import greenlet
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
@ -33,6 +32,7 @@ from quantum.openstack.common import jsonutils
from quantum.openstack.common import processutils as utils from quantum.openstack.common import processutils as utils
from quantum.openstack.common.rpc import common as rpc_common from quantum.openstack.common.rpc import common as rpc_common
zmq = importutils.try_import('eventlet.green.zmq')
# for convenience, are not modified. # for convenience, are not modified.
pformat = pprint.pformat pformat = pprint.pformat
@ -76,9 +76,9 @@ zmq_opts = [
] ]
# These globals are defined in register_opts(conf), CONF = cfg.CONF
# a mandatory initialization call CONF.register_opts(zmq_opts)
CONF = None
ZMQ_CTX = None # ZeroMQ Context, must be global. ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object matchmaker = None # memoized matchmaker object
@ -113,7 +113,7 @@ class ZmqSocket(object):
""" """
def __init__(self, addr, zmq_type, bind=True, subscribe=None): def __init__(self, addr, zmq_type, bind=True, subscribe=None):
self.sock = ZMQ_CTX.socket(zmq_type) self.sock = _get_ctxt().socket(zmq_type)
self.addr = addr self.addr = addr
self.type = zmq_type self.type = zmq_type
self.subscriptions = [] self.subscriptions = []
@ -187,11 +187,15 @@ class ZmqSocket(object):
pass pass
self.subscriptions = [] self.subscriptions = []
# Linger -1 prevents lost/dropped messages
try: try:
self.sock.close(linger=-1) # Default is to linger
self.sock.close()
except Exception: except Exception:
pass # While this is a bad thing to happen,
# it would be much worse if some of the code calling this
# were to fail. For now, lets log, and later evaluate
# if we can safely raise here.
LOG.error("ZeroMQ socket could not be closed.")
self.sock = None self.sock = None
def recv(self): def recv(self):
@ -208,7 +212,9 @@ class ZmqSocket(object):
class ZmqClient(object): class ZmqClient(object):
"""Client for ZMQ sockets.""" """Client for ZMQ sockets."""
def __init__(self, addr, socket_type=zmq.PUSH, bind=False): def __init__(self, addr, socket_type=None, bind=False):
if socket_type is None:
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind) self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
@ -685,7 +691,7 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
conf = CONF conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
queues = matchmaker.queues(topic) queues = _get_matchmaker().queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues) LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results # Don't stack if we have no matchmaker results
@ -753,32 +759,29 @@ def notify(conf, context, topic, msg, **kwargs):
def cleanup(): def cleanup():
"""Clean up resources in use by implementation.""" """Clean up resources in use by implementation."""
global ZMQ_CTX global ZMQ_CTX
global matchmaker if ZMQ_CTX:
matchmaker = None ZMQ_CTX.term()
ZMQ_CTX.term()
ZMQ_CTX = None ZMQ_CTX = None
def register_opts(conf):
"""Registration of options for this driver."""
#NOTE(ewindisch): ZMQ_CTX and matchmaker
# are initialized here as this is as good
# an initialization method as any.
# We memoize through these globals
global ZMQ_CTX
global matchmaker global matchmaker
global CONF matchmaker = None
if not CONF:
conf.register_opts(zmq_opts) def _get_ctxt():
CONF = conf if not zmq:
# Don't re-set, if this method is called twice. raise ImportError("Failed to import eventlet.green.zmq")
global ZMQ_CTX
if not ZMQ_CTX: if not ZMQ_CTX:
ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts) ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
return ZMQ_CTX
def _get_matchmaker():
global matchmaker
if not matchmaker: if not matchmaker:
# rpc_zmq_matchmaker should be set to a 'module.Class' # rpc_zmq_matchmaker should be set to a 'module.Class'
mm_path = conf.rpc_zmq_matchmaker.split('.') mm_path = CONF.rpc_zmq_matchmaker.split('.')
mm_module = '.'.join(mm_path[:-1]) mm_module = '.'.join(mm_path[:-1])
mm_class = mm_path[-1] mm_class = mm_path[-1]
@ -791,6 +794,4 @@ def register_opts(conf):
mm_impl = importutils.import_module(mm_module) mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class) mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor() matchmaker = mm_constructor()
return matchmaker
register_opts(cfg.CONF)