From b1081f9e8152cf23614a58ba706a8d58122aa496 Mon Sep 17 00:00:00 2001 From: Gevorg Davoian Date: Thu, 27 Oct 2016 15:30:42 +0300 Subject: [PATCH] [zmq] Refactor receivers Change-Id: I19e1dd05fee4323b14bbe377e5e36a37d0f815ac --- .../dealer/zmq_dealer_publisher_base.py | 10 +- .../dealer/zmq_dealer_publisher_direct.py | 9 +- .../dealer/zmq_dealer_publisher_proxy.py | 18 +- .../client/publishers/zmq_publisher_base.py | 8 +- .../zmq_driver/client/zmq_ack_manager.py | 19 +- .../zmq_driver/client/zmq_receivers.py | 175 ++++++++---------- .../_drivers/zmq_driver/client/zmq_request.py | 4 +- .../zmq_driver/client/zmq_response.py | 6 +- .../_drivers/zmq_driver/client/zmq_senders.py | 22 ++- 9 files changed, 128 insertions(+), 143 deletions(-) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py index 26e0c395d..1493aeb1e 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py @@ -21,7 +21,6 @@ from oslo_messaging._drivers.zmq_driver.client.publishers \ from oslo_messaging._drivers.zmq_driver.client import zmq_response from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LE LOG = logging.getLogger(__name__) @@ -38,7 +37,7 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase): super(DealerPublisherBase, self).__init__( sockets_manager, sender, receiver) - def _check_received_data(self, reply_id, reply, request): + def _check_reply(self, reply, request): assert isinstance(reply, zmq_response.Reply), "Reply expected!" def _finally_unregister(self, socket, request): @@ -46,12 +45,11 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase): def receive_reply(self, socket, request): self.receiver.register_socket(socket) - reply_future = \ - self.receiver.track_request(request)[zmq_names.REPLY_TYPE] + _, reply_future = self.receiver.track_request(request) try: - reply_id, reply = reply_future.result(timeout=request.timeout) - self._check_received_data(reply_id, reply, request) + reply = reply_future.result(timeout=request.timeout) + self._check_reply(reply, request) except AssertionError: LOG.error(_LE("Message format error in reply for %s"), request.message_id) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py index 15c7b8900..4278ee3c9 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py @@ -53,12 +53,9 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase): def __init__(self, conf, matchmaker): sender = zmq_senders.RequestSenderDirect(conf) - if conf.oslo_messaging_zmq.rpc_use_acks: - receiver = zmq_receivers.AckAndReplyReceiverDirect(conf) - else: - receiver = zmq_receivers.ReplyReceiverDirect(conf) - super(DealerPublisherDirect, self).__init__(conf, matchmaker, sender, - receiver) + receiver = zmq_receivers.ReceiverDirect(conf) + super(DealerPublisherDirect, self).__init__(conf, matchmaker, + sender, receiver) self.routing_table = zmq_routing_table.RoutingTableAdaptor( conf, matchmaker, zmq.ROUTER) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index ff15dcf09..b07283508 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -39,12 +39,9 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase): def __init__(self, conf, matchmaker): sender = zmq_senders.RequestSenderProxy(conf) - if conf.oslo_messaging_zmq.rpc_use_acks: - receiver = zmq_receivers.AckAndReplyReceiverProxy(conf) - else: - receiver = zmq_receivers.ReplyReceiverProxy(conf) - super(DealerPublisherProxy, self).__init__(conf, matchmaker, sender, - receiver) + receiver = zmq_receivers.ReceiverProxy(conf) + super(DealerPublisherProxy, self).__init__(conf, matchmaker, + sender, receiver) self.socket = self.sockets_manager.get_socket_to_publishers( self._generate_identity()) @@ -57,10 +54,9 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase): return six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + "/" + str(uuid.uuid4())) - def _check_received_data(self, reply_id, reply, request): - super(DealerPublisherProxy, self)._check_received_data(reply_id, reply, - request) - assert reply_id == request.routing_key, \ + def _check_reply(self, reply, request): + super(DealerPublisherProxy, self)._check_reply(reply, request) + assert reply.reply_id == request.routing_key, \ "Reply from recipient expected!" def _get_routing_keys(self, request): @@ -100,7 +96,7 @@ class DealerPublisherProxyDynamic( def __init__(self, conf, matchmaker): sender = zmq_senders.RequestSenderProxy(conf) - receiver = zmq_receivers.ReplyReceiverDirect(conf) + receiver = zmq_receivers.ReceiverDirect(conf) super(DealerPublisherProxyDynamic, self).__init__(conf, matchmaker, sender, receiver) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index edfe024c6..a05ff70ae 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -44,10 +44,10 @@ class PublisherBase(object): :param sockets_manager: sockets manager object :type sockets_manager: zmq_sockets_manager.SocketsManager - :param senders: request sender object - :type senders: zmq_senders.RequestSender - :param receiver: reply receiver object - :type receiver: zmq_receivers.ReplyReceiver + :param sender: request sender object + :type sender: zmq_senders.RequestSenderBase + :param receiver: response receiver object + :type receiver: zmq_receivers.ReceiverBase """ self.sockets_manager = sockets_manager self.conf = sockets_manager.conf diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py index 1852ccd29..5109e3584 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py @@ -16,6 +16,7 @@ from concurrent import futures import logging from oslo_messaging._drivers.zmq_driver.client import zmq_publisher_manager +from oslo_messaging._drivers.zmq_driver.client import zmq_response from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LE, _LW @@ -33,6 +34,13 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase): size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size ) + @staticmethod + def _check_ack(ack, request): + if ack is not None: + assert isinstance(ack, zmq_response.Ack), "Ack expected!" + assert ack.reply_id == request.routing_key, \ + "Ack from recipient expected!" + def _wait_for_ack(self, request, ack_future=None): if ack_future is None: ack_future = self._schedule_request_for_ack(request) @@ -46,12 +54,9 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase): done = ack_future is None while not done: try: - reply_id, response = ack_future.result(timeout=timeout) + ack = ack_future.result(timeout=timeout) done = True - assert response is None, "Ack expected!" - if reply_id is not None: - assert reply_id == request.routing_key, \ - "Ack from recipient expected!" + self._check_ack(ack, request) except AssertionError: LOG.error(_LE("Message format error in ack for %s"), request.message_id) @@ -85,7 +90,7 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase): if socket is None: return None self.receiver.register_socket(socket) - ack_future = self.receiver.track_request(request)[zmq_names.ACK_TYPE] + ack_future, _ = self.receiver.track_request(request) ack_future.socket = socket return ack_future @@ -98,7 +103,7 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase): return self.publisher.receive_reply(ack_future.socket, request) finally: if not ack_future.done(): - ack_future.set_result((None, None)) + ack_future.set_result(None) def send_cast(self, request): self._pool.submit(self._wait_for_ack, request) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py index 6bf4665a9..5bdfb0fa9 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py @@ -22,12 +22,27 @@ import six from oslo_messaging._drivers.zmq_driver.client import zmq_response from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._i18n import _LE LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() +def suppress_errors(func): + @six.wraps(func) + def silent_func(self, socket): + try: + return func(self, socket) + except Exception as e: + LOG.error(_LE("Receiving message failed: %r"), e) + # NOTE(gdavoian): drop the left parts of a broken message, since + # they most likely will lead to additional exceptions + if socket.getsockopt(zmq.RCVMORE): + socket.recv_multipart() + return silent_func + + @six.add_metaclass(abc.ABCMeta) class ReceiverBase(object): """Base response receiving interface.""" @@ -37,147 +52,117 @@ class ReceiverBase(object): self._lock = threading.Lock() self._requests = {} self._poller = zmq_async.get_poller() - self._executor = zmq_async.get_executor(method=self._run_loop) + self._executor = zmq_async.get_executor(self._run_loop) self._executor.execute() - @abc.abstractproperty - def message_types(self): - """A set of supported incoming response types.""" - def register_socket(self, socket): """Register a socket for receiving data.""" - self._poller.register(socket, recv_method=self.recv_response) + self._poller.register(socket, self.receive_response) def unregister_socket(self, socket): """Unregister a socket from receiving data.""" self._poller.unregister(socket) @abc.abstractmethod - def recv_response(self, socket): - """Receive a response and return a tuple of the form - (reply_id, message_type, message_id, response). - """ + def receive_response(self, socket): + """Receive a response (ack or reply) and return it.""" def track_request(self, request): """Track a request via already registered sockets and return - a dict of futures for monitoring all types of responses. + a pair of ack and reply futures for monitoring all possible + types of responses for the given request. """ - futures = {} message_id = request.message_id - for message_type in self.message_types: - future = self._get_future(message_id, message_type) - if future is None: - future = futurist.Future() - self._set_future(message_id, message_type, future) - futures[message_type] = future + futures = self._get_futures(message_id) + if futures is None: + ack_future = reply_future = None + if self.conf.oslo_messaging_zmq.rpc_use_acks: + ack_future = futurist.Future() + if request.msg_type == zmq_names.CALL_TYPE: + reply_future = futurist.Future() + futures = (ack_future, reply_future) + self._set_futures(message_id, futures) return futures def untrack_request(self, request): """Untrack a request and stop monitoring any responses.""" - for message_type in self.message_types: - self._pop_future(request.message_id, message_type) + self._pop_futures(request.message_id) def stop(self): self._poller.close() self._executor.stop() - def _get_future(self, message_id, message_type): + def _get_futures(self, message_id): with self._lock: - return self._requests.get((message_id, message_type)) + return self._requests.get(message_id) - def _set_future(self, message_id, message_type, future): + def _set_futures(self, message_id, futures): with self._lock: - self._requests[(message_id, message_type)] = future + self._requests[message_id] = futures - def _pop_future(self, message_id, message_type): + def _pop_futures(self, message_id): with self._lock: - return self._requests.pop((message_id, message_type), None) + return self._requests.pop(message_id, None) def _run_loop(self): - data, socket = self._poller.poll( - timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout) - if data is None: + response, socket = \ + self._poller.poll(self.conf.oslo_messaging_zmq.rpc_poll_timeout) + if response is None: return - reply_id, message_type, message_id, response = data - assert message_type in self.message_types, \ - "%s is not supported!" % zmq_names.message_type_str(message_type) - future = self._get_future(message_id, message_type) - if future is not None: + message_type, message_id = response.msg_type, response.message_id + futures = self._get_futures(message_id) + if futures is not None: + ack_future, reply_future = futures + if message_type == zmq_names.REPLY_TYPE: + reply_future.set_result(response) + else: + ack_future.set_result(response) LOG.debug("Received %(msg_type)s for %(msg_id)s", {"msg_type": zmq_names.message_type_str(message_type), "msg_id": message_id}) - future.set_result((reply_id, response)) -class ReplyReceiver(ReceiverBase): +class ReceiverProxy(ReceiverBase): - message_types = {zmq_names.REPLY_TYPE} - - -class ReplyReceiverProxy(ReplyReceiver): - - def recv_response(self, socket): + @suppress_errors + def receive_response(self, socket): empty = socket.recv() - assert empty == b'', "Empty expected!" + assert empty == b'', "Empty delimiter expected!" reply_id = socket.recv() - assert reply_id is not None, "Reply ID expected!" + assert reply_id != b'', "Valid reply id expected!" message_type = int(socket.recv()) - assert message_type == zmq_names.REPLY_TYPE, "Reply expected!" - message_id = socket.recv_string() - reply_body, failure = socket.recv_loaded() - reply = zmq_response.Reply( - message_id=message_id, reply_id=reply_id, - reply_body=reply_body, failure=failure - ) - return reply_id, message_type, message_id, reply - - -class ReplyReceiverDirect(ReplyReceiver): - - def recv_response(self, socket): - empty = socket.recv() - assert empty == b'', "Empty expected!" - raw_reply = socket.recv_loaded() - assert isinstance(raw_reply, dict), "Dict expected!" - reply = zmq_response.Reply(**raw_reply) - return reply.reply_id, reply.msg_type, reply.message_id, reply - - -class AckAndReplyReceiver(ReceiverBase): - - message_types = {zmq_names.ACK_TYPE, zmq_names.REPLY_TYPE} - - -class AckAndReplyReceiverProxy(AckAndReplyReceiver): - - def recv_response(self, socket): - empty = socket.recv() - assert empty == b'', "Empty expected!" - reply_id = socket.recv() - assert reply_id is not None, "Reply ID expected!" - message_type = int(socket.recv()) - assert message_type in (zmq_names.ACK_TYPE, zmq_names.REPLY_TYPE), \ - "Ack or reply expected!" + assert message_type in zmq_names.RESPONSE_TYPES, "Response expected!" message_id = socket.recv_string() + assert message_id != '', "Valid message id expected!" if message_type == zmq_names.REPLY_TYPE: reply_body, failure = socket.recv_loaded() - reply = zmq_response.Reply( - message_id=message_id, reply_id=reply_id, - reply_body=reply_body, failure=failure - ) - response = reply + reply = zmq_response.Reply(message_id=message_id, + reply_id=reply_id, + reply_body=reply_body, + failure=failure) + return reply else: - response = None - return reply_id, message_type, message_id, response + ack = zmq_response.Ack(message_id=message_id, + reply_id=reply_id) + return ack -class AckAndReplyReceiverDirect(AckAndReplyReceiver): +class ReceiverDirect(ReceiverBase): - def recv_response(self, socket): - # acks are not supported yet + @suppress_errors + def receive_response(self, socket): empty = socket.recv() - assert empty == b'', "Empty expected!" - raw_reply = socket.recv_loaded() - assert isinstance(raw_reply, dict), "Dict expected!" - reply = zmq_response.Reply(**raw_reply) - return reply.reply_id, reply.msg_type, reply.message_id, reply + assert empty == b'', "Empty delimiter expected!" + message_type = int(socket.recv()) + assert message_type in zmq_names.RESPONSE_TYPES, "Response expected!" + message_id = socket.recv_string() + assert message_id != '', "Valid message id expected!" + if message_type == zmq_names.REPLY_TYPE: + reply_body, failure = socket.recv_loaded() + reply = zmq_response.Reply(message_id=message_id, + reply_body=reply_body, + failure=failure) + return reply + else: + ack = zmq_response.Ack(message_id=message_id) + return ack diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py index c3d45e185..5d1a04495 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -56,7 +56,7 @@ class Request(object): """ if self.msg_type not in zmq_names.REQUEST_TYPES: - raise RuntimeError("Unknown message type!") + raise RuntimeError("Unknown request type!") self.target = target self.context = context @@ -71,7 +71,7 @@ class Request(object): @abc.abstractproperty def msg_type(self): - """ZMQ message type""" + """ZMQ request type""" class RpcRequest(Request): diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py index ab452dee7..3da30b670 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py @@ -1,4 +1,4 @@ -# Copyright 2015 Mirantis, Inc. +# Copyright 2015-2016 Mirantis, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -23,13 +23,15 @@ from oslo_messaging._drivers.zmq_driver import zmq_names class Response(object): def __init__(self, message_id=None, reply_id=None): + if self.msg_type not in zmq_names.RESPONSE_TYPES: + raise RuntimeError("Unknown response type!") self._message_id = message_id self._reply_id = reply_id @abc.abstractproperty def msg_type(self): - pass + """ZMQ response type""" @property def message_id(self): diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py index ba58d3ad0..909c8689b 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py @@ -39,19 +39,19 @@ class SenderBase(object): """Send a message via a socket in a thread-safe manner.""" -class RequestSender(SenderBase): +class RequestSenderBase(SenderBase): pass -class AckSender(SenderBase): +class AckSenderBase(SenderBase): pass -class ReplySender(SenderBase): +class ReplySenderBase(SenderBase): pass -class RequestSenderProxy(SenderBase): +class RequestSenderProxy(RequestSenderBase): def send(self, socket, request): with self._lock: @@ -72,7 +72,7 @@ class RequestSenderProxy(SenderBase): socket.send_dumped([request.context, request.message]) -class AckSenderProxy(AckSender): +class AckSenderProxy(AckSenderBase): def send(self, socket, ack): assert ack.msg_type == zmq_names.ACK_TYPE, "Ack expected!" @@ -92,7 +92,7 @@ class AckSenderProxy(AckSender): socket.send_string(ack.message_id) -class ReplySenderProxy(SenderBase): +class ReplySenderProxy(ReplySenderBase): def send(self, socket, reply): assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!" @@ -113,7 +113,7 @@ class ReplySenderProxy(SenderBase): socket.send_dumped([reply.reply_body, reply.failure]) -class RequestSenderDirect(SenderBase): +class RequestSenderDirect(RequestSenderBase): def send(self, socket, request): with self._lock: @@ -132,7 +132,7 @@ class RequestSenderDirect(SenderBase): socket.send_dumped([request.context, request.message]) -class AckSenderDirect(AckSender): +class AckSenderDirect(AckSenderBase): def send(self, socket, ack): assert ack.msg_type == zmq_names.ACK_TYPE, "Ack expected!" @@ -148,7 +148,7 @@ class AckSenderDirect(AckSender): raise NotImplementedError() -class ReplySenderDirect(SenderBase): +class ReplySenderDirect(ReplySenderBase): def send(self, socket, reply): assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!" @@ -163,4 +163,6 @@ class ReplySenderDirect(SenderBase): def _send(self, socket, reply): socket.send(reply.reply_id, zmq.SNDMORE) socket.send(b'', zmq.SNDMORE) - socket.send_dumped(reply.to_dict()) + socket.send(six.b(str(reply.msg_type)), zmq.SNDMORE) + socket.send_string(reply.message_id, zmq.SNDMORE) + socket.send_dumped([reply.reply_body, reply.failure])