Merge "[zmq] Use zmq.IMMEDIATE option for round-robin"
This commit is contained in:
commit
b259f88b09
@ -67,7 +67,7 @@ class SocketsManager(object):
|
||||
socket = self._check_for_new_hosts(target)
|
||||
else:
|
||||
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
||||
self.socket_type)
|
||||
self.socket_type, immediate=False)
|
||||
self._get_hosts_and_connect(socket, target)
|
||||
return socket
|
||||
|
||||
|
@ -90,7 +90,7 @@ class UniversalQueueProxy(object):
|
||||
payload.insert(0, routing_key)
|
||||
payload.insert(0, msg_type)
|
||||
return payload
|
||||
except (AssertionError, zmq.ZMQError):
|
||||
except (AssertionError, ValueError, zmq.ZMQError):
|
||||
LOG.error("Received message with wrong format")
|
||||
return None
|
||||
|
||||
|
@ -76,7 +76,7 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
else:
|
||||
LOG.error(_LE("Unknown message type: %s"),
|
||||
zmq_names.message_type_str(message_type))
|
||||
except (zmq.ZMQError, AssertionError) as e:
|
||||
except (zmq.ZMQError, AssertionError, ValueError) as e:
|
||||
LOG.error(_LE("Receiving message failure: %s"), str(e))
|
||||
|
||||
def cleanup(self):
|
||||
|
@ -63,7 +63,7 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
else:
|
||||
LOG.error(_LE("Unknown message type: %s"),
|
||||
zmq_names.message_type_str(msg_type))
|
||||
except (zmq.ZMQError, AssertionError) as e:
|
||||
except (zmq.ZMQError, AssertionError, ValueError) as e:
|
||||
LOG.error(_LE("Receiving message failed: %s"), str(e))
|
||||
|
||||
def cleanup(self):
|
||||
|
@ -38,7 +38,8 @@ class ZmqSocket(object):
|
||||
'msgpack': msgpack_serializer.MessagePackSerializer()
|
||||
}
|
||||
|
||||
def __init__(self, conf, context, socket_type, high_watermark=0):
|
||||
def __init__(self, conf, context, socket_type, immediate=True,
|
||||
high_watermark=0):
|
||||
self.conf = conf
|
||||
self.context = context
|
||||
self.socket_type = socket_type
|
||||
@ -49,6 +50,8 @@ class ZmqSocket(object):
|
||||
if self.conf.rpc_cast_timeout > 0:
|
||||
self.close_linger = self.conf.rpc_cast_timeout * 1000
|
||||
self.handle.setsockopt(zmq.LINGER, self.close_linger)
|
||||
# Put messages to only connected queues
|
||||
self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
|
||||
self.handle.identity = six.b(str(uuid.uuid4()))
|
||||
self.connections = set()
|
||||
|
||||
@ -162,8 +165,9 @@ class ZmqRandomPortSocket(ZmqSocket):
|
||||
|
||||
def __init__(self, conf, context, socket_type, host=None,
|
||||
high_watermark=0):
|
||||
super(ZmqRandomPortSocket, self).__init__(conf, context, socket_type,
|
||||
high_watermark)
|
||||
super(ZmqRandomPortSocket, self).__init__(
|
||||
conf, context, socket_type, immediate=False,
|
||||
high_watermark=high_watermark)
|
||||
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
|
||||
if host is None:
|
||||
host = conf.rpc_zmq_host
|
||||
@ -183,8 +187,9 @@ class ZmqFixedPortSocket(ZmqSocket):
|
||||
|
||||
def __init__(self, conf, context, socket_type, host, port,
|
||||
high_watermark=0):
|
||||
super(ZmqFixedPortSocket, self).__init__(conf, context, socket_type,
|
||||
high_watermark)
|
||||
super(ZmqFixedPortSocket, self).__init__(
|
||||
conf, context, socket_type, immediate=False,
|
||||
high_watermark=high_watermark)
|
||||
self.connect_address = zmq_address.combine_address(host, port)
|
||||
self.bind_address = zmq_address.get_tcp_direct_address(
|
||||
zmq_address.combine_address(conf.rpc_zmq_bind_address, port))
|
||||
|
Loading…
x
Reference in New Issue
Block a user