diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py index 27e5f6e71..890d5a168 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py @@ -67,7 +67,7 @@ class SocketsManager(object): socket = self._check_for_new_hosts(target) else: socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, - self.socket_type) + self.socket_type, immediate=False) self._get_hosts_and_connect(socket, target) return socket diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py index 2e053f5c9..39de56698 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py @@ -90,7 +90,7 @@ class UniversalQueueProxy(object): payload.insert(0, routing_key) payload.insert(0, msg_type) return payload - except (AssertionError, zmq.ZMQError): + except (AssertionError, ValueError, zmq.ZMQError): LOG.error("Received message with wrong format") return None diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index 3715b8466..5f5e8ef2e 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -76,7 +76,7 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): else: LOG.error(_LE("Unknown message type: %s"), zmq_names.message_type_str(message_type)) - except (zmq.ZMQError, AssertionError) as e: + except (zmq.ZMQError, AssertionError, ValueError) as e: LOG.error(_LE("Receiving message failure: %s"), str(e)) def cleanup(self): diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 99b65ed76..f9913cb00 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -63,7 +63,7 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): else: LOG.error(_LE("Unknown message type: %s"), zmq_names.message_type_str(msg_type)) - except (zmq.ZMQError, AssertionError) as e: + except (zmq.ZMQError, AssertionError, ValueError) as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) def cleanup(self): diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index c50ffe4ed..14061b2eb 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -38,7 +38,8 @@ class ZmqSocket(object): 'msgpack': msgpack_serializer.MessagePackSerializer() } - def __init__(self, conf, context, socket_type, high_watermark=0): + def __init__(self, conf, context, socket_type, immediate=True, + high_watermark=0): self.conf = conf self.context = context self.socket_type = socket_type @@ -49,6 +50,8 @@ class ZmqSocket(object): if self.conf.rpc_cast_timeout > 0: self.close_linger = self.conf.rpc_cast_timeout * 1000 self.handle.setsockopt(zmq.LINGER, self.close_linger) + # Put messages to only connected queues + self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0) self.handle.identity = six.b(str(uuid.uuid4())) self.connections = set() @@ -162,8 +165,9 @@ class ZmqRandomPortSocket(ZmqSocket): def __init__(self, conf, context, socket_type, host=None, high_watermark=0): - super(ZmqRandomPortSocket, self).__init__(conf, context, socket_type, - high_watermark) + super(ZmqRandomPortSocket, self).__init__( + conf, context, socket_type, immediate=False, + high_watermark=high_watermark) self.bind_address = zmq_address.get_tcp_random_address(self.conf) if host is None: host = conf.rpc_zmq_host @@ -183,8 +187,9 @@ class ZmqFixedPortSocket(ZmqSocket): def __init__(self, conf, context, socket_type, host, port, high_watermark=0): - super(ZmqFixedPortSocket, self).__init__(conf, context, socket_type, - high_watermark) + super(ZmqFixedPortSocket, self).__init__( + conf, context, socket_type, immediate=False, + high_watermark=high_watermark) self.connect_address = zmq_address.combine_address(host, port) self.bind_address = zmq_address.get_tcp_direct_address( zmq_address.combine_address(conf.rpc_zmq_bind_address, port))