Close sockets properly

All socket-connections should properly
die after their parents being stopped.

Change-Id: I6a83ed2d5ef194e8b068c1d8bd6813f48636c5fb
This commit is contained in:
Oleksii Zamiatin 2015-07-20 12:21:58 +03:00
parent 75660cedac
commit e2c3e36d75
13 changed files with 87 additions and 33 deletions

View File

@ -96,21 +96,20 @@ class ZmqDriver(base.BaseDriver):
conf.register_opts(zmq_opts)
conf.register_opts(executor_base._pool_opts)
self.conf = conf
self.server = None
self.client = None
self.matchmaker = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
self.conf.rpc_zmq_matchmaker,
).driver(self.conf)
self.server = zmq_server.ZmqServer(self.conf, self.matchmaker)
self.client = zmq_client.ZmqClient(self.conf, self.matchmaker,
allowed_remote_exmods)
super(ZmqDriver, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
if self.client is None:
self.client = zmq_client.ZmqClient(self.conf, self.matchmaker,
self._allowed_remote_exmods)
if wait_for_reply:
return self.client.call(target, ctxt, message, timeout, retry)
else:
@ -121,8 +120,6 @@ class ZmqDriver(base.BaseDriver):
return None
def listen(self, target):
if self.server is None:
self.server = zmq_server.ZmqServer(self.conf, self.matchmaker)
self.server.listen(target)
return self.server
@ -130,4 +127,5 @@ class ZmqDriver(base.BaseDriver):
return None
def cleanup(self):
pass
self.client.cleanup()
self.server.cleanup()

View File

@ -110,6 +110,9 @@ class BaseTcpFrontend(object):
LOG.info(_LI("Message %s received."), message)
return message
def close(self):
self.frontend.close()
@six.add_metaclass(abc.ABCMeta)
class BaseBackendMatcher(object):
@ -124,6 +127,11 @@ class BaseBackendMatcher(object):
def redirect_to_backend(self, message):
"""Redirect message"""
def close(self):
if self.backends:
for backend in self.backends.values():
backend.close()
@six.add_metaclass(abc.ABCMeta)
class DirectBackendMatcher(BaseBackendMatcher):

View File

@ -33,3 +33,6 @@ class PublisherBackend(base_proxy.BaseBackendMatcher):
target_pos = zmq_serializer.MESSAGE_CALL_TARGET_POSITION + 1
msg = message[target_pos:]
self.backend.send_multipart(msg)
def close(self):
self.backend.close()

View File

@ -50,6 +50,12 @@ class UniversalProxy(base_proxy.BaseProxy):
else:
self.tcp_frontend.redirect_outgoing_reply(message)
def stop(self):
self.poller.close()
super(UniversalProxy, self).stop()
self.tcp_frontend.close()
self.backend_matcher.close()
class BackendMatcher(base_proxy.BaseBackendMatcher):

View File

@ -49,8 +49,8 @@ class RedisMatchMaker(base.MatchMakerBase):
)
def register(self, target, hostname):
key = zmq_target.target_to_str(target)
if hostname not in self.get_hosts(target):
key = zmq_target.target_to_str(target)
self._redis.lpush(key, hostname)
def get_hosts(self, target):

View File

@ -29,12 +29,12 @@ class GreenPoller(zmq_poller.ZmqPoller):
def __init__(self):
self.incoming_queue = six.moves.queue.Queue()
self.green_pool = eventlet.GreenPool()
self.sockets = []
self.threads = []
def register(self, socket, recv_method=None):
self.sockets.append(socket)
return self.green_pool.spawn(self._socket_receive, socket,
recv_method)
self.threads.append(
self.green_pool.spawn(self._socket_receive, socket,
recv_method))
def _socket_receive(self, socket, recv_method=None):
while True:
@ -58,6 +58,10 @@ class GreenPoller(zmq_poller.ZmqPoller):
return None, None
return incoming[0], incoming[1]
def close(self):
for thread in self.threads:
thread.kill()
class HoldReplyPoller(GreenPoller):

View File

@ -42,7 +42,13 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
def poll(self, timeout=None):
timeout = timeout * 1000 # zmq poller waits milliseconds
sockets = dict(self.poller.poll(timeout=timeout))
sockets = None
try:
sockets = dict(self.poller.poll(timeout=timeout))
except zmq.ZMQError as e:
LOG.debug("Polling terminated with error: %s" % e)
if not sockets:
return None, None
for socket in sockets:
@ -51,6 +57,12 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
else:
return socket.recv_multipart(), socket
def resume_polling(self, socket):
pass # Nothing to do for threading poller
def close(self):
pass # Nothing to do for threading poller
class ThreadingExecutor(zmq_poller.Executor):

View File

@ -33,6 +33,7 @@ class CallRequest(Request):
retry=None, allowed_remote_exmods=None, matchmaker=None):
self.allowed_remote_exmods = allowed_remote_exmods or []
self.matchmaker = matchmaker
self.reply_poller = zmq_async.get_reply_poller()
try:
self.zmq_context = zmq.Context()
@ -51,13 +52,16 @@ class CallRequest(Request):
LOG.error(_LE("Error connecting to socket: %s") % str(e))
raise
def close(self):
self.reply_poller.close()
self.socket.close()
def receive_reply(self):
# NOTE(ozamiatin): Check for retry here (no retries now)
poller = zmq_async.get_reply_poller()
poller.register(self.socket,
recv_method=lambda socket: socket.recv_json())
self.reply_poller.register(
self.socket, recv_method=lambda socket: socket.recv_json())
reply, socket = poller.poll(timeout=self.timeout)
reply, socket = self.reply_poller.poll(timeout=self.timeout)
if reply is None:
raise oslo_messaging.MessagingTimeout(
"Timeout %s seconds was reached" % self.timeout)

View File

@ -71,7 +71,13 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase):
dealer_socket = self.zmq_context.socket(zmq.DEALER)
LOG.info(_LI("Connecting DEALER to %s") % address)
dealer_socket.connect(address)
self.outbound_sockets[address] = dealer_socket
return dealer_socket
except zmq.ZMQError:
LOG.error(_LE("Failed connecting DEALER to %s") % address)
raise
def cleanup(self):
if self.outbound_sockets:
for socket in self.outbound_sockets.values():
socket.close()

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_call_request
from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_dealer
@ -27,10 +28,14 @@ class ZmqClient(object):
matchmaker)
def call(self, target, context, message, timeout=None, retry=None):
request = zmq_call_request.CallRequest(
self.conf, target, context, message, timeout, retry,
self.allowed_remote_exmods, self.matchmaker)
return request()
with contextlib.closing(zmq_call_request.CallRequest(
self.conf, target, context, message, timeout, retry,
self.allowed_remote_exmods,
self.matchmaker)) as request:
return request()
def cast(self, target, context, message, timeout=None, retry=None):
self.cast_publisher.cast(target, context, message, timeout, retry)
def cleanup(self):
self.cast_publisher.cleanup()

View File

@ -12,7 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class ConsumerBase(object):
def __init__(self, listener, conf, zmq_poller, context):
@ -22,14 +27,11 @@ class ConsumerBase(object):
self.context = context
self.sockets_per_target = {}
def poll(self, timeout=None):
pass
def stop(self):
pass
def cleanup(self):
pass
if self.sockets_per_target:
for socket in self.sockets_per_target.values():
socket.close()
@abc.abstractmethod
def listen(self, target):
pass
"""Listen for target"""

View File

@ -47,7 +47,9 @@ class ZmqServer(base.Listener):
LOG.info("[Server] Stop")
def cleanup(self):
pass
self.poller.close()
self.call_resp.cleanup()
self.fanout_resp.cleanup()
def listen(self, target):
LOG.info("[Server] Listen to Target %s" % target)

View File

@ -22,11 +22,15 @@ class ZmqPoller(object):
@abc.abstractmethod
def register(self, socket, recv_method=None):
'Register socket to poll'
"""Register socket to poll"""
@abc.abstractmethod
def poll(self, timeout=None):
'Poll for messages'
"""Poll for messages"""
@abc.abstractmethod
def close(self):
"""Terminate polling"""
@six.add_metaclass(abc.ABCMeta)