[zmq] Merge publishers

This patch merges small logically related publishers supporting
only some messaging patterns into bigger ones supporting all
messaging patterns at once.

Change-Id: Ic47e4d89166dd14f8a67101e666dc780a1ccb2a8
This commit is contained in:
Gevorg Davoian 2016-07-12 18:42:45 +03:00
parent c6c70aba2d
commit 07187f9bac
7 changed files with 184 additions and 178 deletions

View File

@ -1,4 +1,4 @@
# Copyright 2015 Mirantis, Inc. # Copyright 2016 Mirantis, Inc.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import abc
from concurrent import futures from concurrent import futures
import logging import logging
@ -21,6 +22,7 @@ import oslo_messaging
from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers \ from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_publisher_base import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LE
@ -30,34 +32,23 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq() zmq = zmq_async.import_zmq()
class DealerPublisher(zmq_publisher_base.PublisherBase): class DealerPublisherBase(zmq_publisher_base.PublisherBase):
"""Non-CALL publisher using direct connections.""" """Abstract DEALER-publisher."""
def send_request(self, request): def __init__(self, conf, matchmaker, sender, receiver):
if request.msg_type == zmq_names.CALL_TYPE: sockets_manager = zmq_sockets_manager.SocketsManager(
conf, matchmaker, zmq.ROUTER, zmq.DEALER
)
super(DealerPublisherBase, self).__init__(sockets_manager, sender,
receiver)
@staticmethod
def _check_pattern(request, supported_pattern):
if request.msg_type != supported_pattern:
raise zmq_publisher_base.UnsupportedSendPattern( raise zmq_publisher_base.UnsupportedSendPattern(
zmq_names.message_type_str(request.msg_type) zmq_names.message_type_str(request.msg_type)
) )
try:
socket = self.sockets_manager.get_socket(request.target)
except retrying.RetryError:
return
if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(socket.connections_count()):
self.sender.send(socket, request)
else:
self.sender.send(socket, request)
class DealerCallPublisher(zmq_publisher_base.PublisherBase):
"""CALL publisher using direct connections."""
def __init__(self, sockets_manager, sender, reply_receiver):
super(DealerCallPublisher, self).__init__(sockets_manager, sender)
self.reply_receiver = reply_receiver
@staticmethod @staticmethod
def _raise_timeout(request): def _raise_timeout(request):
raise oslo_messaging.MessagingTimeout( raise oslo_messaging.MessagingTimeout(
@ -65,26 +56,12 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase):
{"tout": request.timeout, "msg_id": request.message_id} {"tout": request.timeout, "msg_id": request.message_id}
) )
def send_request(self, request): @abc.abstractmethod
if request.msg_type != zmq_names.CALL_TYPE: def _connect_socket(self, request):
raise zmq_publisher_base.UnsupportedSendPattern( pass
zmq_names.message_type_str(request.msg_type)
)
try:
socket = self._connect_socket(request.target)
except retrying.RetryError:
self._raise_timeout(request)
self.sender.send(socket, request)
self.reply_receiver.register_socket(socket)
return self._recv_reply(request)
def _connect_socket(self, target):
return self.sockets_manager.get_socket(target)
def _recv_reply(self, request): def _recv_reply(self, request):
reply_future, = self.reply_receiver.track_request(request) reply_future, = self.receiver.track_request(request)
try: try:
_, reply = reply_future.result(timeout=request.timeout) _, reply = reply_future.result(timeout=request.timeout)
@ -95,7 +72,7 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase):
except futures.TimeoutError: except futures.TimeoutError:
self._raise_timeout(request) self._raise_timeout(request)
finally: finally:
self.reply_receiver.untrack_request(request) self.receiver.untrack_request(request)
if reply.failure: if reply.failure:
raise rpc_common.deserialize_remote_exception( raise rpc_common.deserialize_remote_exception(
@ -104,6 +81,30 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase):
else: else:
return reply.reply_body return reply.reply_body
def cleanup(self): def send_call(self, request):
self.reply_receiver.stop() self._check_pattern(request, zmq_names.CALL_TYPE)
super(DealerCallPublisher, self).cleanup()
try:
socket = self._connect_socket(request)
except retrying.RetryError:
self._raise_timeout(request)
self.sender.send(socket, request)
self.receiver.register_socket(socket)
return self._recv_reply(request)
@abc.abstractmethod
def _send_non_blocking(self, request):
pass
def send_cast(self, request):
self._check_pattern(request, zmq_names.CAST_TYPE)
self._send_non_blocking(request)
def send_fanout(self, request):
self._check_pattern(request, zmq_names.CAST_FANOUT_TYPE)
self._send_non_blocking(request)
def send_notify(self, request):
self._check_pattern(request, zmq_names.NOTIFY_TYPE)
self._send_non_blocking(request)

View File

@ -0,0 +1,53 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import retrying
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_base
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase):
"""DEALER-publisher using direct connections."""
def __init__(self, conf, matchmaker):
sender = zmq_senders.RequestSenderDirect(conf)
receiver = zmq_receivers.ReplyReceiverDirect(conf)
super(DealerPublisherDirect, self).__init__(conf, matchmaker, sender,
receiver)
def _connect_socket(self, request):
return self.sockets_manager.get_socket(request.target)
def _send_non_blocking(self, request):
try:
socket = self._connect_socket(request)
except retrying.RetryError:
return
if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(socket.connections_count()):
self.sender.send(socket, request)
else:
self.sender.send(socket, request)

View File

@ -17,10 +17,10 @@ import logging
import retrying import retrying
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher import zmq_dealer_publisher_base
from oslo_messaging._drivers.zmq_driver.client.publishers \ from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_names
@ -31,17 +31,31 @@ LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq() zmq = zmq_async.import_zmq()
class DealerPublisherProxy(zmq_publisher_base.PublisherBase): class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
"""Non-CALL publisher via proxy.""" """DEALER-publisher via proxy."""
def __init__(self, sockets_manager, sender): def __init__(self, conf, matchmaker):
super(DealerPublisherProxy, self).__init__(sockets_manager, sender) sender = zmq_senders.RequestSenderProxy(conf)
self.socket = sockets_manager.get_socket_to_publishers() receiver = zmq_receivers.ReplyReceiverProxy(conf)
super(DealerPublisherProxy, self).__init__(conf, matchmaker, sender,
receiver)
self.socket = self.sockets_manager.get_socket_to_publishers()
self.routing_table = zmq_routing_table.RoutingTable(self.conf, self.routing_table = zmq_routing_table.RoutingTable(self.conf,
self.matchmaker) self.matchmaker)
self.connection_updater = \ self.connection_updater = \
PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket) PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
def _connect_socket(self, request):
return self.socket
def send_call(self, request):
try:
request.routing_key = \
self.routing_table.get_routable_host(request.target)
except retrying.RetryError:
self._raise_timeout(request)
return super(DealerPublisherProxy, self).send_call(request)
def _get_routing_keys(self, request): def _get_routing_keys(self, request):
try: try:
if request.msg_type in zmq_names.DIRECT_TYPES: if request.msg_type in zmq_names.DIRECT_TYPES:
@ -54,48 +68,14 @@ class DealerPublisherProxy(zmq_publisher_base.PublisherBase):
except retrying.RetryError: except retrying.RetryError:
return [] return []
def send_request(self, request): def _send_non_blocking(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(
zmq_names.message_type_str(request.msg_type)
)
for routing_key in self._get_routing_keys(request): for routing_key in self._get_routing_keys(request):
request.routing_key = routing_key request.routing_key = routing_key
self.sender.send(self.socket, request) self.sender.send(self.socket, request)
def cleanup(self): def cleanup(self):
self.connection_updater.stop()
self.socket.close()
super(DealerPublisherProxy, self).cleanup() super(DealerPublisherProxy, self).cleanup()
class DealerCallPublisherProxy(zmq_dealer_publisher.DealerCallPublisher):
"""CALL publisher via proxy."""
def __init__(self, sockets_manager, sender, reply_waiter):
super(DealerCallPublisherProxy, self).__init__(
sockets_manager, sender, reply_waiter
)
self.socket = self.sockets_manager.get_socket_to_publishers()
self.routing_table = zmq_routing_table.RoutingTable(self.conf,
self.matchmaker)
self.connection_updater = \
PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
def send_request(self, request):
try:
request.routing_key = \
self.routing_table.get_routable_host(request.target)
except retrying.RetryError:
self._raise_timeout(request)
return super(DealerCallPublisherProxy, self).send_request(request)
def _connect_socket(self, target):
return self.socket
def cleanup(self):
self.connection_updater.stop() self.connection_updater.stop()
super(DealerCallPublisherProxy, self).cleanup()
self.socket.close() self.socket.close()

View File

@ -53,29 +53,42 @@ class PublisherBase(object):
Publisher can send request objects from zmq_request. Publisher can send request objects from zmq_request.
""" """
def __init__(self, sockets_manager, sender): def __init__(self, sockets_manager, sender, receiver):
"""Construct publisher """Construct publisher
Accept configuration object and Name Service interface object. Accept sockets manager, sender and receiver objects.
Create zmq.Context and connected sockets dictionary.
:param conf: configuration object :param sockets_manager: sockets manager object
:type conf: oslo_config.CONF :type sockets_manager: zmq_sockets_manager.SocketsManager
:param senders: request sender object
:type senders: zmq_senders.RequestSender
:param receiver: reply receiver object
:type receiver: zmq_receivers.ReplyReceiver
""" """
self.sockets_manager = sockets_manager self.sockets_manager = sockets_manager
self.conf = sockets_manager.conf self.conf = sockets_manager.conf
self.matchmaker = sockets_manager.matchmaker self.matchmaker = sockets_manager.matchmaker
self.sender = sender self.sender = sender
self.receiver = receiver
@abc.abstractmethod @abc.abstractmethod
def send_request(self, request): def send_call(self, request):
"""Send request to consumer pass
:param request: Message data and destination container object @abc.abstractmethod
:type request: zmq_request.Request def send_cast(self, request):
""" pass
@abc.abstractmethod
def send_fanout(self, request):
pass
@abc.abstractmethod
def send_notify(self, request):
pass
def cleanup(self): def cleanup(self):
"""Cleanup publisher. Close allocated connections.""" """Cleanup publisher. Close allocated connections."""
self.receiver.stop()
self.sockets_manager.cleanup() self.sockets_manager.cleanup()

View File

@ -15,13 +15,10 @@
from oslo_messaging._drivers import common from oslo_messaging._drivers import common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher import zmq_dealer_publisher_direct
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy import zmq_dealer_publisher_proxy
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager
from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_names
@ -45,34 +42,18 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase):
if conf.use_router_proxy or not conf.use_pub_sub: if conf.use_router_proxy or not conf.use_pub_sub:
raise WrongClientException() raise WrongClientException()
self.sockets_manager = zmq_sockets_manager.SocketsManager( publisher_direct = \
conf, matchmaker, zmq.ROUTER, zmq.DEALER zmq_dealer_publisher_direct.DealerPublisherDirect(conf, matchmaker)
)
sender_proxy = zmq_senders.RequestSenderProxy(conf) publisher_proxy = \
sender_direct = zmq_senders.RequestSenderDirect(conf) zmq_dealer_publisher_proxy.DealerPublisherProxy(conf, matchmaker)
receiver_direct = zmq_receivers.ReplyReceiverDirect(conf)
fanout_publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy(
self.sockets_manager, sender_proxy
)
super(ZmqClientMixDirectPubSub, self).__init__( super(ZmqClientMixDirectPubSub, self).__init__(
conf, matchmaker, allowed_remote_exmods, conf, matchmaker, allowed_remote_exmods,
publishers={ publishers={
zmq_names.CALL_TYPE: zmq_names.CAST_FANOUT_TYPE: publisher_proxy,
zmq_dealer_publisher.DealerCallPublisher( zmq_names.NOTIFY_TYPE: publisher_proxy,
self.sockets_manager, sender_direct, receiver_direct "default": publisher_direct
),
zmq_names.CAST_FANOUT_TYPE: fanout_publisher,
zmq_names.NOTIFY_TYPE: fanout_publisher,
"default":
zmq_dealer_publisher.DealerPublisher(self.sockets_manager,
sender_direct)
} }
) )
@ -90,26 +71,12 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase):
if conf.use_pub_sub or conf.use_router_proxy: if conf.use_pub_sub or conf.use_router_proxy:
raise WrongClientException() raise WrongClientException()
self.sockets_manager = zmq_sockets_manager.SocketsManager( publisher = \
conf, matchmaker, zmq.ROUTER, zmq.DEALER zmq_dealer_publisher_direct.DealerPublisherDirect(conf, matchmaker)
)
sender = zmq_senders.RequestSenderDirect(conf)
receiver = zmq_receivers.ReplyReceiverDirect(conf)
super(ZmqClientDirect, self).__init__( super(ZmqClientDirect, self).__init__(
conf, matchmaker, allowed_remote_exmods, conf, matchmaker, allowed_remote_exmods,
publishers={ publishers={"default": publisher}
zmq_names.CALL_TYPE:
zmq_dealer_publisher.DealerCallPublisher(
self.sockets_manager, sender, receiver
),
"default":
zmq_dealer_publisher.DealerPublisher(self.sockets_manager,
sender)
}
) )
@ -128,25 +95,10 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase):
if not conf.use_router_proxy: if not conf.use_router_proxy:
raise WrongClientException() raise WrongClientException()
self.sockets_manager = zmq_sockets_manager.SocketsManager( publisher = \
conf, matchmaker, zmq.ROUTER, zmq.DEALER zmq_dealer_publisher_proxy.DealerPublisherProxy(conf, matchmaker)
)
sender = zmq_senders.RequestSenderProxy(conf)
receiver = zmq_receivers.ReplyReceiverProxy(conf)
super(ZmqClientProxy, self).__init__( super(ZmqClientProxy, self).__init__(
conf, matchmaker, allowed_remote_exmods, conf, matchmaker, allowed_remote_exmods,
publishers={ publishers={"default": publisher}
zmq_names.CALL_TYPE:
zmq_dealer_publisher_proxy.DealerCallPublisherProxy(
self.sockets_manager, sender, receiver
),
"default":
zmq_dealer_publisher_proxy.DealerPublisherProxy(
self.sockets_manager, sender
)
}
) )

View File

@ -24,45 +24,44 @@ class ZmqClientBase(object):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None, def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None,
publishers=None): publishers=None):
self.conf = conf self.conf = conf
self.context = zmq.Context()
self.matchmaker = matchmaker self.matchmaker = matchmaker
self.allowed_remote_exmods = allowed_remote_exmods or [] self.allowed_remote_exmods = allowed_remote_exmods or []
self.publishers = publishers self.publishers = publishers
self.call_publisher = publishers.get(zmq_names.CALL_TYPE) \ self.call_publisher = publishers.get(zmq_names.CALL_TYPE,
or publishers["default"] publishers["default"])
self.cast_publisher = publishers.get(zmq_names.CAST_TYPE) \ self.cast_publisher = publishers.get(zmq_names.CAST_TYPE,
or publishers["default"] publishers["default"])
self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE) \ self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE,
or publishers["default"] publishers["default"])
self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE) \ self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE,
or publishers["default"] publishers["default"])
def send_call(self, target, context, message, timeout=None, retry=None): def send_call(self, target, context, message, timeout=None, retry=None):
request = zmq_request.CallRequest( request = zmq_request.CallRequest(
target, context=context, message=message, retry=retry, target, context=context, message=message, retry=retry,
timeout=timeout, allowed_remote_exmods=self.allowed_remote_exmods timeout=timeout, allowed_remote_exmods=self.allowed_remote_exmods
) )
return self.call_publisher.send_request(request) return self.call_publisher.send_call(request)
def send_cast(self, target, context, message, retry=None): def send_cast(self, target, context, message, retry=None):
request = zmq_request.CastRequest( request = zmq_request.CastRequest(
target, context=context, message=message, retry=retry target, context=context, message=message, retry=retry
) )
self.cast_publisher.send_request(request) self.cast_publisher.send_cast(request)
def send_fanout(self, target, context, message, retry=None): def send_fanout(self, target, context, message, retry=None):
request = zmq_request.FanoutRequest( request = zmq_request.FanoutRequest(
target, context=context, message=message, retry=retry target, context=context, message=message, retry=retry
) )
self.fanout_publisher.send_request(request) self.fanout_publisher.send_fanout(request)
def send_notify(self, target, context, message, version, retry=None): def send_notify(self, target, context, message, version, retry=None):
request = zmq_request.NotificationRequest( request = zmq_request.NotificationRequest(
target, context=context, message=message, retry=retry, target, context=context, message=message, retry=retry,
version=version version=version
) )
self.notify_publisher.send_request(request) self.notify_publisher.send_notify(request)
def cleanup(self): def cleanup(self):
cleaned = set() cleaned = set()

View File

@ -37,7 +37,15 @@ class SenderBase(object):
pass pass
class RequestSenderProxy(SenderBase): class RequestSender(SenderBase):
pass
class ReplySender(SenderBase):
pass
class RequestSenderProxy(RequestSender):
def send(self, socket, request): def send(self, socket, request):
socket.send(b'', zmq.SNDMORE) socket.send(b'', zmq.SNDMORE)
@ -55,7 +63,7 @@ class RequestSenderProxy(SenderBase):
"target": request.target}) "target": request.target})
class ReplySenderProxy(SenderBase): class ReplySenderProxy(ReplySender):
def send(self, socket, reply): def send(self, socket, reply):
LOG.debug("Replying to %s", reply.message_id) LOG.debug("Replying to %s", reply.message_id)
@ -69,7 +77,7 @@ class ReplySenderProxy(SenderBase):
socket.send_dumped(reply.to_dict()) socket.send_dumped(reply.to_dict())
class RequestSenderDirect(SenderBase): class RequestSenderDirect(RequestSender):
def send(self, socket, request): def send(self, socket, request):
socket.send(b'', zmq.SNDMORE) socket.send(b'', zmq.SNDMORE)
@ -85,7 +93,7 @@ class RequestSenderDirect(SenderBase):
"target": request.target}) "target": request.target})
class ReplySenderDirect(SenderBase): class ReplySenderDirect(ReplySender):
def send(self, socket, reply): def send(self, socket, reply):
LOG.debug("Replying to %s", reply.message_id) LOG.debug("Replying to %s", reply.message_id)