Remove leftovers on websocket disconnections
When a websocket connection is closed, we need to cleanup 2 things: protocol instance which are tracked by the websocket factory, and subscriptions which may have been created by the connection. Closes-Bug: #1590112 Change-Id: I4c21fad979a42518c5b319aa6e4bc4c6b485b872
This commit is contained in:
parent
61100fec33
commit
35cc77f5b9
@ -44,6 +44,14 @@ class Handler(object):
|
||||
def set_subscription_factory(self, factory):
|
||||
self._subscription_factory = factory
|
||||
|
||||
def clean_subscriptions(self, subscriptions):
|
||||
for resp in subscriptions:
|
||||
body = {'queue_name': resp._request._body.get('queue_name'),
|
||||
'subscription_id': resp._body.get('subscription_id')}
|
||||
payload = {'body': body, 'headers': resp._request._headers}
|
||||
req = self.create_request(payload)
|
||||
self.v2_endpoints.subscription_delete(req)
|
||||
|
||||
def process_request(self, req, protocol):
|
||||
# FIXME(vkmc): Control API version
|
||||
if req._action == 'subscription_create':
|
||||
|
@ -99,9 +99,6 @@ class SubscriptionTest(base.V1_1Base):
|
||||
subscriber['subscriber'])
|
||||
self.assertLessEqual(added_age, subscriber['age'])
|
||||
|
||||
self.boot.storage.subscription_controller.delete(
|
||||
'kitkat', subscriber['id'], project=self.project_id)
|
||||
|
||||
response = {
|
||||
'body': {'message': 'Subscription kitkat created.',
|
||||
'subscription_id': subscriber['id']},
|
||||
@ -113,6 +110,14 @@ class SubscriptionTest(base.V1_1Base):
|
||||
self.assertEqual(1, sender.call_count)
|
||||
self.assertEqual(response, json.loads(sender.call_args[0][0]))
|
||||
|
||||
# Trigger protocol close
|
||||
self.protocol.onClose(True, 100, None)
|
||||
subscribers = list(
|
||||
next(
|
||||
self.boot.storage.subscription_controller.list(
|
||||
'kitkat', self.project_id)))
|
||||
self.assertEqual([], subscribers)
|
||||
|
||||
def test_subscription_delete(self):
|
||||
sub = self.boot.storage.subscription_controller.create(
|
||||
'kitkat', '', 600, {}, project=self.project_id)
|
||||
|
@ -44,6 +44,9 @@ class ProtocolFactory(websocket.WebSocketServerFactory):
|
||||
proto.factory = self
|
||||
return proto
|
||||
|
||||
def unregister(self, proto_id):
|
||||
self._protos.pop(proto_id)
|
||||
|
||||
|
||||
class NotificationFactory(object):
|
||||
|
||||
|
@ -67,6 +67,7 @@ class MessagingProtocol(websocket.WebSocketServerProtocol):
|
||||
self._auth_in_binary = None
|
||||
self._deauth_handle = None
|
||||
self.notify_in_binary = None
|
||||
self._subscriptions = []
|
||||
|
||||
def onConnect(self, request):
|
||||
LOG.info(_LI("Client connecting: %s"), request.peer)
|
||||
@ -135,9 +136,12 @@ class MessagingProtocol(websocket.WebSocketServerProtocol):
|
||||
# subscription.
|
||||
if not subscriber:
|
||||
self.notify_in_binary = isBinary
|
||||
self._subscriptions.append(resp)
|
||||
return self._send_response(resp, isBinary)
|
||||
|
||||
def onClose(self, wasClean, code, reason):
|
||||
self._handler.clean_subscriptions(self._subscriptions)
|
||||
self.factory.unregister(self.proto_id)
|
||||
LOG.info(_LI("WebSocket connection closed: %s"), reason)
|
||||
|
||||
def _authenticate(self, payload, in_binary):
|
||||
|
Loading…
x
Reference in New Issue
Block a user