From 1adf880a23562906f0549d389de3962697aa65e4 Mon Sep 17 00:00:00 2001 From: Oleksii Zamiatin Date: Mon, 10 Aug 2015 18:07:38 +0300 Subject: [PATCH] Fix fork-related issues Many services make use of the 'fork' system call to start new instances of 'workers'. Such an approach forces messaging drivers to perform their initialization in a lazy manner. Added a LazyDriverItem object to initialize any part of the driver on first request. Fixed DEALER-publisher not to block on sending when no listener is connected. Introduced a ZmqSocket wrapper to track connections in outgoing sockets. Refactored publishers and introduced PublisherMultisend. Change-Id: I125c946ee9e36061d1b21aa29adcef0611dff201 --- oslo_messaging/_drivers/impl_zmq.py | 74 +++++++++++++++---- .../client/publishers/zmq_dealer_publisher.py | 45 ++++------- .../client/publishers/zmq_publisher_base.py | 45 ++++++++++- .../client/publishers/zmq_req_publisher.py | 2 +- .../server/consumers/zmq_router_consumer.py | 8 +- .../_drivers/zmq_driver/server/zmq_server.py | 4 +- .../_drivers/zmq_driver/zmq_names.py | 15 ++++ .../_drivers/zmq_driver/zmq_socket.py | 57 ++++++++++++++ 8 files changed, 200 insertions(+), 50 deletions(-) create mode 100644 oslo_messaging/_drivers/zmq_driver/zmq_socket.py diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index f0ff1a647..09dde4b1a 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -15,6 +15,7 @@ import logging import pprint import socket +import threading from oslo_config import cfg from stevedore import driver @@ -82,6 +83,36 @@ zmq_opts = [ ] +class LazyDriverItem(object): + + def __init__(self, item_cls, *args, **kwargs): + self._lock = threading.Lock() + self.item = None + self.item_class = item_cls + self.args = args + self.kwargs = kwargs + + def get(self): + # NOTE(ozamiatin): Lazy initialization. + # All init stuff moved closer to usage point - lazy init.
+ # Better design approach is to initialize in the driver's + # __init__, but 'fork' extensively used by services + # breaks all things. + + if self.item is not None: + return self.item + + # 'with' guarantees the lock is released even if item_class raises. + with self._lock: + if self.item is None: + self.item = self.item_class(*self.args, **self.kwargs) + return self.item + + def cleanup(self): + if self.item: + self.item.cleanup() + + class ZmqDriver(base.BaseDriver): """ZeroMQ Driver implementation. @@ -115,15 +146,27 @@ class ZmqDriver(base.BaseDriver): conf.register_opts(zmq_opts) conf.register_opts(executor_base._pool_opts) self.conf = conf + self.allowed_remote_exmods = allowed_remote_exmods self.matchmaker = driver.DriverManager( 'oslo.messaging.zmq.matchmaker', self.conf.rpc_zmq_matchmaker, ).driver(self.conf) - self.server = zmq_server.ZmqServer(self.conf, self.matchmaker) - self.client = zmq_client.ZmqClient(self.conf, self.matchmaker, - allowed_remote_exmods) + self.server = LazyDriverItem( + zmq_server.ZmqServer, self, self.conf, self.matchmaker) + + self.notify_server = LazyDriverItem( + zmq_server.ZmqServer, self, self.conf, self.matchmaker) + + self.client = LazyDriverItem( + zmq_client.ZmqClient, self.conf, self.matchmaker, + self.allowed_remote_exmods) + + self.notifier = LazyDriverItem( + zmq_client.ZmqClient, self.conf, self.matchmaker, + self.allowed_remote_exmods) + super(ZmqDriver, self).__init__(conf, url, default_exchange, allowed_remote_exmods) @@ -147,13 +190,14 @@ class ZmqDriver(base.BaseDriver): N means N retries :type retry: int """ + client = self.client.get() timeout = timeout or self.conf.rpc_response_timeout if wait_for_reply: - return self.client.send_call(target, ctxt, message, timeout, retry) + return client.send_call(target, ctxt, message, timeout, retry) elif target.fanout: - self.client.send_fanout(target, ctxt, message, timeout, retry) + client.send_fanout(target, ctxt, message, timeout, retry) else: - self.client.send_cast(target, ctxt, message, timeout, retry) +
client.send_cast(target, ctxt, message, timeout, retry) def send_notification(self, target, ctxt, message, version, retry=None): """Send notification to server @@ -172,11 +216,11 @@ class ZmqDriver(base.BaseDriver): N means N retries :type retry: int """ + client = self.notifier.get() if target.fanout: - self.client.send_notify_fanout(target, ctxt, message, version, - retry) + client.send_notify_fanout(target, ctxt, message, version, retry) else: - self.client.send_notify(target, ctxt, message, version, retry) + client.send_notify(target, ctxt, message, version, retry) def listen(self, target): """Listen to a specified target on a server side @@ -184,8 +228,9 @@ class ZmqDriver(base.BaseDriver): :param target: Message destination target :type target: oslo_messaging.Target """ - self.server.listen(target) - return self.server + server = self.server.get() + server.listen(target) + return server def listen_for_notifications(self, targets_and_priorities, pool): """Listen to a specified list of targets on a server side @@ -195,11 +240,14 @@ class ZmqDriver(base.BaseDriver): :param pool: Not used for zmq implementation :type pool: object """ - self.server.listen_notification(targets_and_priorities) - return self.server + server = self.notify_server.get() + server.listen_notification(targets_and_priorities) + return server def cleanup(self): """Cleanup all driver's connections finally """ self.client.cleanup() self.server.cleanup() + self.notify_server.cleanup() + self.notifier.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py index bf6f253f9..9fdd6d7a8 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py @@ -14,20 +14,21 @@ import logging -from oslo_messaging._drivers import common as rpc_common from 
oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_publisher_base -from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LE, _LI +from oslo_messaging._i18n import _LI, _LW LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class DealerPublisher(zmq_publisher_base.PublisherBase): +class DealerPublisher(zmq_publisher_base.PublisherMultisend): + + def __init__(self, conf, matchmaker): + super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER) def send_request(self, request): @@ -37,41 +38,25 @@ class DealerPublisher(zmq_publisher_base.PublisherBase): dealer_socket, hosts = self._check_hosts_connections(request.target) if request.msg_type in zmq_names.MULTISEND_TYPES: - for _ in range(len(hosts)): + for _ in range(dealer_socket.connections_count()): self._send_request(dealer_socket, request) else: self._send_request(dealer_socket, request) def _send_request(self, socket, request): + if not socket.connections: + # NOTE(ozamiatin): Here we can provide + # a queue for keeping messages to send them later + # when some listener appears. However such approach + # being more reliable will consume additional memory. 
+ LOG.warning(_LW("Request %s was dropped because no connection") + % request.msg_type) + return + socket.send(b'', zmq.SNDMORE) super(DealerPublisher, self)._send_request(socket, request) LOG.info(_LI("Sending message %(message)s to a target %(target)s") % {"message": request.message, "target": request.target}) - - def _check_hosts_connections(self, target): - if str(target) in self.outbound_sockets: - dealer_socket, hosts = self.outbound_sockets[str(target)] - else: - dealer_socket = zmq.Context().socket(zmq.DEALER) - hosts = self.matchmaker.get_hosts(target) - for host in hosts: - self._connect_to_host(dealer_socket, host, target) - self.outbound_sockets[str(target)] = (dealer_socket, hosts) - return dealer_socket, hosts - - @staticmethod - def _connect_to_host(socket, host, target): - address = zmq_address.get_tcp_direct_address(host) - try: - LOG.info(_LI("Connecting DEALER to %(address)s for %(target)s") - % {"address": address, - "target": target}) - socket.connect(address) - except zmq.ZMQError as e: - errmsg = _LE("Failed connecting DEALER to %(address)s: %(e)s")\ - % (address, e) - LOG.error(errmsg) - raise rpc_common.RPCException(errmsg) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index a367e9ed3..51de8a5e6 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -13,13 +13,18 @@ # under the License. 
import abc +import logging import six from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._i18n import _LE +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._drivers.zmq_driver import zmq_socket +from oslo_messaging._i18n import _LE, _LI +LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() @@ -93,6 +98,42 @@ class PublisherBase(object): def cleanup(self): """Cleanup publisher. Close allocated connections.""" - for socket, hosts in self.outbound_sockets.values(): + for socket in self.outbound_sockets.values(): socket.setsockopt(zmq.LINGER, 0) socket.close() + + +class PublisherMultisend(PublisherBase): + + def __init__(self, conf, matchmaker, socket_type): + self.socket_type = socket_type + super(PublisherMultisend, self).__init__(conf, matchmaker) + + def _check_hosts_connections(self, target): + hosts = self.matchmaker.get_hosts(target) + + if str(target) in self.outbound_sockets: + socket = self.outbound_sockets[str(target)] + else: + socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type) + self.outbound_sockets[str(target)] = socket + + for host in hosts: + self._connect_to_host(socket, host, target) + + return socket, hosts + + def _connect_to_host(self, socket, host, target): + address = zmq_address.get_tcp_direct_address(host) + stype = zmq_names.socket_type_str(self.socket_type) + try: + LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s") + % {"stype": stype, + "address": address, + "target": target}) + socket.connect(address) + except zmq.ZMQError as e: + errmsg = _LE("Failed connecting %(stype)s to %(address)s: %(e)s")\ + % {"stype": stype, "address": address, "e": e} + LOG.error(errmsg) + raise rpc_common.RPCException(errmsg) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py
b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py index 68beab903..a3096959c 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py @@ -52,7 +52,7 @@ class ReqPublisher(zmq_publisher_base.PublisherBase): LOG.info(_LI("Connecting REQ to %s") % connect_address) socket.connect(connect_address) - self.outbound_sockets[str(target)] = (socket, [host]) + self.outbound_sockets[str(target)] = socket return socket except zmq.ZMQError as e: diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 58680da90..92b9364ba 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -30,6 +30,7 @@ class RouterConsumer(object): def __init__(self, conf, poller, server): + self.conf = conf self.poller = poller self.server = server @@ -38,6 +39,7 @@ class RouterConsumer(object): self.socket = self.context.socket(zmq.ROUTER) self.address = zmq_address.get_tcp_random_address(conf) self.port = self.socket.bind_to_random_port(self.address) + self.poller.register(self.socket, self._receive_message) LOG.info(_LI("Run ROUTER consumer on %(addr)s:%(port)d"), {"addr": self.address, "port": self.port}) @@ -49,7 +51,7 @@ class RouterConsumer(object): def listen(self, target): LOG.info(_LI("Listen to target %s") % str(target)) - self.poller.register(self.socket, self._receive_message) + # Do nothing here because we have single socket def cleanup(self): if not self.socket.closed: @@ -66,7 +68,9 @@ class RouterConsumer(object): assert msg_type is not None, 'Bad format: msg type expected' context = socket.recv_json() message = socket.recv_json() - LOG.debug("Received %s message %s" % (msg_type, str(message))) + LOG.info(_LI("Received 
%(msg_type)s message %(msg)s") + % {"msg_type": msg_type, + "msg": str(message)}) if msg_type == zmq_names.CALL_TYPE: return zmq_incoming_message.ZmqIncomingRequest( diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index 30cacd409..8f7f12657 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -28,8 +28,8 @@ zmq = zmq_async.import_zmq() class ZmqServer(base.Listener): - def __init__(self, conf, matchmaker=None): - self.conf = conf + def __init__(self, driver, conf, matchmaker=None): + super(ZmqServer, self).__init__(driver) self.matchmaker = matchmaker self.poller = zmq_async.get_poller() self.rpc_consumer = zmq_router_consumer.RouterConsumer( diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index 583600ec4..33fe9247c 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -12,6 +12,17 @@ # License for the specific language governing permissions and limitations # under the License. 
+from oslo_messaging._drivers.zmq_driver import zmq_async + +zmq = zmq_async.import_zmq() + + +ZMQ_SOCKET_STR = {zmq.DEALER: "DEALER", + zmq.ROUTER: "ROUTER", + zmq.REQ: "REQ", + zmq.REP: "REP", + zmq.PUB: "PUB", + zmq.SUB: "SUB"} FIELD_FAILURE = 'failure' FIELD_REPLY = 'reply' @@ -33,3 +44,7 @@ MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_FANOUT_TYPE) DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, NOTIFY_TYPE) CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE) NOTIFY_TYPES = (NOTIFY_TYPE, NOTIFY_FANOUT_TYPE) + + +def socket_type_str(socket_type): + return ZMQ_SOCKET_STR[socket_type] diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py new file mode 100644 index 000000000..a4f77b7e8 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -0,0 +1,57 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import logging + +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class ZmqSocket(object): + """Wrapper over a zmq socket that tracks its connected addresses.""" + def __init__(self, context, socket_type): + self.context = context + self.socket_type = socket_type + self.handle = context.socket(socket_type) + self.connections = set() + + def type_name(self): + return zmq_names.socket_type_str(self.socket_type) + + def connections_count(self): + return len(self.connections) + + def connect(self, address): + if address not in self.connections: + self.handle.connect(address) + self.connections.add(address) + + def setsockopt(self, *args, **kwargs): + self.handle.setsockopt(*args, **kwargs) + + def send(self, *args, **kwargs): + self.handle.send(*args, **kwargs) + + def send_string(self, *args, **kwargs): + self.handle.send_string(*args, **kwargs) + + def send_json(self, *args, **kwargs): + self.handle.send_json(*args, **kwargs) + + def close(self, *args, **kwargs): + self.handle.close(*args, **kwargs)