From b5955b6ca938bfccfeefb37062bb5ff81c8895a4 Mon Sep 17 00:00:00 2001 From: ozamiatin Date: Thu, 21 Apr 2016 01:06:39 +0300 Subject: [PATCH] [zmq] Redesign router proxy In this change router was redesigned in a way most appropriate for routing concept of zmq.ROUTER socket. DEALER(cli)-ROUTER(proxy)-DEALER(srv) instead of DEALER-ROUTER-DEALER-ROUTER (3 layers instead of 4) The main reason is to use zmq.DEALER identity in message routing. For this reason DealerConsumer was introduced server-side. RouterConsumer is left for peer-to-peer DEALER-ROUTER deployment option. Also handled assertions in receive-methods in order to not stop server when received message with wrong format. Change-Id: If25edf500fa8d220d4233bb13d67121824e841c6 Closes-Bug: #1558601 Related-Bug: #1555007 --- doc/source/zmq_driver.rst | 82 +++++------ oslo_messaging/_cmd/zmq_proxy.py | 27 +--- oslo_messaging/_drivers/impl_zmq.py | 2 +- .../zmq_driver/broker/zmq_queue_proxy.py | 106 ++++---------- .../dealer/zmq_dealer_call_publisher.py | 117 ++++----------- .../publishers/dealer/zmq_dealer_publisher.py | 29 ---- .../dealer/zmq_dealer_publisher_proxy.py | 133 ++++++++++++++---- .../publishers/dealer/zmq_reply_waiter.py | 69 +++++++++ .../client/publishers/zmq_pub_publisher.py | 40 ------ .../client/publishers/zmq_publisher_base.py | 19 +-- .../_drivers/zmq_driver/client/zmq_client.py | 30 ++-- .../zmq_driver/client/zmq_envelope.py | 22 ++- .../_drivers/zmq_driver/client/zmq_request.py | 10 +- .../zmq_driver/matchmaker/matchmaker_redis.py | 6 +- .../server/consumers/zmq_dealer_consumer.py | 123 ++++++++++++++++ .../server/consumers/zmq_pull_consumer.py | 2 +- .../server/consumers/zmq_router_consumer.py | 2 +- .../server/consumers/zmq_sub_consumer.py | 7 +- .../_drivers/zmq_driver/server/zmq_server.py | 20 ++- .../_drivers/zmq_driver/zmq_names.py | 2 +- .../zmq/matchmaker/test_impl_matchmaker.py | 6 +- .../tests/functional/zmq/test_startup.py | 3 +- setup-test-env-zmq.sh | 3 +- 23 files changed, 477 insertions(+), 383 deletions(-) create mode 100644 oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py create mode 100644 oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py diff --git a/doc/source/zmq_driver.rst b/doc/source/zmq_driver.rst index 4c5e4119b..e34fe3fca 100644 --- a/doc/source/zmq_driver.rst +++ b/doc/source/zmq_driver.rst @@ -42,6 +42,11 @@ services) is both ZeroMQ client and server. As a result, each host needs to listen to a certain TCP port for incoming connections and directly connect to other hosts simultaneously. +Another option is to use a router proxy. It is not a broker because it +doesn't assume any message ownership or persistence or replication etc. It +performs only a redirection of messages to endpoints taking routing info from +message envelope. + Topics are used to identify the destination for a ZeroMQ RPC call. There are two types of topics, bare topics and directed topics. Bare topics look like 'compute', while directed topics look like 'compute.machine1'. @@ -66,9 +71,10 @@ Assuming the following systems as a goal. | Keystone | | Nova | | Glance | | nova-compute | | Neutron | | Ceilometer | - | Cinder | | Oslo-zmq-receiver | + | Cinder | | | | Ceilometer | +------------------------+ - | Oslo-zmq-receiver | + | zmq-proxy | + | Redis | | Horizon | +---------------------+ @@ -125,6 +131,7 @@ which is 120 (seconds) by default. The option is related not specifically to redis so it is also defined in [DEFAULT] section. If option value is <= 0 then keys don't expire and live forever in the storage. + MatchMaker Data Source (mandatory) ---------------------------------- @@ -137,50 +144,34 @@ stored in Redis is that the key is a base topic and the corresponding values are hostname arrays to be sent to. -Proxy and huge number of TCP sockets ------------------------------------- +Restrict the number of TCP sockets on controller +------------------------------------------------ + +The most heavily used RPC pattern (CALL) may consume too many TCP sockets on +controller node in directly connected configuration. To solve the issue +ROUTER proxy may be used. -The most heavily used RPC pattern (CALL) may consume too many TCP sockets in -directly connected configuration. To solve the issue ROUTER proxy may be used. In order to configure driver to use ROUTER proxy set up the 'use_router_proxy' -option to True in [DEFAULT] section (False is set by default). +option to true in [DEFAULT] section (false is set by default). For example:: - use_router_proxy = True + use_router_proxy = true Not less than 3 proxies should be running on controllers or on stand alone nodes. The parameters for the script oslo-messaging-zmq-proxy should be:: oslo-messaging-zmq-proxy - --type ROUTER --config-file /etc/oslo/zeromq.conf --log-file /var/log/oslo/zmq-router-proxy.log - -Proxy for fanout publishing ---------------------------- - Fanout-based patterns like CAST+Fanout and notifications always use proxy -as they act over PUB/SUB, 'use_pub_sub' option defaults to True. In such case -publisher proxy should be running. Publisher-proxies are independent from each -other. Recommended number of proxies in the cloud is not less than 3. You -may run them on a standalone nodes or on controller nodes. -The parameters for the script oslo-messaging-zmq-proxy should be:: +as they act over PUB/SUB, 'use_pub_sub' option defaults to true. In such case +publisher proxy should be running. Actually proxy does both: routing to a +DEALER endpoint for direct messages and publishing to all subscribers over +zmq.PUB socket. - oslo-messaging-zmq-proxy - --type PUBLISHER - --config-file /etc/oslo/zeromq.conf - --log-file /var/log/oslo/zmq-publisher-proxy.log - -Actually PUBLISHER is the default value for the parameter --type, so -could be omitted:: - - oslo-messaging-zmq-proxy - --config-file /etc/oslo/zeromq.conf - --log-file /var/log/oslo/zmq-publisher-proxy.log - -If not using PUB/SUB (use_pub_sub = False) then fanout will be emulated over +If not using PUB/SUB (use_pub_sub = false) then fanout will be emulated over direct DEALER/ROUTER unicast which is possible but less efficient and therefore is not recommended. In a case of direct DEALER/ROUTER unicast proxy is not needed. @@ -189,7 +180,7 @@ This option can be set in [DEFAULT] section. For example:: - use_pub_sub = True + use_pub_sub = true In case of using a proxy all publishers (clients) talk to servers over @@ -239,12 +230,23 @@ For example:: enable_plugin zmq https://github.com/openstack/devstack-plugin-zmq.git + +Example of local.conf:: + + [[local|localrc]] + DATABASE_PASSWORD=password + ADMIN_PASSWORD=password + SERVICE_PASSWORD=password + SERVICE_TOKEN=password + + enable_plugin zmq https://github.com/openstack/devstack-plugin-zmq.git + + OSLOMSG_REPO=https://review.openstack.org/openstack/oslo.messaging + OSLOMSG_BRANCH=master + + ZEROMQ_MATCHMAKER=redis + LIBS_FROM_GIT=oslo.messaging + ENABLE_DEBUG_LOG_LEVEL=True + + .. _devstack-plugin-zmq: https://github.com/openstack/devstack-plugin-zmq.git - - -Current Status --------------- - -The current development status of ZeroMQ driver is shown in `wiki`_. - -.. _wiki: https://wiki.openstack.org/ZeroMQ diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py index bb75f8c9a..cea63d78f 100644 --- a/oslo_messaging/_cmd/zmq_proxy.py +++ b/oslo_messaging/_cmd/zmq_proxy.py @@ -28,29 +28,22 @@ CONF.register_opts(server._pool_opts) CONF.rpc_zmq_native = True -USAGE = """ Usage: ./zmq-proxy.py --type {PUBLISHER,ROUTER} [-h] [] ... +USAGE = """ Usage: ./zmq-proxy.py [-h] [] ... Usage example: - python oslo_messaging/_cmd/zmq-proxy.py\ - --type PUBLISHER""" - - -PUBLISHER = 'PUBLISHER' -ROUTER = 'ROUTER' -PROXY_TYPES = (PUBLISHER, ROUTER) + python oslo_messaging/_cmd/zmq-proxy.py""" def main(): - logging.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s %(name)s ' + '%(levelname)-8s %(message)s') parser = argparse.ArgumentParser( description='ZeroMQ proxy service', usage=USAGE ) - parser.add_argument('--type', dest='proxy_type', type=str, - default=PUBLISHER, - help='Proxy type PUBLISHER or ROUTER') parser.add_argument('--config-file', dest='config_file', type=str, help='Path to configuration file') args = parser.parse_args() @@ -58,18 +51,12 @@ def main(): if args.config_file: cfg.CONF(["--config-file", args.config_file]) - if args.proxy_type not in PROXY_TYPES: - raise Exception("Bad proxy type %s, should be one of %s" % - (args.proxy_type, PROXY_TYPES)) - - reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.PublisherProxy) \ - if args.proxy_type == PUBLISHER \ - else zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.RouterProxy) + reactor = zmq_proxy.ZmqProxy(CONF, zmq_queue_proxy.UniversalQueueProxy) try: while True: reactor.run() - except KeyboardInterrupt: + except (KeyboardInterrupt, SystemExit): reactor.close() if __name__ == "__main__": diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 739525670..979148c52 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -86,7 +86,7 @@ zmq_opts = [ help='Use PUB/SUB pattern for fanout methods. ' 'PUB/SUB always uses proxy.'), - cfg.BoolOpt('use_router_proxy', default=False, + cfg.BoolOpt('use_router_proxy', default=True, help='Use ROUTER remote proxy for direct methods.'), cfg.PortOpt('rpc_zmq_min_port', diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index 7afb94f73..6f14651ef 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -12,11 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. -import abc import logging -from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_dealer_publisher_proxy from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_pub_publisher from oslo_messaging._drivers.zmq_driver import zmq_address @@ -47,45 +44,6 @@ class UniversalQueueProxy(object): self.router_address = zmq_address.combine_address( self.conf.rpc_zmq_host, self.router_socket.port) - def run(self): - message, socket = self.poller.poll(self.conf.rpc_poll_timeout) - if message is None: - return - - if socket == self.router_socket.handle: - self._redirect_in_request(message) - else: - self._redirect_reply(message) - - @abc.abstractmethod - def _redirect_in_request(self, multipart_message): - """Redirect incoming request to a publisher.""" - - @abc.abstractmethod - def _redirect_reply(self, multipart_message): - """Redirect reply to client. Implement in a concrete proxy.""" - - def _receive_in_request(self, socket): - reply_id = socket.recv() - assert reply_id is not None, "Valid id expected" - empty = socket.recv() - assert empty == b'', "Empty delimiter expected" - envelope = socket.recv_pyobj() - envelope.reply_id = reply_id - payload = socket.recv_multipart() - payload.insert(zmq_names.MULTIPART_IDX_ENVELOPE, envelope) - return payload - - def cleanup(self): - self.router_socket.close() - - -class PublisherProxy(UniversalQueueProxy): - - def __init__(self, conf, context, matchmaker): - super(PublisherProxy, self).__init__(conf, context, matchmaker) - LOG.info(_LI("Polling at PUBLISHER proxy")) - self.pub_publisher = zmq_pub_publisher.PubPublisherProxy( conf, matchmaker) @@ -95,53 +53,41 @@ class PublisherProxy(UniversalQueueProxy): {"pub": self.pub_publisher.host, "router": self.router_address}) - def _redirect_in_request(self, multipart_message): - envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] + def run(self): + message, socket = self.poller.poll(self.conf.rpc_poll_timeout) + if message is None: + return + + envelope = message[zmq_names.MULTIPART_IDX_ENVELOPE] if self.conf.use_pub_sub and envelope.is_mult_send: LOG.debug("-> Redirecting request %s to TCP publisher", envelope) - self.pub_publisher.send_request(multipart_message) + self.pub_publisher.send_request(message) + elif not envelope.is_mult_send: + self._redirect_message(message) - def _redirect_reply(self, multipart_message): - """No reply is possible for publisher.""" + @staticmethod + def _receive_in_request(socket): + reply_id = socket.recv() + assert reply_id is not None, "Valid id expected" + empty = socket.recv() + assert empty == b'', "Empty delimiter expected" + envelope = socket.recv_pyobj() + payload = socket.recv_multipart() + payload.insert(zmq_names.MULTIPART_IDX_ENVELOPE, envelope) + return payload - def cleanup(self): - super(PublisherProxy, self).cleanup() - self.pub_publisher.cleanup() - self.matchmaker.unregister_publisher( - (self.pub_publisher.host, self.router_address)) - - -class RouterProxy(UniversalQueueProxy): - - def __init__(self, conf, context, matchmaker): - super(RouterProxy, self).__init__(conf, context, matchmaker) - LOG.info(_LI("Polling at ROUTER proxy")) - - self.dealer_publisher \ - = zmq_dealer_publisher_proxy.DealerPublisherProxy( - conf, matchmaker, self.poller) - - self.matchmaker.register_router(self.router_address) - LOG.info(_LI("ROUTER:%(router)s] Run ROUTER publisher"), - {"router": self.router_address}) - - def _redirect_in_request(self, multipart_message): + def _redirect_message(self, multipart_message): envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] - LOG.debug("-> Redirecting request %s to TCP publisher", envelope) - if not envelope.is_mult_send: - self.dealer_publisher.send_request(multipart_message) - - def _redirect_reply(self, multipart_message): - envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] - LOG.debug("<- Redirecting reply: %s", envelope) + LOG.debug("<-> Route message: %s", envelope) response_binary = multipart_message[zmq_names.MULTIPART_IDX_BODY] - self.router_socket.send(envelope.reply_id, zmq.SNDMORE) + self.router_socket.send(envelope.routing_key, zmq.SNDMORE) self.router_socket.send(b'', zmq.SNDMORE) self.router_socket.send_pyobj(envelope, zmq.SNDMORE) self.router_socket.send(response_binary) def cleanup(self): - super(RouterProxy, self).cleanup() - self.dealer_publisher.cleanup() - self.matchmaker.unregister_router(self.router_address) + self.router_socket.close() + self.pub_publisher.cleanup() + self.matchmaker.unregister_publisher( + (self.pub_publisher.host, self.router_address)) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py index 19e9d65f6..a8f2d71f8 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py @@ -13,17 +13,18 @@ # under the License. import logging -import threading from concurrent import futures import futurist import oslo_messaging from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ + import zmq_reply_waiter from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_publisher_base from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._i18n import _LW +from oslo_messaging._i18n import _LE LOG = logging.getLogger(__name__) @@ -37,43 +38,33 @@ class DealerCallPublisher(object): instead of ReqPublisher. """ - def __init__(self, conf, matchmaker): + def __init__(self, conf, matchmaker, sockets_manager, sender=None, + reply_waiter=None): super(DealerCallPublisher, self).__init__() self.conf = conf self.matchmaker = matchmaker - self.reply_waiter = ReplyWaiter(conf) - self.sockets_manager = zmq_publisher_base.SocketsManager( - conf, matchmaker, zmq.ROUTER, zmq.DEALER) - - def _do_send_request(socket, request): - target_hosts = self.sockets_manager.get_hosts(request.target) - envelope = request.create_envelope(target_hosts) - # DEALER socket specific envelope empty delimiter - socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(envelope, zmq.SNDMORE) - socket.send_pyobj(request) - - LOG.debug("Sent message_id %(message)s to a target %(target)s", - {"message": request.message_id, - "target": request.target}) - - self.sender = CallSender(self.sockets_manager, _do_send_request, - self.reply_waiter) \ - if not conf.use_router_proxy else \ - CallSenderLight(self.sockets_manager, _do_send_request, - self.reply_waiter) + self.reply_waiter = reply_waiter or zmq_reply_waiter.ReplyWaiter(conf) + self.sockets_manager = sockets_manager + self.sender = sender or CallSender(self.sockets_manager, + self.reply_waiter) def send_request(self, request): reply_future = self.sender.send_request(request) try: reply = reply_future.result(timeout=request.timeout) + LOG.debug("Received reply %s", request.message_id) + except AssertionError: + LOG.error(_LE("Message format error in reply %s"), + request.message_id) + return None except futures.TimeoutError: raise oslo_messaging.MessagingTimeout( - "Timeout %s seconds was reached" % request.timeout) + "Timeout %(tout)s seconds was reached for message %(id)s" % + {"tout": request.timeout, + "id": request.message_id}) finally: self.reply_waiter.untrack_id(request.message_id) - LOG.debug("Received reply %s", reply) if reply.failure: raise rpc_common.deserialize_remote_exception( reply.failure, @@ -88,11 +79,23 @@ class DealerCallPublisher(object): class CallSender(zmq_publisher_base.QueuedSender): - def __init__(self, sockets_manager, _do_send_request, reply_waiter): - super(CallSender, self).__init__(sockets_manager, _do_send_request) + def __init__(self, sockets_manager, reply_waiter): + super(CallSender, self).__init__(sockets_manager, + self._do_send_request) assert reply_waiter, "Valid ReplyWaiter expected!" self.reply_waiter = reply_waiter + def _do_send_request(self, socket, request): + envelope = request.create_envelope() + # DEALER socket specific envelope empty delimiter + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(envelope, zmq.SNDMORE) + socket.send_pyobj(request) + + LOG.debug("Sent message_id %(message)s to a target %(target)s", + {"message": request.message_id, + "target": request.target}) + def send_request(self, request): reply_future = futurist.Future() self.reply_waiter.track_reply(reply_future, request.message_id) @@ -103,61 +106,3 @@ class CallSender(zmq_publisher_base.QueuedSender): socket = self.outbound_sockets.get_socket(target) self.reply_waiter.poll_socket(socket) return socket - - -class CallSenderLight(CallSender): - - def __init__(self, sockets_manager, _do_send_request, reply_waiter): - super(CallSenderLight, self).__init__( - sockets_manager, _do_send_request, reply_waiter) - self.socket = self.outbound_sockets.get_socket_to_routers() - self.reply_waiter.poll_socket(self.socket) - - def _connect_socket(self, target): - return self.socket - - -class ReplyWaiter(object): - - def __init__(self, conf): - self.conf = conf - self.replies = {} - self.poller = zmq_async.get_poller() - self.executor = zmq_async.get_executor(self.run_loop) - self.executor.execute() - self._lock = threading.Lock() - - def track_reply(self, reply_future, message_id): - with self._lock: - self.replies[message_id] = reply_future - - def untrack_id(self, message_id): - with self._lock: - self.replies.pop(message_id) - - def poll_socket(self, socket): - - def _receive_method(socket): - empty = socket.recv() - assert empty == b'', "Empty expected!" - envelope = socket.recv_pyobj() - assert envelope is not None, "Invalid envelope!" - reply = socket.recv_pyobj() - LOG.debug("Received reply %s", reply) - return reply - - self.poller.register(socket, recv_method=_receive_method) - - def run_loop(self): - reply, socket = self.poller.poll( - timeout=self.conf.rpc_poll_timeout) - if reply is not None: - call_future = self.replies.get(reply.message_id) - if call_future: - call_future.set_result(reply) - else: - LOG.warning(_LW("Received timed out reply: %s"), - reply.message_id) - - def cleanup(self): - self.poller.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py index caec2d0ab..0cd13ff07 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py @@ -89,32 +89,3 @@ class DealerPublisherAsync(object): def cleanup(self): self.sockets_manager.cleanup() - - -class DealerPublisherLight(object): - """Used when publishing to a proxy. """ - - def __init__(self, conf, matchmaker): - self.sockets_manager = zmq_publisher_base.SocketsManager( - conf, matchmaker, zmq.ROUTER, zmq.DEALER) - self.socket = self.sockets_manager.get_socket_to_publishers() - - def send_request(self, request): - if request.msg_type == zmq_names.CALL_TYPE: - raise zmq_publisher_base.UnsupportedSendPattern( - request.msg_type) - - envelope = request.create_envelope() - - self.socket.send(b'', zmq.SNDMORE) - self.socket.send_pyobj(envelope, zmq.SNDMORE) - self.socket.send_pyobj(request) - - LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to " - "a target %(target)s", - {"message": request.message_id, - "target": request.target, - "addr": list(self.socket.connections)}) - - def cleanup(self): - self.socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index a4c81516f..9bd7118fe 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -13,7 +13,12 @@ # under the License. import logging +import time +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ + import zmq_dealer_call_publisher +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ + import zmq_reply_waiter from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_publisher_base from oslo_messaging._drivers.zmq_driver import zmq_async @@ -25,43 +30,109 @@ LOG = logging.getLogger(__name__) class DealerPublisherProxy(object): + """Used when publishing to a proxy. """ - def __init__(self, conf, matchmaker, poller): - super(DealerPublisherProxy, self).__init__() - self.conf = conf - self.matchmaker = matchmaker - self.poller = poller + def __init__(self, conf, matchmaker, socket_to_proxy): self.sockets_manager = zmq_publisher_base.SocketsManager( conf, matchmaker, zmq.ROUTER, zmq.DEALER) + self.socket = socket_to_proxy + self.routing_table = RoutingTable(conf, matchmaker) - def send_request(self, multipart_message): - envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] - if envelope.is_mult_send: - raise zmq_publisher_base.UnsupportedSendPattern(envelope.msg_type) - if not envelope.target_hosts: - raise Exception("Target hosts are expected!") + def send_request(self, request): + if request.msg_type == zmq_names.CALL_TYPE: + raise zmq_publisher_base.UnsupportedSendPattern( + request.msg_type) - dealer_socket = self.sockets_manager.get_socket_to_hosts( - envelope.target, envelope.target_hosts) - self.poller.register(dealer_socket.handle, self.receive_reply) + envelope = request.create_envelope( + routing_key=self.routing_table.get_routable_host(request.target) + if request.msg_type in zmq_names.DIRECT_TYPES else None) - LOG.debug("Sending message %(message)s to a target %(target)s" - % {"message": envelope.message_id, - "target": envelope.target}) + self.socket.send(b'', zmq.SNDMORE) + self.socket.send_pyobj(envelope, zmq.SNDMORE) + self.socket.send_pyobj(request) - # Empty delimiter - DEALER socket specific - dealer_socket.send(b'', zmq.SNDMORE) - dealer_socket.send_pyobj(envelope, zmq.SNDMORE) - dealer_socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY]) - - def receive_reply(self, socket): - empty = socket.recv() - assert empty == b'', "Empty expected!" - envelope = socket.recv_pyobj() - assert envelope is not None, "Invalid envelope!" - reply = socket.recv() - LOG.debug("Received reply %s", envelope) - return [envelope, reply] + LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to " + "a target %(target)s", + {"message": request.message_id, + "target": request.target, + "addr": list(self.socket.connections)}) def cleanup(self): - self.sockets_manager.cleanup() + self.socket.close() + + +class DealerCallPublisherProxy(zmq_dealer_call_publisher.DealerCallPublisher): + + def __init__(self, conf, matchmaker, sockets_manager): + reply_waiter = zmq_reply_waiter.ReplyWaiter(conf) + sender = CallSenderProxy(conf, matchmaker, sockets_manager, + reply_waiter) + super(DealerCallPublisherProxy, self).__init__( + conf, matchmaker, sockets_manager, sender, reply_waiter) + + +class CallSenderProxy(zmq_dealer_call_publisher.CallSender): + + def __init__(self, conf, matchmaker, sockets_manager, reply_waiter): + super(CallSenderProxy, self).__init__( + sockets_manager, reply_waiter) + self.socket = self.outbound_sockets.get_socket_to_publishers() + self.reply_waiter.poll_socket(self.socket) + self.routing_table = RoutingTable(conf, matchmaker) + + def _connect_socket(self, target): + return self.socket + + def _do_send_request(self, socket, request): + envelope = request.create_envelope( + routing_key=self.routing_table.get_routable_host(request.target), + reply_id=self.socket.handle.identity) + # DEALER socket specific envelope empty delimiter + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(envelope, zmq.SNDMORE) + socket.send_pyobj(request) + + LOG.debug("Sent message_id %(message)s to a target %(target)s", + {"message": request.message_id, + "target": request.target}) + + +class RoutingTable(object): + """This class implements local routing-table cache + taken from matchmaker. Its purpose is to give the next routable + host id (remote DEALER's id) by request for specific target in + round-robin fashion. + """ + + def __init__(self, conf, matchmaker): + self.conf = conf + self.matchmaker = matchmaker + self.routing_table = {} + self.routable_hosts = {} + + def get_routable_host(self, target): + self._update_routing_table(target) + hosts_for_target = self.routable_hosts[str(target)] + host = hosts_for_target.pop(0) + if not hosts_for_target: + self._renew_routable_hosts(target) + return host + + def _is_tm_expired(self, tm): + return 0 <= self.conf.zmq_target_expire <= time.time() - tm + + def _update_routing_table(self, target): + routing_record = self.routing_table.get(str(target)) + if routing_record is None: + self._fetch_hosts(target) + self._renew_routable_hosts(target) + elif self._is_tm_expired(routing_record[1]): + self._fetch_hosts(target) + + def _fetch_hosts(self, target): + self.routing_table[str(target)] = (self.matchmaker.get_hosts( + target, zmq_names.socket_type_str(zmq.DEALER)), time.time()) + + def _renew_routable_hosts(self, target): + hosts, _ = self.routing_table[str(target)] + self.routable_hosts[str(target)] = list(hosts) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py new file mode 100644 index 000000000..b7d8a7b76 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py @@ -0,0 +1,69 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import threading + +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._i18n import _LW + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class ReplyWaiter(object): + + def __init__(self, conf): + self.conf = conf + self.replies = {} + self.poller = zmq_async.get_poller() + self.executor = zmq_async.get_executor(self.run_loop) + self.executor.execute() + self._lock = threading.Lock() + + def track_reply(self, reply_future, message_id): + with self._lock: + self.replies[message_id] = reply_future + + def untrack_id(self, message_id): + with self._lock: + self.replies.pop(message_id) + + def poll_socket(self, socket): + + def _receive_method(socket): + empty = socket.recv() + assert empty == b'', "Empty expected!" + envelope = socket.recv_pyobj() + assert envelope is not None, "Invalid envelope!" + reply = socket.recv_pyobj() + LOG.debug("Received reply %s", envelope) + return reply + + self.poller.register(socket, recv_method=_receive_method) + + def run_loop(self): + reply, socket = self.poller.poll( + timeout=self.conf.rpc_poll_timeout) + if reply is not None: + call_future = self.replies.get(reply.message_id) + if call_future: + call_future.set_result(reply) + else: + LOG.warning(_LW("Received timed out reply: %s"), + reply.message_id) + + def cleanup(self): + self.poller.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py index 890a32392..4054194c5 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py @@ -52,8 +52,6 @@ class PubPublisherProxy(object): self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, self.socket.port) - self.sync_channel = SyncChannel(conf, matchmaker, self.zmq_context) - def send_request(self, multipart_message): envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] @@ -72,42 +70,4 @@ class PubPublisherProxy(object): "topic": topic_filter}) def cleanup(self): - self.matchmaker.unregister_publisher( - (self.host, self.sync_channel.sync_host)) self.socket.close() - - -class SyncChannel(object): - """Subscribers synchronization channel - - As far as PUB/SUB is one directed way pattern we need some - backwards channel to have a possibility of subscribers - to talk back to publisher. - - May be used for heartbeats or some kind of acknowledgments etc. - """ - - def __init__(self, conf, matchmaker, context): - self.conf = conf - self.matchmaker = matchmaker - self.context = context - self._ready = None - - # NOTE(ozamiatin): May be used for heartbeats when we - # implement them - self.sync_socket = zmq_socket.ZmqRandomPortSocket( - self.conf, self.context, zmq.PULL) - self.poller = zmq_async.get_poller() - self.poller.register(self.sync_socket) - - self.sync_host = zmq_address.combine_address(self.conf.rpc_zmq_host, - self.sync_socket.port) - - def is_ready(self): - LOG.debug("[%s] Waiting for ready from first subscriber", - self.sync_host) - if self._ready is None: - self._ready = self.poller.poll() - LOG.debug("[%s] Received ready from first subscriber", - self.sync_host) - return self._ready is not None diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index 7fb839fa9..a701dad91 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -109,6 +109,7 @@ class SocketsManager(object): self.socket_type = socket_type self.zmq_context = zmq.Context() self.outbound_sockets = {} + self.socket_to_publishers = None def _track_socket(self, socket, target): self.outbound_sockets[str(target)] = (socket, time.time()) @@ -152,20 +153,14 @@ class SocketsManager(object): return socket def get_socket_to_publishers(self): - socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, - self.socket_type) + if self.socket_to_publishers is not None: + return self.socket_to_publishers + self.socket_to_publishers = zmq_socket.ZmqSocket( + self.conf, self.zmq_context, self.socket_type) publishers = self.matchmaker.get_publishers() for pub_address, router_address in publishers: - socket.connect_to_host(router_address) - return socket - - def get_socket_to_routers(self): - socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, - self.socket_type) - routers = self.matchmaker.get_routers() - for router_address in routers: - socket.connect_to_host(router_address) - return socket + self.socket_to_publishers.connect_to_host(router_address) + return self.socket_to_publishers def cleanup(self): for socket, tm in self.outbound_sockets.values(): diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index a413d19b1..9f01eb23f 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -17,6 +17,10 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_call_publisher from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ + import zmq_dealer_publisher_proxy +from oslo_messaging._drivers.zmq_driver.client.publishers \ + import zmq_publisher_base from oslo_messaging._drivers.zmq_driver.client import zmq_client_base from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -28,23 +32,31 @@ class ZmqClient(zmq_client_base.ZmqClientBase): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): + self.sockets_manager = zmq_publisher_base.SocketsManager( + conf, matchmaker, zmq.ROUTER, zmq.DEALER) + default_publisher = zmq_dealer_publisher.DealerPublisher( conf, matchmaker) - cast_publisher = zmq_dealer_publisher.DealerPublisherAsync( - conf, matchmaker) \ - if zmq_async.is_eventlet_concurrency(conf) \ - else default_publisher + publisher_to_proxy = zmq_dealer_publisher_proxy.DealerPublisherProxy( + conf, matchmaker, self.sockets_manager.get_socket_to_publishers()) - fanout_publisher = zmq_dealer_publisher.DealerPublisherLight( - conf, matchmaker) if conf.use_pub_sub else default_publisher + call_publisher = zmq_dealer_publisher_proxy.DealerCallPublisherProxy( + conf, matchmaker, self.sockets_manager) if conf.use_router_proxy \ + else zmq_dealer_call_publisher.DealerCallPublisher( + conf, matchmaker, self.sockets_manager) + + cast_publisher = publisher_to_proxy if conf.use_router_proxy \ + else zmq_dealer_publisher.DealerPublisherAsync( + conf, matchmaker) + + fanout_publisher = publisher_to_proxy \ + if conf.use_pub_sub else default_publisher super(ZmqClient, self).__init__( conf, matchmaker, allowed_remote_exmods, publishers={ - zmq_names.CALL_TYPE: - zmq_dealer_call_publisher.DealerCallPublisher( - conf, matchmaker), + zmq_names.CALL_TYPE: call_publisher, zmq_names.CAST_TYPE: cast_publisher, diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py index 68ef29189..d1913b430 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_envelope.py @@ -19,12 +19,12 @@ from oslo_messaging._drivers.zmq_driver import zmq_names class Envelope(object): def __init__(self, msg_type=None, message_id=None, target=None, - target_hosts=None, **kwargs): + routing_key=None, **kwargs): self._msg_type = msg_type self._message_id = message_id self._target = target - self._target_hosts = target_hosts self._reply_id = None + self._routing_key = routing_key self._kwargs = kwargs @property @@ -35,10 +35,22 @@ class Envelope(object): def reply_id(self, value): self._reply_id = value + @property + def routing_key(self): + return self._routing_key + + @routing_key.setter + def routing_key(self, value): + self._routing_key = value + @property def msg_type(self): return self._msg_type + @msg_type.setter + def msg_type(self, value): + self._msg_type = value + @property def message_id(self): return self._message_id @@ -47,10 +59,6 @@ class Envelope(object): def target(self): return self._target - @property - def target_hosts(self): - return self._target_hosts - @property def is_mult_send(self): return self._msg_type in zmq_names.MULTISEND_TYPES @@ -72,7 +80,7 @@ class Envelope(object): envelope = {zmq_names.FIELD_MSG_TYPE: self._msg_type, zmq_names.FIELD_MSG_ID: self._message_id, zmq_names.FIELD_TARGET: self._target, - zmq_names.FIELD_TARGET_HOSTS: self._target_hosts} + zmq_names.FIELD_ROUTING_KEY: self._routing_key} envelope.update({k: v for k, v in self._kwargs.items() if v is not None}) return envelope diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index 372a0c9e1..3f3ea0ff7 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -70,11 +70,12 @@ class Request(object): self.message_id = str(uuid.uuid1()) - def create_envelope(self, hosts=None): + def create_envelope(self, routing_key=None, reply_id=None): envelope = zmq_envelope.Envelope(msg_type=self.msg_type, message_id=self.message_id, target=self.target, - target_hosts=hosts) + routing_key=routing_key) + envelope.reply_id = reply_id return envelope @abc.abstractproperty @@ -114,8 +115,9 @@ class CallRequest(RpcRequest): super(CallRequest, self).__init__(*args, **kwargs) - def create_envelope(self, hosts=None): - envelope = super(CallRequest, self).create_envelope(hosts) + def create_envelope(self, routing_key=None, reply_id=None): + envelope = super(CallRequest, self).create_envelope( + routing_key, reply_id) envelope.set('timeout', self.timeout) return envelope diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py index b463a8b5e..74325a227 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py @@ -45,13 +45,13 @@ matchmaker_redis_opts = [ default='oslo-messaging-zeromq', help='Redis replica set name.'), cfg.IntOpt('wait_timeout', - default=500, + default=5000, help='Time in ms to wait between connection attempts.'), cfg.IntOpt('check_timeout', - default=20000, + default=60000, help='Time in ms to wait before the transaction is killed.'), cfg.IntOpt('socket_timeout', - default=1000, + default=10000, help='Timeout in ms on blocking socket operations'), ] diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py new file mode 100644 index 000000000..8fd7b5622 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -0,0 +1,123 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_messaging._drivers import base +from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver.client.publishers\ + import zmq_publisher_base +from oslo_messaging._drivers.zmq_driver.client import zmq_response +from oslo_messaging._drivers.zmq_driver.server.consumers\ + import zmq_consumer_base +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._i18n import _LE, _LI + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class DealerIncomingMessage(base.RpcIncomingMessage): + + def __init__(self, context, message, msg_id): + super(DealerIncomingMessage, self).__init__(context, message) + self.msg_id = msg_id + + def reply(self, reply=None, failure=None, log_failure=True): + """Reply is not needed for non-call messages""" + + def acknowledge(self): + LOG.debug("Not sending acknowledge for %s", self.msg_id) + + def requeue(self): + """Requeue is not supported""" + + +class DealerIncomingRequest(base.RpcIncomingMessage): + + def __init__(self, socket, request, envelope): + super(DealerIncomingRequest, self).__init__(request.context, + request.message) + self.reply_socket = socket + self.request = request + self.envelope = envelope + + def reply(self, reply=None, failure=None, log_failure=True): + if failure is not None: + failure = rpc_common.serialize_remote_exception(failure, + log_failure) + response = zmq_response.Response(type=zmq_names.REPLY_TYPE, + message_id=self.request.message_id, + reply_id=self.envelope.reply_id, + reply_body=reply, + failure=failure, + log_failure=log_failure) + + LOG.debug("Replying %s", (str(self.request.message_id))) + + self.envelope.routing_key = self.envelope.reply_id + self.envelope.msg_type = zmq_names.REPLY_TYPE + + self.reply_socket.send(b'', zmq.SNDMORE) + self.reply_socket.send_pyobj(self.envelope, zmq.SNDMORE) + self.reply_socket.send_pyobj(response) + + def requeue(self): + """Requeue is not supported""" + + +class DealerConsumer(zmq_consumer_base.ConsumerBase): + + def __init__(self, conf, poller, server): + super(DealerConsumer, self).__init__(conf, poller, server) + self.matchmaker = server.matchmaker + self.target = server.target + self.sockets_manager = zmq_publisher_base.SocketsManager( + conf, self.matchmaker, zmq.ROUTER, zmq.DEALER) + self.socket = self.sockets_manager.get_socket_to_publishers() + self.poller.register(self.socket, self.receive_message) + self.host = self.socket.handle.identity + self.target_updater = zmq_consumer_base.TargetUpdater( + conf, self.matchmaker, self.target, self.host, + zmq.DEALER) + LOG.info(_LI("[%s] Run DEALER consumer"), self.host) + + def _receive_request(self, socket): + empty = socket.recv() + assert empty == b'', 'Bad format: empty delimiter expected' + envelope = socket.recv_pyobj() + request = socket.recv_pyobj() + return request, envelope + + def receive_message(self, socket): + try: + request, envelope = self._receive_request(socket) + LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s", + {"host": self.host, + "type": request.msg_type, + "id": request.message_id, + "target": request.target}) + + if request.msg_type == zmq_names.CALL_TYPE: + return DealerIncomingRequest(socket, request, envelope) + elif request.msg_type in zmq_names.NON_BLOCKING_TYPES: + return DealerIncomingMessage(request.context, request.message, + request.message_id) + else: + LOG.error(_LE("Unknown message type: %s"), request.msg_type) + + except (zmq.ZMQError, AssertionError) as e: + LOG.error(_LE("Receiving message failure: %s"), str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py index 611d854aa..a69602982 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py @@ -65,5 +65,5 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer): else: LOG.error(_LE("Unknown message type: %s"), msg_type) - except zmq.ZMQError as e: + except (zmq.ZMQError, AssertionError) as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index c8a0f58e6..8eb3dad7a 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -80,5 +80,5 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): else: LOG.error(_LE("Unknown message type: %s"), request.msg_type) - except zmq.ZMQError as e: + except (zmq.ZMQError, AssertionError) as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py index 3baffe685..32c1ec5cd 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py @@ -55,7 +55,6 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): super(SubConsumer, self).__init__(conf, poller, server) self.matchmaker = server.matchmaker self.target = server.target - self.subscriptions = set() self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB) self.sockets.append(self.socket) self.id = uuid.uuid4() @@ -75,13 +74,10 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): topic_filter = zmq_address.target_to_subscribe_filter(target) if target.topic: self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.topic)) - self.subscriptions.add(six.b(target.topic)) if target.server: self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.server)) - self.subscriptions.add(six.b(target.server)) if target.topic and target.server: self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter) - self.subscriptions.add(topic_filter) LOG.debug("[%(host)s] Subscribing to topic %(filter)s", {"host": self.id, "filter": topic_filter}) @@ -90,7 +86,6 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): topic_filter = socket.recv() LOG.debug("[%(id)s] Received %(topic_filter)s topic", {'id': self.id, 'topic_filter': topic_filter}) - assert topic_filter in self.subscriptions request = socket.recv_pyobj() return request @@ -108,7 +103,7 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): LOG.error(_LE("Unknown message type: %s"), request.msg_type) else: return SubIncomingMessage(request, socket) - except zmq.ZMQError as e: + except (zmq.ZMQError, AssertionError) as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) def cleanup(self): diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index 17eecfff2..254c6e5ed 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -16,6 +16,8 @@ import copy import logging from oslo_messaging._drivers import base +from oslo_messaging._drivers.zmq_driver.server.consumers\ + import zmq_dealer_consumer from oslo_messaging._drivers.zmq_driver.server.consumers\ import zmq_router_consumer from oslo_messaging._drivers.zmq_driver.server.consumers\ @@ -37,12 +39,19 @@ class ZmqServer(base.PollStyleListener): self.matchmaker = matchmaker self.target = target self.poller = poller or zmq_async.get_poller() + self.router_consumer = zmq_router_consumer.RouterConsumer( - conf, self.poller, self) + conf, self.poller, self) if not conf.use_router_proxy else None + self.dealer_consumer = zmq_dealer_consumer.DealerConsumer( + conf, self.poller, self) if conf.use_router_proxy else None self.sub_consumer = zmq_sub_consumer.SubConsumer( conf, self.poller, self) if conf.use_pub_sub else None - self.consumers = [self.router_consumer] + self.consumers = [] + if self.router_consumer: + self.consumers.append(self.router_consumer) + if self.dealer_consumer: + self.consumers.append(self.dealer_consumer) if self.sub_consumer: self.consumers.append(self.sub_consumer) @@ -53,9 +62,10 @@ class ZmqServer(base.PollStyleListener): return message def stop(self): - consumer = self.router_consumer - LOG.info(_LI("Stop server %(address)s:%(port)s"), - {'address': consumer.address, 'port': consumer.port}) + if self.router_consumer: + LOG.info(_LI("Stop server %(address)s:%(port)s"), + {'address': self.router_consumer.address, + 'port': self.router_consumer.port}) def cleanup(self): self.poller.close() diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index 1187a1f3e..796e94d2c 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -26,7 +26,7 @@ FIELD_MSG_ID = 'message_id' FIELD_MSG_TYPE = 'msg_type' FIELD_REPLY_ID = 'reply_id' FIELD_TARGET = 'target' -FIELD_TARGET_HOSTS = 'target_hosts' +FIELD_ROUTING_KEY = 'routing_key' IDX_REPLY_TYPE = 1 diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py index 7237fe25f..94d64b4ec 100644 --- a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py +++ b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py @@ -12,12 +12,12 @@ # License for the specific language governing permissions and limitations # under the License. +from fixtures._fixtures import timeout +import retrying from stevedore import driver import testscenarios import testtools -import retrying - import oslo_messaging from oslo_messaging.tests import utils as test_utils from oslo_utils import importutils @@ -97,6 +97,6 @@ class TestImplMatchmaker(test_utils.BaseTestCase): hosts = [] try: hosts = self.test_matcher.get_hosts(target, "test") - except retrying.RetryError: + except (timeout.TimeoutException, retrying.RetryError): pass self.assertEqual(hosts, []) diff --git a/oslo_messaging/tests/functional/zmq/test_startup.py b/oslo_messaging/tests/functional/zmq/test_startup.py index 6f114ae4a..ea287b3b4 100644 --- a/oslo_messaging/tests/functional/zmq/test_startup.py +++ b/oslo_messaging/tests/functional/zmq/test_startup.py @@ -30,8 +30,7 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase): self.conf.prog = "test_prog" self.conf.project = "test_project" - kwargs = {'rpc_response_timeout': 30, - 'use_pub_sub': False} + kwargs = {'rpc_response_timeout': 30} self.config(**kwargs) log_path = self.conf.rpc_zmq_ipc_dir + "/" + str(os.getpid()) + ".log" diff --git a/setup-test-env-zmq.sh b/setup-test-env-zmq.sh index 816c01637..df9629f30 100755 --- a/setup-test-env-zmq.sh +++ b/setup-test-env-zmq.sh @@ -23,7 +23,6 @@ EOF redis-server --port $ZMQ_REDIS_PORT & -oslo-messaging-zmq-proxy --type PUBLISHER --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-publisher.log 2>&1 & -oslo-messaging-zmq-proxy --type ROUTER --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-router.log 2>&1 & +oslo-messaging-zmq-proxy --config-file ${DATADIR}/zmq.conf > ${DATADIR}/zmq-publisher.log 2>&1 & $*