From 35cc77f5b925bbc527acd6ca1c034977c8968925 Mon Sep 17 00:00:00 2001 From: Thomas Herve Date: Wed, 8 Jun 2016 09:44:23 +0200 Subject: [PATCH] Remove leftovers on websocket disconnections When a websocket connection is closed, we need to cleanup 2 things: protocol instance which are tracked by the websocket factory, and subscriptions which may have been created by the connection. Closes-Bug: #1590112 Change-Id: I4c21fad979a42518c5b319aa6e4bc4c6b485b872 --- zaqar/api/handler.py | 8 ++++++++ .../unit/transport/websocket/v2/test_subscriptions.py | 11 ++++++++--- zaqar/transport/websocket/factory.py | 3 +++ zaqar/transport/websocket/protocol.py | 4 ++++ 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/zaqar/api/handler.py b/zaqar/api/handler.py index 7080dc0dc..dd792b6e2 100644 --- a/zaqar/api/handler.py +++ b/zaqar/api/handler.py @@ -44,6 +44,14 @@ class Handler(object): def set_subscription_factory(self, factory): self._subscription_factory = factory + def clean_subscriptions(self, subscriptions): + for resp in subscriptions: + body = {'queue_name': resp._request._body.get('queue_name'), + 'subscription_id': resp._body.get('subscription_id')} + payload = {'body': body, 'headers': resp._request._headers} + req = self.create_request(payload) + self.v2_endpoints.subscription_delete(req) + def process_request(self, req, protocol): # FIXME(vkmc): Control API version if req._action == 'subscription_create': diff --git a/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py b/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py index 1b97db0f4..7f681f9a0 100644 --- a/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py +++ b/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py @@ -99,9 +99,6 @@ class SubscriptionTest(base.V1_1Base): subscriber['subscriber']) self.assertLessEqual(added_age, subscriber['age']) - self.boot.storage.subscription_controller.delete( - 'kitkat', subscriber['id'], project=self.project_id) - response = { 'body': {'message': 'Subscription kitkat created.', 'subscription_id': subscriber['id']}, @@ -113,6 +110,14 @@ class SubscriptionTest(base.V1_1Base): self.assertEqual(1, sender.call_count) self.assertEqual(response, json.loads(sender.call_args[0][0])) + # Trigger protocol close + self.protocol.onClose(True, 100, None) + subscribers = list( + next( + self.boot.storage.subscription_controller.list( + 'kitkat', self.project_id))) + self.assertEqual([], subscribers) + def test_subscription_delete(self): sub = self.boot.storage.subscription_controller.create( 'kitkat', '', 600, {}, project=self.project_id) diff --git a/zaqar/transport/websocket/factory.py b/zaqar/transport/websocket/factory.py index 5bef52d4f..384dc30dd 100644 --- a/zaqar/transport/websocket/factory.py +++ b/zaqar/transport/websocket/factory.py @@ -44,6 +44,9 @@ class ProtocolFactory(websocket.WebSocketServerFactory): proto.factory = self return proto + def unregister(self, proto_id): + self._protos.pop(proto_id) + class NotificationFactory(object): diff --git a/zaqar/transport/websocket/protocol.py b/zaqar/transport/websocket/protocol.py index 233d14921..b786ecabc 100644 --- a/zaqar/transport/websocket/protocol.py +++ b/zaqar/transport/websocket/protocol.py @@ -67,6 +67,7 @@ class MessagingProtocol(websocket.WebSocketServerProtocol): self._auth_in_binary = None self._deauth_handle = None self.notify_in_binary = None + self._subscriptions = [] def onConnect(self, request): LOG.info(_LI("Client connecting: %s"), request.peer) @@ -135,9 +136,12 @@ class MessagingProtocol(websocket.WebSocketServerProtocol): # subscription. if not subscriber: self.notify_in_binary = isBinary + self._subscriptions.append(resp) return self._send_response(resp, isBinary) def onClose(self, wasClean, code, reason): + self._handler.clean_subscriptions(self._subscriptions) + self.factory.unregister(self.proto_id) LOG.info(_LI("WebSocket connection closed: %s"), reason) def _authenticate(self, payload, in_binary):