Merge "[zmq] Don't skip non-direct message types"
This commit is contained in:
commit
e3d55aa275
@ -87,7 +87,7 @@ zmq_opts = [
|
||||
'PUB/SUB always uses proxy.'),
|
||||
|
||||
cfg.BoolOpt('use_router_proxy', default=True,
|
||||
help='Use ROUTER remote proxy for direct methods.'),
|
||||
help='Use ROUTER remote proxy.'),
|
||||
|
||||
cfg.PortOpt('rpc_zmq_min_port',
|
||||
default=49153,
|
||||
|
@ -73,7 +73,7 @@ class UniversalQueueProxy(object):
|
||||
if self.conf.use_pub_sub and msg_type in (zmq_names.CAST_FANOUT_TYPE,
|
||||
zmq_names.NOTIFY_TYPE):
|
||||
self.pub_publisher.send_request(message)
|
||||
elif msg_type in zmq_names.DIRECT_TYPES:
|
||||
else:
|
||||
self._redirect_message(self.be_router_socket
|
||||
if socket is self.fe_router_socket
|
||||
else self.fe_router_socket, message)
|
||||
|
@ -35,6 +35,7 @@ class DealerPublisherProxy(object):
|
||||
"""Used when publishing to a proxy. """
|
||||
|
||||
def __init__(self, conf, matchmaker, socket_to_proxy):
|
||||
self.conf = conf
|
||||
self.sockets_manager = zmq_publisher_base.SocketsManager(
|
||||
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||
self.socket = socket_to_proxy
|
||||
@ -45,10 +46,17 @@ class DealerPublisherProxy(object):
|
||||
raise zmq_publisher_base.UnsupportedSendPattern(
|
||||
request.msg_type)
|
||||
|
||||
if self.conf.use_pub_sub:
|
||||
routing_key = self.routing_table.get_routable_host(request.target) \
|
||||
if request.msg_type in zmq_names.DIRECT_TYPES else \
|
||||
zmq_address.target_to_subscribe_filter(request.target)
|
||||
self._do_send_request(request, routing_key)
|
||||
else:
|
||||
routing_keys = self.routing_table.get_all_hosts(request.target)
|
||||
for routing_key in routing_keys:
|
||||
self._do_send_request(request, routing_key)
|
||||
|
||||
def _do_send_request(self, request, routing_key):
|
||||
self.socket.send(b'', zmq.SNDMORE)
|
||||
self.socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
|
||||
self.socket.send(six.b(routing_key), zmq.SNDMORE)
|
||||
@ -132,6 +140,10 @@ class RoutingTable(object):
|
||||
self.routing_table = {}
|
||||
self.routable_hosts = {}
|
||||
|
||||
def get_all_hosts(self, target):
|
||||
self._update_routing_table(target)
|
||||
return list(self.routable_hosts.get(str(target)) or [])
|
||||
|
||||
def get_routable_host(self, target):
|
||||
self._update_routing_table(target)
|
||||
hosts_for_target = self.routable_hosts[str(target)]
|
||||
|
Loading…
x
Reference in New Issue
Block a user