diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py index cea63d78f..03ccea19d 100644 --- a/oslo_messaging/_cmd/zmq_proxy.py +++ b/oslo_messaging/_cmd/zmq_proxy.py @@ -35,10 +35,6 @@ Usage example: def main(): - logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s %(name)s ' - '%(levelname)-8s %(message)s') - parser = argparse.ArgumentParser( description='ZeroMQ proxy service', usage=USAGE @@ -46,11 +42,21 @@ def main(): parser.add_argument('--config-file', dest='config_file', type=str, help='Path to configuration file') + parser.add_argument('-d', '--debug', dest='debug', type=bool, + default=False, + help="Turn on DEBUG logging level instead of INFO") args = parser.parse_args() if args.config_file: cfg.CONF(["--config-file", args.config_file]) + log_level = logging.INFO + if args.debug: + log_level = logging.DEBUG + logging.basicConfig(level=log_level, + format='%(asctime)s %(name)s ' + '%(levelname)-8s %(message)s') + reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy) try: diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index e3e3755d3..31143646b 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -35,23 +35,32 @@ class UniversalQueueProxy(object): self.matchmaker = matchmaker self.poller = zmq_async.get_poller(zmq_concurrency='native') - self.router_socket = zmq_socket.ZmqRandomPortSocket( + self.fe_router_socket = zmq_socket.ZmqRandomPortSocket( + conf, context, zmq.ROUTER) + self.be_router_socket = zmq_socket.ZmqRandomPortSocket( conf, context, zmq.ROUTER) - self.poller.register(self.router_socket.handle, + self.poller.register(self.fe_router_socket.handle, + self._receive_in_request) + self.poller.register(self.be_router_socket.handle, self._receive_in_request) - self.router_address = zmq_address.combine_address( - self.conf.rpc_zmq_host, self.router_socket.port) + self.fe_router_address = zmq_address.combine_address( + self.conf.rpc_zmq_host, self.fe_router_socket.port) + self.be_router_address = zmq_address.combine_address( + self.conf.rpc_zmq_host, self.fe_router_socket.port) self.pub_publisher = zmq_pub_publisher.PubPublisherProxy( conf, matchmaker) self.matchmaker.register_publisher( - (self.pub_publisher.host, self.router_address)) + (self.pub_publisher.host, self.fe_router_address)) LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Run PUB publisher"), {"pub": self.pub_publisher.host, - "router": self.router_address}) + "router": self.fe_router_address}) + self.matchmaker.register_router(self.be_router_address) + LOG.info(_LI("[Backend ROUTER:%(router)s] Run ROUTER"), + {"router": self.be_router_address}) def run(self): message, socket = self.poller.poll(self.conf.rpc_poll_timeout) @@ -63,31 +72,40 @@ class UniversalQueueProxy(object): LOG.debug("-> Redirecting request %s to TCP publisher", envelope) self.pub_publisher.send_request(message) elif not envelope.is_mult_send: - self._redirect_message(message) + self._redirect_message(self.be_router_socket + if socket is self.fe_router_socket + else self.fe_router_socket, message) @staticmethod def _receive_in_request(socket): - reply_id = socket.recv() - assert reply_id is not None, "Valid id expected" - empty = socket.recv() - assert empty == b'', "Empty delimiter expected" - envelope = socket.recv_pyobj() - payload = socket.recv_multipart() - payload.insert(zmq_names.MULTIPART_IDX_ENVELOPE, envelope) - return payload + try: + reply_id = socket.recv() + assert reply_id is not None, "Valid id expected" + empty = socket.recv() + assert empty == b'', "Empty delimiter expected" + envelope = socket.recv_pyobj() + payload = socket.recv_multipart() + payload.insert(zmq_names.MULTIPART_IDX_ENVELOPE, envelope) + return payload + except (AssertionError, zmq.ZMQError): + LOG.error("Received message with wrong format") + return None - def _redirect_message(self, multipart_message): + @staticmethod + def _redirect_message(socket, multipart_message): envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] LOG.debug("<-> Dispatch message: %s", envelope) response_binary = multipart_message[zmq_names.MULTIPART_IDX_BODY] - self.router_socket.send(envelope.routing_key, zmq.SNDMORE) - self.router_socket.send(b'', zmq.SNDMORE) - self.router_socket.send_pyobj(envelope, zmq.SNDMORE) - self.router_socket.send(response_binary) + socket.send(envelope.routing_key, zmq.SNDMORE) + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(envelope, zmq.SNDMORE) + socket.send(response_binary) def cleanup(self): - self.router_socket.close() + self.fe_router_socket.close() + self.be_router_socket.close() self.pub_publisher.cleanup() self.matchmaker.unregister_publisher( - (self.pub_publisher.host, self.router_address)) + (self.pub_publisher.host, self.fe_router_address)) + self.matchmaker.unregister_router(self.be_router_address) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index a701dad91..51e8d4181 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -110,6 +110,7 @@ class SocketsManager(object): self.zmq_context = zmq.Context() self.outbound_sockets = {} self.socket_to_publishers = None + self.socket_to_routers = None def _track_socket(self, socket, target): self.outbound_sockets[str(target)] = (socket, time.time()) @@ -162,6 +163,16 @@ class SocketsManager(object): self.socket_to_publishers.connect_to_host(router_address) return self.socket_to_publishers + def get_socket_to_routers(self): + if self.socket_to_routers is not None: + return self.socket_to_routers + self.socket_to_routers = zmq_socket.ZmqSocket( + self.conf, self.zmq_context, self.socket_type) + routers = self.matchmaker.get_routers() + for router_address in routers: + self.socket_to_routers.connect_to_host(router_address) + return self.socket_to_routers + def cleanup(self): for socket, tm in self.outbound_sockets.values(): socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index 8fd7b5622..dc5419450 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -87,7 +87,7 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase): self.target = server.target self.sockets_manager = zmq_publisher_base.SocketsManager( conf, self.matchmaker, zmq.ROUTER, zmq.DEALER) - self.socket = self.sockets_manager.get_socket_to_publishers() + self.socket = self.sockets_manager.get_socket_to_routers() self.poller.register(self.socket, self.receive_message) self.host = self.socket.handle.identity self.target_updater = zmq_consumer_base.TargetUpdater(