From 69c799734bcd0d1a0e85096f687f17ee3d0743c0 Mon Sep 17 00:00:00 2001 From: wangxiyuan Date: Mon, 16 May 2016 15:07:55 +0800 Subject: [PATCH] Subscription Confirmation Support-1 The subscription confirmation feature will contain four patches: 1. webhook with mongoDB 2. email with mongoDB 3. webhook with redis 4. email with redis This patch is the first part of subscription confirmation feature for webhook with MongoDB. Others will be achieved in follow patches. This patch did: 1. Add v2/queue//subscription//confirm endpoint. 2. Add a new config option: "require_confirmation". 3. Add a new property "confirmed" to subscription resource for MongoDB driver. 4. Add a new policy "subscription: confirm". 5. Add a new property "message type" for notification. 6. Use the pre-signed url in confirm request. 8. Re-use POST subscription to allow re-confirm. 9. Update notification for webhook subscription with mongoDB. 10. Support unsubscrib the subscription 11. Add tests for the feature. 12. Add doc and sample. Docimpact APIimpact Change-Id: Id38d4a5b4f9303b12e22e2b5c248facda4c00143 Implements: blueprint subscription-confirmation-support --- doc/source/subscription_confirm.rst | 204 ++++++++++++++++++ etc/policy.json.sample | 1 + ...ription_confirmation-883cb7f325885ef0.yaml | 9 + samples/zaqar/subscriber_service_sample.py | 79 +++++++ zaqar/common/auth.py | 35 ++- zaqar/common/configs.py | 5 +- zaqar/notification/notifier.py | 87 +++++++- zaqar/storage/mongodb/subscriptions.py | 27 ++- zaqar/storage/pipeline.py | 4 +- zaqar/storage/pooling.py | 11 + zaqar/tests/base.py | 4 + zaqar/tests/etc/policy.json | 1 + .../tests/v2/test_subscriptions.py | 9 +- .../tests/v2/test_subscriptions_negative.py | 5 + .../tests/unit/notification/test_notifier.py | 45 +++- zaqar/tests/unit/storage/test_impl_mongodb.py | 28 +++ .../websocket/v2/test_subscriptions.py | 9 +- .../transport/wsgi/v2_0/test_subscriptions.py | 76 +++++++ zaqar/transport/validation.py | 6 + zaqar/transport/wsgi/v2_0/__init__.py | 7 +- zaqar/transport/wsgi/v2_0/subscriptions.py | 86 +++++++- 21 files changed, 709 insertions(+), 29 deletions(-) create mode 100644 doc/source/subscription_confirm.rst create mode 100644 releasenotes/notes/webhook_subscription_confirmation-883cb7f325885ef0.yaml create mode 100644 samples/zaqar/subscriber_service_sample.py diff --git a/doc/source/subscription_confirm.rst b/doc/source/subscription_confirm.rst new file mode 100644 index 000000000..7042284c8 --- /dev/null +++ b/doc/source/subscription_confirm.rst @@ -0,0 +1,204 @@ +.. + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + +============================== +The subscription Confirm Guide +============================== + +The subscription confirm feature now only support webhook with mongoDB backend. +This guide shows how to use this feature: + +1. Set the config option "require_confirmation" and add the policy to the +policy.json file. Then restart Zaqar-wsgi service:: + + In the config file: + [notification] + require_confirmation = True + + In the policy.json file: + "subscription:confirm": "", + +2. Create a subscription. + +Here used zaqar/samples/zaqar/subscriber_service_sample.py be the subscriber +endpoint for example.So before the step 2, you should start the subscriber +service first. +The service could be started simply by the command:: + + python zaqar/samples/zaqar/subscriber_service_sample.py +The service's default port is 5678. If you want to use a new port, the command +will be like:: + + python zaqar/samples/zaqar/subscriber_service_sample.py new_port_number +The service will not confirm the subscription automatically by default. If you +want to do that, the command will be like:: + + python zaqar/samples/zaqar/subscriber_service_sample.py --auto-confirm + +Then create a subscription:: + + curl -i -X POST http://10.229.47.217:8888/v2/queues/test/subscriptions \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" \ + -d '{"subscriber":"http://10.229.47.217:5678", "ttl":3600, "options":{}}' + +The response:: + + HTTP/1.1 201 Created + content-length: 47 + content-type: application/json; charset=UTF-8 + location: http://10.229.47.217:8888/v2/queues/test/subscriptions + Connection: close + {"subscription_id": "576256b03990b480617b4063"} + +At the same time, If the subscriber sample service is not start by +"--auto confirm", you will receive a POST request in the subscriber sample +service, the request is like:: + + WARNING:root:{"UnsubscribeBody": {"confirmed": false}, "URL-Methods": "PUT", + "X-Project-ID": "51be2c72393e457ebf0a22a668e10a64", + "URL-Paths": "/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm", + "URL-Expires": "2016-07-06T04:35:56", "queue_name": "test", + "SubscribeURL": ["/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm"], + "SubscribeBody": {"confirmed": true}, + "URL-Signature": "d4038a40589cdb61cd13d5a6997472f5be779db441dd8fe0c597a6e465f30c41", + "Message": "You have chosen to subscribe to the queue: test", + "Message_Type": "SubscriptionConfirmation"} + 10.229.47.217 - - [06/Jul/2016 11:35:56] "POST / HTTP/1.1" 200 - +If you start the sample service with "--auto confirm", please go to step 6 +directly, because the step 5 will be done by the service automatically. + +3. Get the subscription. +The request:: + + curl -i -X GET http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063 \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" + +The response:: + + HTTP/1.1 200 OK + content-length: 154 + content-type: application/json; charset=UTF-8 + Connection: close + {"confirmed": false, "age": 73, "id": "576256b03990b480617b4063", + "subscriber": "http://10.229.47.217:5678", "source": "test", "ttl": 3600, "options": {}} + +You can find that the "confirmed" property is false by default. + +4. Post a message to the subscription's queue +The request:: + + curl -i -X POST http://10.229.47.217:8888/v2/queues/test/messages \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" \ + -d '{"messages": [{"ttl": 3600,"body": "test123"}]}' + +The response:: + + HTTP/1.1 201 Created + content-length: 68 + content-type: application/json; charset=UTF-8 + location: http://10.229.47.217:8888/v2/queues/test/messages?ids=57624dee3990b4634d71bb4a + Connection: close + {"resources": ["/v2/queues/test/messages/57624dee3990b4634d71bb4a"]} + +The subscriber received nothing and you will find a log info in zaqar-wsgi.:: + + 2016-07-06 11:37:57.929 98400 INFO zaqar.notification.notifier + [(None,)2473911afe2642c0b74d7e1200d9bba7 51be2c72393e457ebf0a22a668e10a64 - - -] + The subscriber http://10.229.47.217:5678 is not confirmed. + +5. Use the information showed in step3 to confirm the subscription +The request:: + + curl -i -X PUT http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "URL-Methods: PUT" -H "X-Project-ID: 51be2c72393e457ebf0a22a668e10a64" \ + -H "URL-Signature: d28dced4eabbb09878a73d9a7a651df3a3ce5434fcdb6c3727decf6c7078b282" \ + -H "URL-Paths: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm" \ + -H "URL-Expires: 2016-06-16T08:35:12" -d '{"confirmed": true}' + +The response:: + + HTTP/1.1 204 No Content + location: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm + Connection: close + +6. Repeat step3 to get the subscription +The request:: + + curl -i -X GET http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063 \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" + +The response:: + + HTTP/1.1 200 OK + content-length: 155 + content-type: application/json; charset=UTF-8 + Connection: close + {"confirmed": true, "age": 1370, "id": "576256b03990b480617b4063", + "subscriber": "http://10.229.47.217:5678", "source": "test", "ttl": 3600, + "options": {}} + +The subscription is confirmed now. + +7. Repeat step4 to post a new message. +The request:: + + curl -i -X POST http://10.229.47.217:8888/v2/queues/test/messages \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" \ + -d '{"messages": [{"ttl": 3600,"body": "test123"}]}' + +The response:: + + HTTP/1.1 201 Created + content-length: 68 + content-type: application/json; charset=UTF-8 + location: http://10.229.47.217:8888/v2/queues/test/messages?ids=5762526d3990b474c80d5483 + Connection: close + {"resources": ["/v2/queues/test/messages/5762526d3990b474c80d5483"]} + +Then in subscriber sample service, you will receive a request:: + + WARNING:root:{"body": {"event": "BackupStarted"}, "queue_name": "test", + "Message_Type": "Notification", "ttl": 3600} + 10.229.47.217 - - [06/Jul/2016 13:19:07] "POST / HTTP/1.1" 200 - + +8. Unsubscription. +The request:: + + curl -i -X PUT http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "URL-Methods: PUT" -H "X-Project-ID: 51be2c72393e457ebf0a22a668e10a64" \ + -H "URL-Signature: d28dced4eabbb09878a73d9a7a651df3a3ce5434fcdb6c3727decf6c7078b282" \ + -H "URL-Paths: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm" \ + -H "URL-Expires: 2016-06-16T08:35:12" -d '{"confirmed": false}' + +The response:: + + HTTP/1.1 204 No Content + location: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm + Connection: close + +Then try to post a message. The subscriber will not receive the notification +any more. diff --git a/etc/policy.json.sample b/etc/policy.json.sample index 96d673376..89d507675 100644 --- a/etc/policy.json.sample +++ b/etc/policy.json.sample @@ -27,6 +27,7 @@ "subscription:get": "", "subscription:delete": "", "subscription:update": "", + "subscription:confirm": "", "pools:get_all": "rule:context_is_admin", "pools:create": "rule:context_is_admin", diff --git a/releasenotes/notes/webhook_subscription_confirmation-883cb7f325885ef0.yaml b/releasenotes/notes/webhook_subscription_confirmation-883cb7f325885ef0.yaml new file mode 100644 index 000000000..05e190007 --- /dev/null +++ b/releasenotes/notes/webhook_subscription_confirmation-883cb7f325885ef0.yaml @@ -0,0 +1,9 @@ +--- +features: + - Now before users send messages to subscribers through a queue, the + subscribers should be confirmed first. Zaqar only sends messages to the + confirmed subscribers. This feature supports "webhook" and "mailto" + subscribers with mongoDB or redis backend. The "mailto" part will be done + in O cycle. Set "require_confirmation = True" to enable this feature. The + default value is "False" now and we will enable it by default after one or + two cycles. diff --git a/samples/zaqar/subscriber_service_sample.py b/samples/zaqar/subscriber_service_sample.py new file mode 100644 index 000000000..58127cd6e --- /dev/null +++ b/samples/zaqar/subscriber_service_sample.py @@ -0,0 +1,79 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +import json +import logging +import requests +import sys +import uuid + +try: + import SimpleHTTPServer + import SocketServer +except Exception: + from http import server as SimpleHTTPServer + import socketserver as SocketServer + + +_AUTO_CONFIRM = False +for arg in sys.argv: + if arg == '--auto-confirm': + _AUTO_CONFIRM = True + sys.argv.remove(arg) + break + +if len(sys.argv) > 2: + PORT = int(sys.argv[2]) +elif len(sys.argv) > 1: + PORT = int(sys.argv[1]) +else: + PORT = 5678 + + +class ServerHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): + """This is the sample service for wsgi subscription. + + """ + + # TODO(wangxiyuan): support websocket. + def do_POST(self): + logging.warning('=================== POST =====================') + data_string = str( + self.rfile.read(int(self.headers['Content-Length']))) + self.data = json.loads(data_string) + if _AUTO_CONFIRM: + self._send_confirm_request() + message = 'OK' + self.send_response(200) + self.end_headers() + self.wfile.write(message) + logging.warning(self.headers) + logging.warning(self.data) + return + + def _send_confirm_request(self): + url = self.data['WSGISubscribeURL'] + headers = { + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'X-Project-ID': self.data['X-Project-ID'], + 'Client-ID': str(uuid.uuid4()), + 'URL-Methods': self.data['URL-Methods'], + 'URL-Signature': self.data['URL-Signature'], + 'URL-Paths': self.data['URL-Paths'], + 'URL-Expires': self.data['URL-Expires'], + } + data = {'confirmed': True} + requests.put(url=url, data=json.dumps(data), headers=headers) + +Handler = ServerHandler +httpd = SocketServer.TCPServer(("", PORT), Handler) +httpd.serve_forever() diff --git a/zaqar/common/auth.py b/zaqar/common/auth.py index 3ab9196ab..c593b0419 100644 --- a/zaqar/common/auth.py +++ b/zaqar/common/auth.py @@ -21,7 +21,10 @@ from oslo_config import cfg PASSWORD_PLUGIN = 'password' TRUSTEE_CONF_GROUP = 'trustee' +KEYSTONE_AUTHTOKEN_GROUP = 'keystone_authtoken' loading.register_auth_conf_options(cfg.CONF, TRUSTEE_CONF_GROUP) +loading.register_auth_conf_options(cfg.CONF, KEYSTONE_AUTHTOKEN_GROUP) +_ZAQAR_ENDPOINTS = {} def _config_options(): @@ -39,9 +42,9 @@ def get_trusted_token(trust_id): return trust_session.auth.get_access(trust_session).auth_token -def _get_admin_session(): +def _get_admin_session(conf_group): auth_plugin = loading.load_auth_from_conf_options( - cfg.CONF, TRUSTEE_CONF_GROUP) + cfg.CONF, conf_group) return session.Session(auth=auth_plugin) @@ -53,7 +56,7 @@ def _get_user_client(auth_plugin): def create_trust_id(auth_plugin, trustor_user_id, trustor_project_id, roles, expires_at): """Create a trust with the given user for the configured trustee user.""" - admin_session = _get_admin_session() + admin_session = _get_admin_session(TRUSTEE_CONF_GROUP) trustee_user_id = admin_session.get_user_id() client = _get_user_client(auth_plugin) @@ -64,3 +67,29 @@ def create_trust_id(auth_plugin, trustor_user_id, trustor_project_id, roles, role_names=roles, expires_at=expires_at) return trust.id + + +def get_public_endpoint(): + """Get Zaqar's public endpoint from keystone""" + global _ZAQAR_ENDPOINTS + + if _ZAQAR_ENDPOINTS: + return _ZAQAR_ENDPOINTS + + zaqar_session = _get_admin_session(KEYSTONE_AUTHTOKEN_GROUP) + auth = zaqar_session.auth + if not auth: + return _ZAQAR_ENDPOINTS + + catalogs = auth.get_auth_ref(zaqar_session).service_catalog + try: + _ZAQAR_ENDPOINTS['zaqar'] = catalogs.url_for(service_name='zaqar') + except Exception: + pass + try: + _ZAQAR_ENDPOINTS['zaqar-websocket'] = catalogs.url_for( + service_name='zaqar-websocket') + except Exception: + pass + + return _ZAQAR_ENDPOINTS diff --git a/zaqar/common/configs.py b/zaqar/common/configs.py index 4550acb36..204f7282e 100644 --- a/zaqar/common/configs.py +++ b/zaqar/common/configs.py @@ -59,7 +59,10 @@ _NOTIFICATION_OPTIONS = ( help=('The command of smtp to send email. The format is ' '"command_name arg1 arg2".')), cfg.IntOpt('max_notifier_workers', default=10, - help='The max amount of the notification workers.') + help='The max amount of the notification workers.'), + cfg.BoolOpt('require_confirmation', default=False, + help='Whether the http/https/email subscription need to be ' + 'confirmed before notification.'), ) _NOTIFICATION_GROUP = 'notification' diff --git a/zaqar/notification/notifier.py b/zaqar/notification/notifier.py index 45238da99..de9283631 100644 --- a/zaqar/notification/notifier.py +++ b/zaqar/notification/notifier.py @@ -13,18 +13,30 @@ # See the License for the specific language governing permissions and # limitations under the License. +import enum from stevedore import driver import futurist from oslo_log import log as logging from six.moves import urllib_parse +from zaqar.common import auth +from zaqar.common import urls from zaqar.i18n import _LE +from zaqar.i18n import _LI from zaqar.storage import pooling LOG = logging.getLogger(__name__) +@enum.unique +class MessageType(enum.IntEnum): + """Enum of message type.""" + SubscriptionConfirmation = 1 + UnsubscribeConfirmation = 2 + Notification = 3 + + class NotifierDriver(object): """Notifier which is responsible for sending messages to subscribers. @@ -34,6 +46,7 @@ class NotifierDriver(object): self.subscription_controller = kwargs.get('subscription_controller') max_workers = kwargs.get('max_notifier_workers', 10) self.executor = futurist.ThreadPoolExecutor(max_workers=max_workers) + self.require_confirmation = kwargs.get('require_confirmation', False) def post(self, queue_name, messages, client_uuid, project=None): """Send messages to the subscribers.""" @@ -48,14 +61,76 @@ class NotifierDriver(object): LOG.debug("Notifying subscriber %r" % (sub,)) s_type = urllib_parse.urlparse( sub['subscriber']).scheme - data_driver = self.subscription_controller.driver - mgr = driver.DriverManager('zaqar.notification.tasks', - s_type, - invoke_on_load=True) - self.executor.submit(mgr.driver.execute, sub, messages, - conf=data_driver.conf) + # If the subscriber doesn't contain 'confirmed', it + # means that this kind of subscriber was created before + # the confirm feature be introduced into Zaqar. We + # should allow them be subscribed. + if (self.require_confirmation and + not sub.get('confirmed', True)): + LOG.info(_LI('The subscriber %s is not ' + 'confirmed.'), sub['subscriber']) + continue + for msg in messages: + msg['Message_Type'] = MessageType.Notification.name + self._execute(s_type, sub, messages) marker = next(subscribers) if not marker: break else: LOG.error(_LE('Failed to get subscription controller.')) + + def send_confirm_notification(self, queue, subscription, conf, + project=None, expires=None, + api_version=None): + key = conf.signed_url.secret_key + if not key: + LOG.error(_LE("Can't send confirm notification due to the value of" + " secret_key option is None")) + return + url = "/%s/queues/%s/subscriptions/%s/confirm" % (api_version, queue, + subscription['id']) + pre_url = urls.create_signed_url(key, [url], project=project, + expires=expires, methods=['PUT']) + message_type = MessageType.SubscriptionConfirmation.name + + messages = {} + endpoint_dict = auth.get_public_endpoint() + if endpoint_dict: + wsgi_endpoint = endpoint_dict.get('zaqar', None) + if wsgi_endpoint: + wsgi_subscribe_url = urllib_parse.urljoin( + wsgi_endpoint, url) + messages['WSGISubscribeURL'] = wsgi_subscribe_url + websocket_endpoint = endpoint_dict.get('zaqar-websocket', None) + if websocket_endpoint: + websocket_subscribe_url = urllib_parse.urljoin( + websocket_endpoint, url) + messages['WebSocketSubscribeURL'] = websocket_subscribe_url + messages.update({'Message_Type': message_type, + 'Message': 'You have chosen to subscribe to the ' + 'queue: %s' % queue, + 'URL-Signature': pre_url['signature'], + 'URL-Methods': pre_url['methods'][0], + 'URL-Paths': pre_url['paths'][0], + 'X-Project-ID': pre_url['project'], + 'URL-Expires': pre_url['expires'], + 'SubscribeBody': {'confirmed': True}, + 'UnsubscribeBody': {'confirmed': False}}) + s_type = urllib_parse.urlparse(subscription['subscriber']).scheme + LOG.info(_LI('Begin to send %(type)s confirm notification. The request' + 'body is %(messages)s'), + {'type': s_type, 'messages': messages}) + + self._execute(s_type, subscription, [messages], conf) + + def _execute(self, s_type, subscription, messages, conf=None): + if self.subscription_controller: + data_driver = self.subscription_controller.driver + conf = data_driver.conf + else: + conf = conf + mgr = driver.DriverManager('zaqar.notification.tasks', + s_type, + invoke_on_load=True) + self.executor.submit(mgr.driver.execute, subscription, messages, + conf=conf) diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index b8c01da0d..d9f872e00 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -48,6 +48,7 @@ class SubscriptionController(base.Subscription): 'e': expires: datetime.datetime 'o': options :: dict 'p': project :: six.text_type + 'c': confirmed :: boolean """ def __init__(self, *args, **kwargs): @@ -70,7 +71,7 @@ class SubscriptionController(base.Subscription): if marker is not None: query['_id'] = {'$gt': utils.to_oid(marker)} - projection = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1} + projection = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1, 'c': 1} cursor = self._collection.find(query, projection=projection) cursor = cursor.limit(limit).sort('_id') @@ -103,6 +104,7 @@ class SubscriptionController(base.Subscription): now = timeutils.utcnow_ts() now_dt = datetime.datetime.utcfromtimestamp(now) expires = now_dt + datetime.timedelta(seconds=ttl) + confirmed = False try: subscription_id = self._collection.insert({'s': source, @@ -110,7 +112,8 @@ class SubscriptionController(base.Subscription): 't': ttl, 'e': expires, 'o': options, - 'p': project}) + 'p': project, + 'c': confirmed}) return subscription_id except pymongo.errors.DuplicateKeyError: return None @@ -153,6 +156,22 @@ class SubscriptionController(base.Subscription): self._collection.remove({'_id': utils.to_oid(subscription_id), 'p': project}, w=0) + @utils.raises_conn_error + def get_with_subscriber(self, queue, subscriber, project=None): + res = self._collection.find_one({'u': subscriber, + 'p': project}) + now = timeutils.utcnow_ts() + return _basic_subscription(res, now) + + @utils.raises_conn_error + def confirm(self, queue, subscription_id, project=None, confirm=True): + + res = self._collection.update({'_id': utils.to_oid(subscription_id), + 'p': project}, {'$set': {'c': confirm}}, + upsert=False) + if not res['updatedExisting']: + raise errors.SubscriptionDoesNotExist(subscription_id) + def _basic_subscription(record, now): # NOTE(Eva-i): unused here record's field 'e' (expires) has changed it's @@ -161,11 +180,13 @@ def _basic_subscription(record, now): # starting using 'e' field should make sure support both of the formats. oid = record['_id'] age = now - utils.oid_ts(oid) + confirmed = record.get('c', True) return { 'id': str(oid), 'source': record['s'], 'subscriber': record['u'], 'ttl': record['t'], 'age': int(age), - 'options': record['o'] + 'options': record['o'], + 'confirmed': confirmed, } diff --git a/zaqar/storage/pipeline.py b/zaqar/storage/pipeline.py index 15e3417d7..55d4af08c 100644 --- a/zaqar/storage/pipeline.py +++ b/zaqar/storage/pipeline.py @@ -150,7 +150,9 @@ class DataDriver(base.DataDriverBase): kwargs = {'subscription_controller': self._storage.subscription_controller, 'max_notifier_workers': - self.conf.notification.max_notifier_workers} + self.conf.notification.max_notifier_workers, + 'require_confirmation': + self.conf.notification.require_confirmation} stages.extend(_get_storage_pipeline('message', self.conf, **kwargs)) stages.append(self._storage.message_controller) return common.Pipeline(stages) diff --git a/zaqar/storage/pooling.py b/zaqar/storage/pooling.py index a48f72833..bf771c6ed 100644 --- a/zaqar/storage/pooling.py +++ b/zaqar/storage/pooling.py @@ -409,6 +409,17 @@ class SubscriptionController(storage.Subscription): return control.exists(queue, subscription_id, project=project) + def confirm(self, queue, subscription_id, project=None, confirm=None): + control = self._get_controller(queue, project) + if control: + return control.confirm(queue, subscription_id, + project=project, confirm=confirm) + + def get_with_subscriber(self, queue, subscriber, project=None): + control = self._get_controller(queue, project) + if control: + return control.get_with_subscriber(queue, subscriber, project) + class Catalog(object): """Represents the mapping between queues and pool drivers.""" diff --git a/zaqar/tests/base.py b/zaqar/tests/base.py index 436fc5447..a4824f8db 100644 --- a/zaqar/tests/base.py +++ b/zaqar/tests/base.py @@ -58,6 +58,10 @@ class TestBase(testtools.TestCase): group=configs._DRIVER_GROUP) self.conf.register_opts(configs._NOTIFICATION_OPTIONS, group=configs._NOTIFICATION_GROUP) + self.conf.register_opts(configs._NOTIFICATION_OPTIONS, + group=configs._NOTIFICATION_GROUP) + self.conf.register_opts(configs._SIGNED_URL_OPTIONS, + group=configs._SIGNED_URL_GROUP) self.mongodb_url = os.environ.get('ZAQAR_TEST_MONGODB_URL', 'mongodb://127.0.0.1:27017') diff --git a/zaqar/tests/etc/policy.json b/zaqar/tests/etc/policy.json index 96d673376..89d507675 100644 --- a/zaqar/tests/etc/policy.json +++ b/zaqar/tests/etc/policy.json @@ -27,6 +27,7 @@ "subscription:get": "", "subscription:delete": "", "subscription:update": "", + "subscription:confirm": "", "pools:get_all": "rule:context_is_admin", "pools:create": "rule:context_is_admin", diff --git a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py index 303af388e..ea4232617 100644 --- a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py +++ b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py @@ -115,10 +115,15 @@ class TestSubscriptions(base.BaseV2MessagingTest): if not test.call_until_true( lambda: self.list_messages(sub_queue)[1]['messages'], 10, 1): self.fail("Couldn't get messages") - messages = self.list_messages(sub_queue) + _, body = self.list_messages(sub_queue) expected = message_body['messages'][0] expected['queue_name'] = self.queue_name - self.assertEqual(expected, messages[1]['messages'][0]['body']) + expected['Message_Type'] = 'Notification' + for message in body['messages']: + # There are two message in the queue. One is the confirm message, + # the other one is the notification. + if message['body']['Message_Type'] == 'Notification': + self.assertEqual(expected, message['body']) @classmethod def resource_cleanup(cls): diff --git a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions_negative.py b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions_negative.py index 2e623a667..1d6b09780 100644 --- a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions_negative.py +++ b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions_negative.py @@ -17,6 +17,7 @@ import uuid from tempest import config from tempest.lib.common.utils import data_utils +from tempest.lib import decorators from tempest.lib import exceptions as lib_exc from tempest import test @@ -45,6 +46,10 @@ class TestSubscriptionsNegative(base.BaseV2MessagingTest): # Create Subscriptions + # TODO(wangxiyuan): Now the subscription confirmation feature only support + # mongoDB backend. Skip this test until the feature support the redis + # backend. Then rewrite it. + @decorators.skip_because(bug='1609596') @test.attr(type=['negative']) @test.idempotent_id('fe0d8ec1-1a64-4490-8869-e821b2252e74') def test_create_subscriptions_with_duplicate_subscriber(self): diff --git a/zaqar/tests/unit/notification/test_notifier.py b/zaqar/tests/unit/notification/test_notifier.py index eb11ce647..efc12f137 100644 --- a/zaqar/tests/unit/notification/test_notifier.py +++ b/zaqar/tests/unit/notification/test_notifier.py @@ -43,14 +43,17 @@ class NotifierTest(testing.TestBase): "body": {"event": "BackupStarted", "backup_id": "c378813c-3f0b-11e2-ad92"}, - "queue_name": "fake_queue" + "queue_name": "fake_queue", + "Message_Type": "Notification" }, {"body": {"event": "BackupProgress", "current_bytes": "0", "total_bytes": "99614720"}, - "queue_name": "fake_queue" + "queue_name": "fake_queue", + "Message_Type": "Notification" } ] + self.api_version = 'v2' def test_webhook(self): subscription = [{'subscriber': 'http://trigger_me', @@ -257,3 +260,41 @@ class NotifierTest(testing.TestBase): self.assertEqual(2, mock_post.call_count) self.assertEqual(self.notifications[1], json.loads(mock_post.call_args[1]['data'])) + + @mock.patch('requests.post') + def test_send_confirm_notification(self, mock_request): + subscription = {'id': '5760c9fb3990b42e8b7c20bd', + 'subscriber': 'http://trigger_me', + 'source': 'fake_queue', + 'options': {}} + ctlr = mock.MagicMock() + ctlr.list = mock.Mock(return_value=subscription) + driver = notifier.NotifierDriver(subscription_controller=ctlr) + self.conf.signed_url.secret_key = 'test_key' + driver.send_confirm_notification('test_queue', subscription, self.conf, + str(self.project), + api_version=self.api_version) + driver.executor.shutdown() + + self.assertEqual(1, mock_request.call_count) + expect_args = ['SubscribeBody', 'queue_name', 'URL-Methods', + 'X-Project-ID', 'URL-Signature', 'URL-Paths', 'Message', + 'URL-Expires', 'Message_Type', 'WSGISubscribeURL', + 'WebSocketSubscribeURL' 'UnsubscribeBody'] + actual_args = json.loads(mock_request.call_args[1]['data']).keys() + self.assertEqual(expect_args.sort(), + list(actual_args).sort()) + + @mock.patch('requests.post') + def test_send_confirm_notification_without_signed_url(self, mock_request): + subscription = [{'subscriber': 'http://trigger_me', + 'source': 'fake_queue', 'options': {}}] + ctlr = mock.MagicMock() + ctlr.list = mock.Mock(return_value=iter([subscription, {}])) + driver = notifier.NotifierDriver(subscription_controller=ctlr) + + driver.send_confirm_notification('test_queue', subscription, self.conf, + str(self.project), self.api_version) + driver.executor.shutdown() + + self.assertEqual(0, mock_request.call_count) diff --git a/zaqar/tests/unit/storage/test_impl_mongodb.py b/zaqar/tests/unit/storage/test_impl_mongodb.py index cb340d6d4..abc635931 100644 --- a/zaqar/tests/unit/storage/test_impl_mongodb.py +++ b/zaqar/tests/unit/storage/test_impl_mongodb.py @@ -495,6 +495,34 @@ class MongodbSubscriptionTests(MongodbSetupMixin, controller_class = controllers.SubscriptionController control_driver_class = mongodb.ControlDriver + def test_confirm(self): + s_id = self.subscription_controller.create(self.source, + self.subscriber, + self.ttl, + self.options, + project=self.project) + self.addCleanup(self.subscription_controller.delete, self.source, + s_id, self.project) + subscription = self.subscription_controller.get(self.source, s_id, + project=self.project) + + self.assertEqual(False, subscription['confirmed']) + + self.subscription_controller.confirm(self.source, s_id, + project=self.project, + confirm=True) + subscription = self.subscription_controller.get(self.source, s_id, + project=self.project) + + self.assertEqual(True, subscription['confirmed']) + + def test_confirm_with_nonexist_subscription(self): + s_id = 'fake-id' + self.assertRaises(errors.SubscriptionDoesNotExist, + self.subscription_controller.confirm, + self.source, s_id, project=self.project, confirm=True + ) + # # TODO(kgriffs): Do these need database purges as well as those above? diff --git a/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py b/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py index 5ac39b81d..7e400439b 100644 --- a/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py +++ b/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py @@ -236,7 +236,8 @@ class SubscriptionTest(base.V1_1Base): 'source': 'kitkat', 'options': {}, 'id': str(sub), - 'ttl': 600}, + 'ttl': 600, + 'confirmed': False}, 'headers': {'status': 200}, 'request': {'action': 'subscription_get', 'body': {'queue_name': 'kitkat', @@ -273,7 +274,8 @@ class SubscriptionTest(base.V1_1Base): 'source': 'kitkat', 'options': {}, 'id': str(sub), - 'ttl': 600}]}, + 'ttl': 600, + 'confirmed': False}]}, 'headers': {'status': 200}, 'request': {'action': 'subscription_list', 'body': {'queue_name': 'kitkat'}, @@ -346,7 +348,8 @@ class SubscriptionTest(base.V1_1Base): ws_notification = msgpack.unpackb(sender.call_args_list[2][0][0], encoding='utf-8') self.assertEqual({'body': {'status': 'disco queen'}, 'ttl': 60, - 'queue_name': 'kitkat'}, ws_notification) + 'queue_name': 'kitkat', + 'Message_Type': u'Notification'}, ws_notification) def test_list_returns_503_on_nopoolfound_exception(self): sub = self.boot.storage.subscription_controller.create( diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py index 0f929601f..65eb690de 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py @@ -20,6 +20,7 @@ import mock from oslo_serialization import jsonutils from zaqar.common import auth +from zaqar.notification import notifier from zaqar.storage import errors as storage_errors from zaqar import tests as testing from zaqar.tests.unit.transport.wsgi import base @@ -57,6 +58,11 @@ class TestSubscriptionsMongoDB(base.V2Base): self.subscription_path = (self.url_prefix + '/queues/' + self.queue + '/subscriptions') + self.subscription = 'fake-id' + self.confirm_path = (self.url_prefix + '/queues/' + self.queue + + '/subscriptions/' + self.subscription + + '/confirm') + self.conf.signed_url.secret_key = 'test_key' def tearDown(self): resp = self.simulate_get(self.subscription_path, @@ -95,6 +101,44 @@ class TestSubscriptionsMongoDB(base.V2Base): self._create_subscription(subscriber='http://CCC.com') self.assertEqual(falcon.HTTP_201, self.srmock.status) + # the subscription is not confirmed, So the second request will + # retry confirm and return 201 again. + self._create_subscription(subscriber='http://CCC.com') + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + @mock.patch.object(notifier.NotifierDriver, 'send_confirm_notification') + def test_create_and_send_notification(self, mock_send_confirm): + self._create_subscription(subscriber='http://CCC.com') + self.assertEqual(1, mock_send_confirm.call_count) + + @mock.patch.object(notifier.NotifierDriver, 'send_confirm_notification') + def test_recreate(self, mock_send_confirm): + resp = self._create_subscription(subscriber='http://CCC.com') + resp_doc = jsonutils.loads(resp[0]) + s_id1 = resp_doc['subscription_id'] + self.assertEqual(1, mock_send_confirm.call_count) + + resp = self._create_subscription(subscriber='http://CCC.com') + resp_doc = jsonutils.loads(resp[0]) + s_id2 = resp_doc['subscription_id'] + self.assertEqual(2, mock_send_confirm.call_count) + + self.assertEqual(s_id1, s_id2) + + @mock.patch.object(notifier.NotifierDriver, 'send_confirm_notification') + def test_recreate_after_confirmed(self, mock_send_confirm): + resp = self._create_subscription(subscriber='http://CCC.com') + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + doc = '{"confirmed": true}' + resp_doc = jsonutils.loads(resp[0]) + confirm_path = (self.url_prefix + '/queues/' + self.queue + + '/subscriptions/' + resp_doc['subscription_id'] + + '/confirm') + self.simulate_put(confirm_path, body=doc, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + self.assertEqual(1, mock_send_confirm.call_count) + self._create_subscription(subscriber='http://CCC.com') self.assertEqual(falcon.HTTP_409, self.srmock.status) @@ -353,3 +397,35 @@ class TestSubscriptionsMongoDB(base.V2Base): options = resp_list_doc['subscriptions'][0]['options'] self.assertEqual({'a': 1, 'trust_id': 'trust_id'}, options) + + def test_confirm(self): + doc = '{"confirmed": true}' + resp = self._create_subscription() + resp_doc = jsonutils.loads(resp[0]) + confirm_path = (self.url_prefix + '/queues/' + self.queue + + '/subscriptions/' + resp_doc['subscription_id'] + + '/confirm') + self.simulate_put(confirm_path, body=doc, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + def test_confirm_with_invalid_body(self): + doc = '{confirmed:123}' + resp = self.simulate_put(self.confirm_path, body=doc, + headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + resp_doc = jsonutils.loads(resp[0]) + self.assertIn('body could not be parsed', resp_doc['description']) + + def test_confirm_without_boolean_body(self): + doc = '{"confirmed":123}' + resp = self.simulate_put(self.confirm_path, body=doc, + headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + resp_doc = jsonutils.loads(resp[0]) + self.assertEqual("The 'confirmed' should be boolean.", + resp_doc['description']) + + def test_confirm_with_non_subscription(self): + doc = '{"confirmed": true}' + self.simulate_put(self.confirm_path, body=doc, headers=self.headers) + self.assertEqual(falcon.HTTP_404, self.srmock.status) diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index dd539f3aa..75d1bb76e 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -540,6 +540,12 @@ class Validator(object): except OverflowError: raise ValidationFailed(msg, datetime.datetime.max) + def subscription_confirming(self, confirm): + confirm = confirm.get('confirmed', None) + if not isinstance(confirm, bool): + msg = _(u"The 'confirmed' should be boolean.") + raise ValidationFailed(msg) + def subscription_listing(self, limit=None, **kwargs): """Restrictions involving a list of subscriptions. diff --git a/zaqar/transport/wsgi/v2_0/__init__.py b/zaqar/transport/wsgi/v2_0/__init__.py index f5c3eec07..310326584 100644 --- a/zaqar/transport/wsgi/v2_0/__init__.py +++ b/zaqar/transport/wsgi/v2_0/__init__.py @@ -101,12 +101,17 @@ def public_endpoints(driver, conf): subscriptions.CollectionResource(driver._validate, subscription_controller, defaults.subscription_ttl, - queue_controller)), + queue_controller, + conf)), ('/queues/{queue_name}/subscriptions/{subscription_id}', subscriptions.ItemResource(driver._validate, subscription_controller)), + ('/queues/{queue_name}/subscriptions/{subscription_id}/confirm', + subscriptions.ConfirmResource(driver._validate, + subscription_controller)), + # Pre-Signed URL Endpoint ('/queues/{queue_name}/share', urls.Resource(driver)), ] diff --git a/zaqar/transport/wsgi/v2_0/subscriptions.py b/zaqar/transport/wsgi/v2_0/subscriptions.py index 6b65ecd51..7690a47cc 100644 --- a/zaqar/transport/wsgi/v2_0/subscriptions.py +++ b/zaqar/transport/wsgi/v2_0/subscriptions.py @@ -12,15 +12,18 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +import datetime import falcon from oslo_log import log as logging from oslo_utils import netutils +from oslo_utils import timeutils import six from stevedore import driver from zaqar.common import decorators from zaqar.i18n import _ +from zaqar.notification import notifier from zaqar.storage import errors as storage_errors from zaqar.transport import acl from zaqar.transport import utils @@ -111,14 +114,17 @@ class ItemResource(object): class CollectionResource(object): __slots__ = ('_subscription_controller', '_validate', - '_default_subscription_ttl', '_queue_controller') + '_default_subscription_ttl', '_queue_controller', + '_conf', '_notification') def __init__(self, validate, subscription_controller, - default_subscription_ttl, queue_controller): + default_subscription_ttl, queue_controller, conf): self._subscription_controller = subscription_controller self._validate = validate self._default_subscription_ttl = default_subscription_ttl self._queue_controller = queue_controller + self._conf = conf + self._notification = notifier.NotifierDriver() @decorators.TransportLog("Subscription collection") @acl.enforce("subscription:get_all") @@ -192,7 +198,6 @@ class CollectionResource(object): ttl, options, project=project_id) - except validation.ValidationFailed as ex: LOG.debug(ex) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) @@ -201,13 +206,80 @@ class CollectionResource(object): description = _(u'Subscription could not be created.') raise wsgi_errors.HTTPServiceUnavailable(description) + now = timeutils.utcnow_ts() + now_dt = datetime.datetime.utcfromtimestamp(now) + expires = now_dt + datetime.timedelta(seconds=ttl) + api_version = req.path.split('/')[1] if created: + subscription = self._subscription_controller.get(queue_name, + created, + project_id) + # send confirm notification + self._notification.send_confirm_notification( + queue_name, subscription, self._conf, project_id, + str(expires), api_version) + resp.location = req.path resp.status = falcon.HTTP_201 resp.body = utils.to_json( {'subscription_id': six.text_type(created)}) else: - description = _(u'Such subscription already exists. Subscriptions ' - u'are unique by project + queue + subscriber URI.') - raise wsgi_errors.HTTPConflict(description, headers={'location': - req.path}) + subscription = self._subscription_controller.get_with_subscriber( + queue_name, subscriber, project_id) + confirmed = subscription.get('confirmed', True) + if confirmed: + description = _(u'Such subscription already exists.' + u'Subscriptions are unique by project + queue ' + u'+ subscriber URI.') + raise wsgi_errors.HTTPConflict(description, + headers={'location': req.path}) + else: + # The subscription is not confirmed, re-send confirm + # notification + self._notification.send_confirm_notification( + queue_name, subscription, self._conf, project_id, + str(expires), api_version) + + resp.location = req.path + resp.status = falcon.HTTP_201 + resp.body = utils.to_json( + {'subscription_id': six.text_type(subscription['id'])}) + + +class ConfirmResource(object): + + __slots__ = ('_subscription_controller', '_validate') + + def __init__(self, validate, subscription_controller): + self._subscription_controller = subscription_controller + self._validate = validate + + @decorators.TransportLog("Confirm Subscription") + @acl.enforce("subscription:confirm") + def on_put(self, req, resp, project_id, queue_name, subscription_id): + if req.content_length: + document = wsgi_utils.deserialize(req.stream, req.content_length) + else: + document = {} + + try: + self._validate.subscription_confirming(document) + confirm = document.get('confirmed', None) + self._subscription_controller.confirm(queue_name, subscription_id, + project=project_id, + confirm=confirm) + resp.status = falcon.HTTP_204 + resp.location = req.path + except storage_errors.SubscriptionDoesNotExist as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPNotFound(six.text_type(ex)) + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + except Exception as ex: + LOG.exception(ex) + description = (_(u'Subscription %(subscription_id)s could not be' + ' confirmed.') % + dict(subscription_id=subscription_id)) + raise falcon.HTTPBadRequest(_('Unable to confirm subscription'), + description)