Merge "[zmq] Refactor receivers"
This commit is contained in:
commit
2fc381816b
@ -21,7 +21,6 @@ from oslo_messaging._drivers.zmq_driver.client.publishers \
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_response
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LE
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -38,7 +37,7 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase):
|
||||
super(DealerPublisherBase, self).__init__(
|
||||
sockets_manager, sender, receiver)
|
||||
|
||||
def _check_received_data(self, reply_id, reply, request):
|
||||
def _check_reply(self, reply, request):
|
||||
assert isinstance(reply, zmq_response.Reply), "Reply expected!"
|
||||
|
||||
def _finally_unregister(self, socket, request):
|
||||
@ -46,12 +45,11 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase):
|
||||
|
||||
def receive_reply(self, socket, request):
|
||||
self.receiver.register_socket(socket)
|
||||
reply_future = \
|
||||
self.receiver.track_request(request)[zmq_names.REPLY_TYPE]
|
||||
_, reply_future = self.receiver.track_request(request)
|
||||
|
||||
try:
|
||||
reply_id, reply = reply_future.result(timeout=request.timeout)
|
||||
self._check_received_data(reply_id, reply, request)
|
||||
reply = reply_future.result(timeout=request.timeout)
|
||||
self._check_reply(reply, request)
|
||||
except AssertionError:
|
||||
LOG.error(_LE("Message format error in reply for %s"),
|
||||
request.message_id)
|
||||
|
@ -53,12 +53,9 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
sender = zmq_senders.RequestSenderDirect(conf)
|
||||
if conf.oslo_messaging_zmq.rpc_use_acks:
|
||||
receiver = zmq_receivers.AckAndReplyReceiverDirect(conf)
|
||||
else:
|
||||
receiver = zmq_receivers.ReplyReceiverDirect(conf)
|
||||
super(DealerPublisherDirect, self).__init__(conf, matchmaker, sender,
|
||||
receiver)
|
||||
receiver = zmq_receivers.ReceiverDirect(conf)
|
||||
super(DealerPublisherDirect, self).__init__(conf, matchmaker,
|
||||
sender, receiver)
|
||||
|
||||
self.routing_table = zmq_routing_table.RoutingTableAdaptor(
|
||||
conf, matchmaker, zmq.ROUTER)
|
||||
|
@ -39,12 +39,9 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
sender = zmq_senders.RequestSenderProxy(conf)
|
||||
if conf.oslo_messaging_zmq.rpc_use_acks:
|
||||
receiver = zmq_receivers.AckAndReplyReceiverProxy(conf)
|
||||
else:
|
||||
receiver = zmq_receivers.ReplyReceiverProxy(conf)
|
||||
super(DealerPublisherProxy, self).__init__(conf, matchmaker, sender,
|
||||
receiver)
|
||||
receiver = zmq_receivers.ReceiverProxy(conf)
|
||||
super(DealerPublisherProxy, self).__init__(conf, matchmaker,
|
||||
sender, receiver)
|
||||
|
||||
self.socket = self.sockets_manager.get_socket_to_publishers(
|
||||
self._generate_identity())
|
||||
@ -57,10 +54,9 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
|
||||
return six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + "/" +
|
||||
str(uuid.uuid4()))
|
||||
|
||||
def _check_received_data(self, reply_id, reply, request):
|
||||
super(DealerPublisherProxy, self)._check_received_data(reply_id, reply,
|
||||
request)
|
||||
assert reply_id == request.routing_key, \
|
||||
def _check_reply(self, reply, request):
|
||||
super(DealerPublisherProxy, self)._check_reply(reply, request)
|
||||
assert reply.reply_id == request.routing_key, \
|
||||
"Reply from recipient expected!"
|
||||
|
||||
def _get_routing_keys(self, request):
|
||||
@ -100,7 +96,7 @@ class DealerPublisherProxyDynamic(
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
sender = zmq_senders.RequestSenderProxy(conf)
|
||||
receiver = zmq_receivers.ReplyReceiverDirect(conf)
|
||||
receiver = zmq_receivers.ReceiverDirect(conf)
|
||||
super(DealerPublisherProxyDynamic, self).__init__(conf, matchmaker,
|
||||
sender, receiver)
|
||||
|
||||
|
@ -44,10 +44,10 @@ class PublisherBase(object):
|
||||
|
||||
:param sockets_manager: sockets manager object
|
||||
:type sockets_manager: zmq_sockets_manager.SocketsManager
|
||||
:param senders: request sender object
|
||||
:type senders: zmq_senders.RequestSender
|
||||
:param receiver: reply receiver object
|
||||
:type receiver: zmq_receivers.ReplyReceiver
|
||||
:param sender: request sender object
|
||||
:type sender: zmq_senders.RequestSenderBase
|
||||
:param receiver: response receiver object
|
||||
:type receiver: zmq_receivers.ReceiverBase
|
||||
"""
|
||||
self.sockets_manager = sockets_manager
|
||||
self.conf = sockets_manager.conf
|
||||
|
@ -16,6 +16,7 @@ from concurrent import futures
|
||||
import logging
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_publisher_manager
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_response
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LE, _LW
|
||||
@ -33,6 +34,13 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase):
|
||||
size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _check_ack(ack, request):
|
||||
if ack is not None:
|
||||
assert isinstance(ack, zmq_response.Ack), "Ack expected!"
|
||||
assert ack.reply_id == request.routing_key, \
|
||||
"Ack from recipient expected!"
|
||||
|
||||
def _wait_for_ack(self, request, ack_future=None):
|
||||
if ack_future is None:
|
||||
ack_future = self._schedule_request_for_ack(request)
|
||||
@ -46,12 +54,9 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase):
|
||||
done = ack_future is None
|
||||
while not done:
|
||||
try:
|
||||
reply_id, response = ack_future.result(timeout=timeout)
|
||||
ack = ack_future.result(timeout=timeout)
|
||||
done = True
|
||||
assert response is None, "Ack expected!"
|
||||
if reply_id is not None:
|
||||
assert reply_id == request.routing_key, \
|
||||
"Ack from recipient expected!"
|
||||
self._check_ack(ack, request)
|
||||
except AssertionError:
|
||||
LOG.error(_LE("Message format error in ack for %s"),
|
||||
request.message_id)
|
||||
@ -85,7 +90,7 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase):
|
||||
if socket is None:
|
||||
return None
|
||||
self.receiver.register_socket(socket)
|
||||
ack_future = self.receiver.track_request(request)[zmq_names.ACK_TYPE]
|
||||
ack_future, _ = self.receiver.track_request(request)
|
||||
ack_future.socket = socket
|
||||
return ack_future
|
||||
|
||||
@ -98,7 +103,7 @@ class AckManager(zmq_publisher_manager.PublisherManagerBase):
|
||||
return self.publisher.receive_reply(ack_future.socket, request)
|
||||
finally:
|
||||
if not ack_future.done():
|
||||
ack_future.set_result((None, None))
|
||||
ack_future.set_result(None)
|
||||
|
||||
def send_cast(self, request):
|
||||
self._pool.submit(self._wait_for_ack, request)
|
||||
|
@ -22,12 +22,27 @@ import six
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_response
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LE
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
def suppress_errors(func):
|
||||
@six.wraps(func)
|
||||
def silent_func(self, socket):
|
||||
try:
|
||||
return func(self, socket)
|
||||
except Exception as e:
|
||||
LOG.error(_LE("Receiving message failed: %r"), e)
|
||||
# NOTE(gdavoian): drop the left parts of a broken message, since
|
||||
# they most likely will lead to additional exceptions
|
||||
if socket.getsockopt(zmq.RCVMORE):
|
||||
socket.recv_multipart()
|
||||
return silent_func
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class ReceiverBase(object):
|
||||
"""Base response receiving interface."""
|
||||
@ -37,147 +52,117 @@ class ReceiverBase(object):
|
||||
self._lock = threading.Lock()
|
||||
self._requests = {}
|
||||
self._poller = zmq_async.get_poller()
|
||||
self._executor = zmq_async.get_executor(method=self._run_loop)
|
||||
self._executor = zmq_async.get_executor(self._run_loop)
|
||||
self._executor.execute()
|
||||
|
||||
@abc.abstractproperty
|
||||
def message_types(self):
|
||||
"""A set of supported incoming response types."""
|
||||
|
||||
def register_socket(self, socket):
|
||||
"""Register a socket for receiving data."""
|
||||
self._poller.register(socket, recv_method=self.recv_response)
|
||||
self._poller.register(socket, self.receive_response)
|
||||
|
||||
def unregister_socket(self, socket):
|
||||
"""Unregister a socket from receiving data."""
|
||||
self._poller.unregister(socket)
|
||||
|
||||
@abc.abstractmethod
|
||||
def recv_response(self, socket):
|
||||
"""Receive a response and return a tuple of the form
|
||||
(reply_id, message_type, message_id, response).
|
||||
"""
|
||||
def receive_response(self, socket):
|
||||
"""Receive a response (ack or reply) and return it."""
|
||||
|
||||
def track_request(self, request):
|
||||
"""Track a request via already registered sockets and return
|
||||
a dict of futures for monitoring all types of responses.
|
||||
a pair of ack and reply futures for monitoring all possible
|
||||
types of responses for the given request.
|
||||
"""
|
||||
futures = {}
|
||||
message_id = request.message_id
|
||||
for message_type in self.message_types:
|
||||
future = self._get_future(message_id, message_type)
|
||||
if future is None:
|
||||
future = futurist.Future()
|
||||
self._set_future(message_id, message_type, future)
|
||||
futures[message_type] = future
|
||||
futures = self._get_futures(message_id)
|
||||
if futures is None:
|
||||
ack_future = reply_future = None
|
||||
if self.conf.oslo_messaging_zmq.rpc_use_acks:
|
||||
ack_future = futurist.Future()
|
||||
if request.msg_type == zmq_names.CALL_TYPE:
|
||||
reply_future = futurist.Future()
|
||||
futures = (ack_future, reply_future)
|
||||
self._set_futures(message_id, futures)
|
||||
return futures
|
||||
|
||||
def untrack_request(self, request):
|
||||
"""Untrack a request and stop monitoring any responses."""
|
||||
for message_type in self.message_types:
|
||||
self._pop_future(request.message_id, message_type)
|
||||
self._pop_futures(request.message_id)
|
||||
|
||||
def stop(self):
|
||||
self._poller.close()
|
||||
self._executor.stop()
|
||||
|
||||
def _get_future(self, message_id, message_type):
|
||||
def _get_futures(self, message_id):
|
||||
with self._lock:
|
||||
return self._requests.get((message_id, message_type))
|
||||
return self._requests.get(message_id)
|
||||
|
||||
def _set_future(self, message_id, message_type, future):
|
||||
def _set_futures(self, message_id, futures):
|
||||
with self._lock:
|
||||
self._requests[(message_id, message_type)] = future
|
||||
self._requests[message_id] = futures
|
||||
|
||||
def _pop_future(self, message_id, message_type):
|
||||
def _pop_futures(self, message_id):
|
||||
with self._lock:
|
||||
return self._requests.pop((message_id, message_type), None)
|
||||
return self._requests.pop(message_id, None)
|
||||
|
||||
def _run_loop(self):
|
||||
data, socket = self._poller.poll(
|
||||
timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout)
|
||||
if data is None:
|
||||
response, socket = \
|
||||
self._poller.poll(self.conf.oslo_messaging_zmq.rpc_poll_timeout)
|
||||
if response is None:
|
||||
return
|
||||
reply_id, message_type, message_id, response = data
|
||||
assert message_type in self.message_types, \
|
||||
"%s is not supported!" % zmq_names.message_type_str(message_type)
|
||||
future = self._get_future(message_id, message_type)
|
||||
if future is not None:
|
||||
message_type, message_id = response.msg_type, response.message_id
|
||||
futures = self._get_futures(message_id)
|
||||
if futures is not None:
|
||||
ack_future, reply_future = futures
|
||||
if message_type == zmq_names.REPLY_TYPE:
|
||||
reply_future.set_result(response)
|
||||
else:
|
||||
ack_future.set_result(response)
|
||||
LOG.debug("Received %(msg_type)s for %(msg_id)s",
|
||||
{"msg_type": zmq_names.message_type_str(message_type),
|
||||
"msg_id": message_id})
|
||||
future.set_result((reply_id, response))
|
||||
|
||||
|
||||
class ReplyReceiver(ReceiverBase):
|
||||
class ReceiverProxy(ReceiverBase):
|
||||
|
||||
message_types = {zmq_names.REPLY_TYPE}
|
||||
|
||||
|
||||
class ReplyReceiverProxy(ReplyReceiver):
|
||||
|
||||
def recv_response(self, socket):
|
||||
@suppress_errors
|
||||
def receive_response(self, socket):
|
||||
empty = socket.recv()
|
||||
assert empty == b'', "Empty expected!"
|
||||
assert empty == b'', "Empty delimiter expected!"
|
||||
reply_id = socket.recv()
|
||||
assert reply_id is not None, "Reply ID expected!"
|
||||
assert reply_id != b'', "Valid reply id expected!"
|
||||
message_type = int(socket.recv())
|
||||
assert message_type == zmq_names.REPLY_TYPE, "Reply expected!"
|
||||
message_id = socket.recv_string()
|
||||
reply_body, failure = socket.recv_loaded()
|
||||
reply = zmq_response.Reply(
|
||||
message_id=message_id, reply_id=reply_id,
|
||||
reply_body=reply_body, failure=failure
|
||||
)
|
||||
return reply_id, message_type, message_id, reply
|
||||
|
||||
|
||||
class ReplyReceiverDirect(ReplyReceiver):
|
||||
|
||||
def recv_response(self, socket):
|
||||
empty = socket.recv()
|
||||
assert empty == b'', "Empty expected!"
|
||||
raw_reply = socket.recv_loaded()
|
||||
assert isinstance(raw_reply, dict), "Dict expected!"
|
||||
reply = zmq_response.Reply(**raw_reply)
|
||||
return reply.reply_id, reply.msg_type, reply.message_id, reply
|
||||
|
||||
|
||||
class AckAndReplyReceiver(ReceiverBase):
|
||||
|
||||
message_types = {zmq_names.ACK_TYPE, zmq_names.REPLY_TYPE}
|
||||
|
||||
|
||||
class AckAndReplyReceiverProxy(AckAndReplyReceiver):
|
||||
|
||||
def recv_response(self, socket):
|
||||
empty = socket.recv()
|
||||
assert empty == b'', "Empty expected!"
|
||||
reply_id = socket.recv()
|
||||
assert reply_id is not None, "Reply ID expected!"
|
||||
message_type = int(socket.recv())
|
||||
assert message_type in (zmq_names.ACK_TYPE, zmq_names.REPLY_TYPE), \
|
||||
"Ack or reply expected!"
|
||||
assert message_type in zmq_names.RESPONSE_TYPES, "Response expected!"
|
||||
message_id = socket.recv_string()
|
||||
assert message_id != '', "Valid message id expected!"
|
||||
if message_type == zmq_names.REPLY_TYPE:
|
||||
reply_body, failure = socket.recv_loaded()
|
||||
reply = zmq_response.Reply(
|
||||
message_id=message_id, reply_id=reply_id,
|
||||
reply_body=reply_body, failure=failure
|
||||
)
|
||||
response = reply
|
||||
reply = zmq_response.Reply(message_id=message_id,
|
||||
reply_id=reply_id,
|
||||
reply_body=reply_body,
|
||||
failure=failure)
|
||||
return reply
|
||||
else:
|
||||
response = None
|
||||
return reply_id, message_type, message_id, response
|
||||
ack = zmq_response.Ack(message_id=message_id,
|
||||
reply_id=reply_id)
|
||||
return ack
|
||||
|
||||
|
||||
class AckAndReplyReceiverDirect(AckAndReplyReceiver):
|
||||
class ReceiverDirect(ReceiverBase):
|
||||
|
||||
def recv_response(self, socket):
|
||||
# acks are not supported yet
|
||||
@suppress_errors
|
||||
def receive_response(self, socket):
|
||||
empty = socket.recv()
|
||||
assert empty == b'', "Empty expected!"
|
||||
raw_reply = socket.recv_loaded()
|
||||
assert isinstance(raw_reply, dict), "Dict expected!"
|
||||
reply = zmq_response.Reply(**raw_reply)
|
||||
return reply.reply_id, reply.msg_type, reply.message_id, reply
|
||||
assert empty == b'', "Empty delimiter expected!"
|
||||
message_type = int(socket.recv())
|
||||
assert message_type in zmq_names.RESPONSE_TYPES, "Response expected!"
|
||||
message_id = socket.recv_string()
|
||||
assert message_id != '', "Valid message id expected!"
|
||||
if message_type == zmq_names.REPLY_TYPE:
|
||||
reply_body, failure = socket.recv_loaded()
|
||||
reply = zmq_response.Reply(message_id=message_id,
|
||||
reply_body=reply_body,
|
||||
failure=failure)
|
||||
return reply
|
||||
else:
|
||||
ack = zmq_response.Ack(message_id=message_id)
|
||||
return ack
|
||||
|
@ -56,7 +56,7 @@ class Request(object):
|
||||
"""
|
||||
|
||||
if self.msg_type not in zmq_names.REQUEST_TYPES:
|
||||
raise RuntimeError("Unknown message type!")
|
||||
raise RuntimeError("Unknown request type!")
|
||||
|
||||
self.target = target
|
||||
self.context = context
|
||||
@ -71,7 +71,7 @@ class Request(object):
|
||||
|
||||
@abc.abstractproperty
|
||||
def msg_type(self):
|
||||
"""ZMQ message type"""
|
||||
"""ZMQ request type"""
|
||||
|
||||
|
||||
class RpcRequest(Request):
|
||||
|
@ -1,4 +1,4 @@
|
||||
# Copyright 2015 Mirantis, Inc.
|
||||
# Copyright 2015-2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -23,13 +23,15 @@ from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
class Response(object):
|
||||
|
||||
def __init__(self, message_id=None, reply_id=None):
|
||||
if self.msg_type not in zmq_names.RESPONSE_TYPES:
|
||||
raise RuntimeError("Unknown response type!")
|
||||
|
||||
self._message_id = message_id
|
||||
self._reply_id = reply_id
|
||||
|
||||
@abc.abstractproperty
|
||||
def msg_type(self):
|
||||
pass
|
||||
"""ZMQ response type"""
|
||||
|
||||
@property
|
||||
def message_id(self):
|
||||
|
@ -39,19 +39,19 @@ class SenderBase(object):
|
||||
"""Send a message via a socket in a thread-safe manner."""
|
||||
|
||||
|
||||
class RequestSender(SenderBase):
|
||||
class RequestSenderBase(SenderBase):
|
||||
pass
|
||||
|
||||
|
||||
class AckSender(SenderBase):
|
||||
class AckSenderBase(SenderBase):
|
||||
pass
|
||||
|
||||
|
||||
class ReplySender(SenderBase):
|
||||
class ReplySenderBase(SenderBase):
|
||||
pass
|
||||
|
||||
|
||||
class RequestSenderProxy(SenderBase):
|
||||
class RequestSenderProxy(RequestSenderBase):
|
||||
|
||||
def send(self, socket, request):
|
||||
with self._lock:
|
||||
@ -72,7 +72,7 @@ class RequestSenderProxy(SenderBase):
|
||||
socket.send_dumped([request.context, request.message])
|
||||
|
||||
|
||||
class AckSenderProxy(AckSender):
|
||||
class AckSenderProxy(AckSenderBase):
|
||||
|
||||
def send(self, socket, ack):
|
||||
assert ack.msg_type == zmq_names.ACK_TYPE, "Ack expected!"
|
||||
@ -92,7 +92,7 @@ class AckSenderProxy(AckSender):
|
||||
socket.send_string(ack.message_id)
|
||||
|
||||
|
||||
class ReplySenderProxy(SenderBase):
|
||||
class ReplySenderProxy(ReplySenderBase):
|
||||
|
||||
def send(self, socket, reply):
|
||||
assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!"
|
||||
@ -113,7 +113,7 @@ class ReplySenderProxy(SenderBase):
|
||||
socket.send_dumped([reply.reply_body, reply.failure])
|
||||
|
||||
|
||||
class RequestSenderDirect(SenderBase):
|
||||
class RequestSenderDirect(RequestSenderBase):
|
||||
|
||||
def send(self, socket, request):
|
||||
with self._lock:
|
||||
@ -132,7 +132,7 @@ class RequestSenderDirect(SenderBase):
|
||||
socket.send_dumped([request.context, request.message])
|
||||
|
||||
|
||||
class AckSenderDirect(AckSender):
|
||||
class AckSenderDirect(AckSenderBase):
|
||||
|
||||
def send(self, socket, ack):
|
||||
assert ack.msg_type == zmq_names.ACK_TYPE, "Ack expected!"
|
||||
@ -148,7 +148,7 @@ class AckSenderDirect(AckSender):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class ReplySenderDirect(SenderBase):
|
||||
class ReplySenderDirect(ReplySenderBase):
|
||||
|
||||
def send(self, socket, reply):
|
||||
assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!"
|
||||
@ -163,4 +163,6 @@ class ReplySenderDirect(SenderBase):
|
||||
def _send(self, socket, reply):
|
||||
socket.send(reply.reply_id, zmq.SNDMORE)
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_dumped(reply.to_dict())
|
||||
socket.send(six.b(str(reply.msg_type)), zmq.SNDMORE)
|
||||
socket.send_string(reply.message_id, zmq.SNDMORE)
|
||||
socket.send_dumped([reply.reply_body, reply.failure])
|
||||
|
Loading…
x
Reference in New Issue
Block a user