diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 5a3e3888c..529063def 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -87,7 +87,7 @@ zmq_opts = [ 'PUB/SUB always uses proxy.'), cfg.BoolOpt('use_router_proxy', default=True, - help='Use ROUTER remote proxy for direct methods.'), + help='Use ROUTER remote proxy.'), cfg.PortOpt('rpc_zmq_min_port', default=49153, diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index 61f3e37a0..215f0a347 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -73,7 +73,7 @@ class UniversalQueueProxy(object): if self.conf.use_pub_sub and msg_type in (zmq_names.CAST_FANOUT_TYPE, zmq_names.NOTIFY_TYPE): self.pub_publisher.send_request(message) - elif msg_type in zmq_names.DIRECT_TYPES: + else: self._redirect_message(self.be_router_socket if socket is self.fe_router_socket else self.fe_router_socket, message) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index 1064e7279..5cba7820a 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -35,6 +35,7 @@ class DealerPublisherProxy(object): """Used when publishing to a proxy. """ def __init__(self, conf, matchmaker, socket_to_proxy): + self.conf = conf self.sockets_manager = zmq_publisher_base.SocketsManager( conf, matchmaker, zmq.ROUTER, zmq.DEALER) self.socket = socket_to_proxy @@ -45,10 +46,17 @@ class DealerPublisherProxy(object): raise zmq_publisher_base.UnsupportedSendPattern( request.msg_type) - routing_key = self.routing_table.get_routable_host(request.target) \ - if request.msg_type in zmq_names.DIRECT_TYPES else \ - zmq_address.target_to_subscribe_filter(request.target) + if self.conf.use_pub_sub: + routing_key = self.routing_table.get_routable_host(request.target) \ + if request.msg_type in zmq_names.DIRECT_TYPES else \ + zmq_address.target_to_subscribe_filter(request.target) + self._do_send_request(request, routing_key) + else: + routing_keys = self.routing_table.get_all_hosts(request.target) + for routing_key in routing_keys: + self._do_send_request(request, routing_key) + def _do_send_request(self, request, routing_key): self.socket.send(b'', zmq.SNDMORE) self.socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) self.socket.send(six.b(routing_key), zmq.SNDMORE) @@ -132,6 +140,10 @@ class RoutingTable(object): self.routing_table = {} self.routable_hosts = {} + def get_all_hosts(self, target): + self._update_routing_table(target) + return list(self.routable_hosts.get(str(target)) or []) + def get_routable_host(self, target): self._update_routing_table(target) hosts_for_target = self.routable_hosts[str(target)]