[zmq] Use zmq.IMMEDIATE option for round-robin

This options helps to prevent message loss by scheduling
messages only to a connected queue. If there is no connections
socket hangs waiting.

Change-Id: I87b97c8b77887f53599a28e0d05fc2c71c149499
Closes-Bug: #1606272
This commit is contained in:
ozamiatin 2016-07-26 12:52:11 +03:00
parent 474d26b34e
commit 9e61efa67d
5 changed files with 14 additions and 9 deletions

View File

@ -67,7 +67,7 @@ class SocketsManager(object):
socket = self._check_for_new_hosts(target)
else:
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
self.socket_type)
self.socket_type, immediate=False)
self._get_hosts_and_connect(socket, target)
return socket

View File

@ -90,7 +90,7 @@ class UniversalQueueProxy(object):
payload.insert(0, routing_key)
payload.insert(0, msg_type)
return payload
except (AssertionError, zmq.ZMQError):
except (AssertionError, ValueError, zmq.ZMQError):
LOG.error("Received message with wrong format")
return None

View File

@ -76,7 +76,7 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
else:
LOG.error(_LE("Unknown message type: %s"),
zmq_names.message_type_str(message_type))
except (zmq.ZMQError, AssertionError) as e:
except (zmq.ZMQError, AssertionError, ValueError) as e:
LOG.error(_LE("Receiving message failure: %s"), str(e))
def cleanup(self):

View File

@ -63,7 +63,7 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
else:
LOG.error(_LE("Unknown message type: %s"),
zmq_names.message_type_str(msg_type))
except (zmq.ZMQError, AssertionError) as e:
except (zmq.ZMQError, AssertionError, ValueError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))
def cleanup(self):

View File

@ -38,7 +38,8 @@ class ZmqSocket(object):
'msgpack': msgpack_serializer.MessagePackSerializer()
}
def __init__(self, conf, context, socket_type, high_watermark=0):
def __init__(self, conf, context, socket_type, immediate=True,
high_watermark=0):
self.conf = conf
self.context = context
self.socket_type = socket_type
@ -49,6 +50,8 @@ class ZmqSocket(object):
if self.conf.rpc_cast_timeout > 0:
self.close_linger = self.conf.rpc_cast_timeout * 1000
self.handle.setsockopt(zmq.LINGER, self.close_linger)
# Put messages to only connected queues
self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
self.handle.identity = six.b(str(uuid.uuid4()))
self.connections = set()
@ -162,8 +165,9 @@ class ZmqRandomPortSocket(ZmqSocket):
def __init__(self, conf, context, socket_type, host=None,
high_watermark=0):
super(ZmqRandomPortSocket, self).__init__(conf, context, socket_type,
high_watermark)
super(ZmqRandomPortSocket, self).__init__(
conf, context, socket_type, immediate=False,
high_watermark=high_watermark)
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
if host is None:
host = conf.rpc_zmq_host
@ -183,8 +187,9 @@ class ZmqFixedPortSocket(ZmqSocket):
def __init__(self, conf, context, socket_type, host, port,
high_watermark=0):
super(ZmqFixedPortSocket, self).__init__(conf, context, socket_type,
high_watermark)
super(ZmqFixedPortSocket, self).__init__(
conf, context, socket_type, immediate=False,
high_watermark=high_watermark)
self.connect_address = zmq_address.combine_address(host, port)
self.bind_address = zmq_address.get_tcp_direct_address(
zmq_address.combine_address(conf.rpc_zmq_bind_address, port))