Sync Qpid RPC fix from Oslo

Qpid cannot serialize dicts containing strings longer than 65535
characters.  This change syncs the fix from Oslo to Quantum.

Fixes bug 1175808

Change-Id: I48071abffa86e71727deed05aca08ac475cbaf05
This commit is contained in:
Ben Nemec 2013-06-07 16:35:14 +00:00
parent f83931af80
commit 68a5a38702

View File

@ -31,6 +31,7 @@ from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import amqp as rpc_amqp from quantum.openstack.common.rpc import amqp as rpc_amqp
from quantum.openstack.common.rpc import common as rpc_common from quantum.openstack.common.rpc import common as rpc_common
qpid_codec = importutils.try_import("qpid.codec010")
qpid_messaging = importutils.try_import("qpid.messaging") qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
@ -69,6 +70,8 @@ qpid_opts = [
cfg.CONF.register_opts(qpid_opts) cfg.CONF.register_opts(qpid_opts)
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
class ConsumerBase(object): class ConsumerBase(object):
"""Consumer base class.""" """Consumer base class."""
@ -118,15 +121,32 @@ class ConsumerBase(object):
self.reconnect(session) self.reconnect(session)
def reconnect(self, session): def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect""" """Re-declare the receiver after a qpid reconnect."""
self.session = session self.session = session
self.receiver = session.receiver(self.address) self.receiver = session.receiver(self.address)
self.receiver.capacity = 1 self.receiver.capacity = 1
def _unpack_json_msg(self, msg):
"""Load the JSON data in msg if msg.content_type indicates that it
is necessary. Put the loaded data back into msg.content and
update msg.content_type appropriately.
A Qpid Message containing a dict will have a content_type of
'amqp/map', whereas one containing a string that needs to be converted
back from JSON will have a content_type of JSON_CONTENT_TYPE.
:param msg: a Qpid Message object
:returns: None
"""
if msg.content_type == JSON_CONTENT_TYPE:
msg.content = jsonutils.loads(msg.content)
msg.content_type = 'amqp/map'
def consume(self): def consume(self):
"""Fetch the message and pass it to the callback object""" """Fetch the message and pass it to the callback object."""
message = self.receiver.fetch() message = self.receiver.fetch()
try: try:
self._unpack_json_msg(message)
msg = rpc_common.deserialize_msg(message.content) msg = rpc_common.deserialize_msg(message.content)
self.callback(msg) self.callback(msg)
except Exception: except Exception:
@ -139,7 +159,7 @@ class ConsumerBase(object):
class DirectConsumer(ConsumerBase): class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'""" """Queue/consumer class for 'direct'."""
def __init__(self, conf, session, msg_id, callback): def __init__(self, conf, session, msg_id, callback):
"""Init a 'direct' queue. """Init a 'direct' queue.
@ -157,7 +177,7 @@ class DirectConsumer(ConsumerBase):
class TopicConsumer(ConsumerBase): class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'""" """Consumer class for 'topic'."""
def __init__(self, conf, session, topic, callback, name=None, def __init__(self, conf, session, topic, callback, name=None,
exchange_name=None): exchange_name=None):
@ -177,7 +197,7 @@ class TopicConsumer(ConsumerBase):
class FanoutConsumer(ConsumerBase): class FanoutConsumer(ConsumerBase):
"""Consumer class for 'fanout'""" """Consumer class for 'fanout'."""
def __init__(self, conf, session, topic, callback): def __init__(self, conf, session, topic, callback):
"""Init a 'fanout' queue. """Init a 'fanout' queue.
@ -196,7 +216,7 @@ class FanoutConsumer(ConsumerBase):
class Publisher(object): class Publisher(object):
"""Base Publisher class""" """Base Publisher class."""
def __init__(self, session, node_name, node_opts=None): def __init__(self, session, node_name, node_opts=None):
"""Init the Publisher class with the exchange_name, routing_key, """Init the Publisher class with the exchange_name, routing_key,
@ -225,16 +245,43 @@ class Publisher(object):
self.reconnect(session) self.reconnect(session)
def reconnect(self, session): def reconnect(self, session):
"""Re-establish the Sender after a reconnection""" """Re-establish the Sender after a reconnection."""
self.sender = session.sender(self.address) self.sender = session.sender(self.address)
def _pack_json_msg(self, msg):
"""Qpid cannot serialize dicts containing strings longer than 65535
characters. This function dumps the message content to a JSON
string, which Qpid is able to handle.
:param msg: May be either a Qpid Message object or a bare dict.
:returns: A Qpid Message with its content field JSON encoded.
"""
try:
msg.content = jsonutils.dumps(msg.content)
except AttributeError:
# Need to have a Qpid message so we can set the content_type.
msg = qpid_messaging.Message(jsonutils.dumps(msg))
msg.content_type = JSON_CONTENT_TYPE
return msg
def send(self, msg): def send(self, msg):
"""Send a message""" """Send a message."""
try:
# Check if Qpid can encode the message
check_msg = msg
if not hasattr(check_msg, 'content_type'):
check_msg = qpid_messaging.Message(msg)
content_type = check_msg.content_type
enc, dec = qpid_messaging.message.get_codec(content_type)
enc(check_msg.content)
except qpid_codec.CodecException:
# This means the message couldn't be serialized as a dict.
msg = self._pack_json_msg(msg)
self.sender.send(msg) self.sender.send(msg)
class DirectPublisher(Publisher): class DirectPublisher(Publisher):
"""Publisher class for 'direct'""" """Publisher class for 'direct'."""
def __init__(self, conf, session, msg_id): def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher.""" """Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id, super(DirectPublisher, self).__init__(session, msg_id,
@ -242,7 +289,7 @@ class DirectPublisher(Publisher):
class TopicPublisher(Publisher): class TopicPublisher(Publisher):
"""Publisher class for 'topic'""" """Publisher class for 'topic'."""
def __init__(self, conf, session, topic): def __init__(self, conf, session, topic):
"""init a 'topic' publisher. """init a 'topic' publisher.
""" """
@ -252,7 +299,7 @@ class TopicPublisher(Publisher):
class FanoutPublisher(Publisher): class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'""" """Publisher class for 'fanout'."""
def __init__(self, conf, session, topic): def __init__(self, conf, session, topic):
"""init a 'fanout' publisher. """init a 'fanout' publisher.
""" """
@ -262,7 +309,7 @@ class FanoutPublisher(Publisher):
class NotifyPublisher(Publisher): class NotifyPublisher(Publisher):
"""Publisher class for notifications""" """Publisher class for notifications."""
def __init__(self, conf, session, topic): def __init__(self, conf, session, topic):
"""init a 'topic' publisher. """init a 'topic' publisher.
""" """
@ -330,7 +377,7 @@ class Connection(object):
return self.consumers[str(receiver)] return self.consumers[str(receiver)]
def reconnect(self): def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues""" """Handles reconnecting and re-establishing sessions and queues."""
attempt = 0 attempt = 0
delay = 1 delay = 1
while True: while True:
@ -381,14 +428,20 @@ class Connection(object):
self.reconnect() self.reconnect()
def close(self): def close(self):
"""Close/release this connection""" """Close/release this connection."""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks() self.wait_on_proxy_callbacks()
self.connection.close() try:
self.connection.close()
except Exception:
# NOTE(dripton) Logging exceptions that happen during cleanup just
# causes confusion; there's really nothing useful we can do with
# them.
pass
self.connection = None self.connection = None
def reset(self): def reset(self):
"""Reset a connection so it can be used again""" """Reset a connection so it can be used again."""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks() self.wait_on_proxy_callbacks()
self.session.close() self.session.close()
@ -412,7 +465,7 @@ class Connection(object):
return self.ensure(_connect_error, _declare_consumer) return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None, timeout=None): def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers""" """Return an iterator that will consume from all queues/consumers."""
def _error_callback(exc): def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty): if isinstance(exc, qpid_exceptions.Empty):
@ -436,7 +489,7 @@ class Connection(object):
yield self.ensure(_error_callback, _consume) yield self.ensure(_error_callback, _consume)
def cancel_consumer_thread(self): def cancel_consumer_thread(self):
"""Cancel a consumer thread""" """Cancel a consumer thread."""
if self.consumer_thread is not None: if self.consumer_thread is not None:
self.consumer_thread.kill() self.consumer_thread.kill()
try: try:
@ -451,7 +504,7 @@ class Connection(object):
proxy_cb.wait() proxy_cb.wait()
def publisher_send(self, cls, topic, msg): def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class""" """Send to a publisher based on the publisher class."""
def _connect_error(exc): def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)} log_info = {'topic': topic, 'err_str': str(exc)}
@ -481,15 +534,15 @@ class Connection(object):
topic, callback) topic, callback)
def declare_fanout_consumer(self, topic, callback): def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer""" """Create a 'fanout' consumer."""
self.declare_consumer(FanoutConsumer, topic, callback) self.declare_consumer(FanoutConsumer, topic, callback)
def direct_send(self, msg_id, msg): def direct_send(self, msg_id, msg):
"""Send a 'direct' message""" """Send a 'direct' message."""
self.publisher_send(DirectPublisher, msg_id, msg) self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg, timeout=None): def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message""" """Send a 'topic' message."""
# #
# We want to create a message with attributes, e.g. a TTL. We # We want to create a message with attributes, e.g. a TTL. We
# don't really need to keep 'msg' in its JSON format any longer # don't really need to keep 'msg' in its JSON format any longer
@ -504,15 +557,15 @@ class Connection(object):
self.publisher_send(TopicPublisher, topic, qpid_message) self.publisher_send(TopicPublisher, topic, qpid_message)
def fanout_send(self, topic, msg): def fanout_send(self, topic, msg):
"""Send a 'fanout' message""" """Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic, msg) self.publisher_send(FanoutPublisher, topic, msg)
def notify_send(self, topic, msg, **kwargs): def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic""" """Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic, msg) self.publisher_send(NotifyPublisher, topic, msg)
def consume(self, limit=None): def consume(self, limit=None):
"""Consume from all queues/consumers""" """Consume from all queues/consumers."""
it = self.iterconsume(limit=limit) it = self.iterconsume(limit=limit)
while True: while True:
try: try:
@ -521,7 +574,7 @@ class Connection(object):
return return
def consume_in_thread(self): def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread""" """Consumer from all queues/consumers in a greenthread."""
def _consumer_thread(): def _consumer_thread():
try: try:
self.consume() self.consume()
@ -532,7 +585,7 @@ class Connection(object):
return self.consumer_thread return self.consumer_thread
def create_consumer(self, topic, proxy, fanout=False): def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object""" """Create a consumer that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
@ -548,7 +601,7 @@ class Connection(object):
return consumer return consumer
def create_worker(self, topic, proxy, pool_name): def create_worker(self, topic, proxy, pool_name):
"""Create a worker that calls a method in a proxy object""" """Create a worker that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
@ -591,7 +644,7 @@ class Connection(object):
def create_connection(conf, new=True): def create_connection(conf, new=True):
"""Create a connection""" """Create a connection."""
return rpc_amqp.create_connection( return rpc_amqp.create_connection(
conf, new, conf, new,
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection))