diff --git a/doc/source/subscription_confirm.rst b/doc/source/subscription_confirm.rst new file mode 100644 index 000000000..7042284c8 --- /dev/null +++ b/doc/source/subscription_confirm.rst @@ -0,0 +1,204 @@ +.. + Licensed under the Apache License, Version 2.0 (the "License"); you may + not use this file except in compliance with the License. You may obtain + a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + +============================== +The subscription Confirm Guide +============================== + +The subscription confirm feature now only support webhook with mongoDB backend. +This guide shows how to use this feature: + +1. Set the config option "require_confirmation" and add the policy to the +policy.json file. Then restart Zaqar-wsgi service:: + + In the config file: + [notification] + require_confirmation = True + + In the policy.json file: + "subscription:confirm": "", + +2. Create a subscription. + +Here used zaqar/samples/zaqar/subscriber_service_sample.py be the subscriber +endpoint for example.So before the step 2, you should start the subscriber +service first. +The service could be started simply by the command:: + + python zaqar/samples/zaqar/subscriber_service_sample.py +The service's default port is 5678. If you want to use a new port, the command +will be like:: + + python zaqar/samples/zaqar/subscriber_service_sample.py new_port_number +The service will not confirm the subscription automatically by default. If you +want to do that, the command will be like:: + + python zaqar/samples/zaqar/subscriber_service_sample.py --auto-confirm + +Then create a subscription:: + + curl -i -X POST http://10.229.47.217:8888/v2/queues/test/subscriptions \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" \ + -d '{"subscriber":"http://10.229.47.217:5678", "ttl":3600, "options":{}}' + +The response:: + + HTTP/1.1 201 Created + content-length: 47 + content-type: application/json; charset=UTF-8 + location: http://10.229.47.217:8888/v2/queues/test/subscriptions + Connection: close + {"subscription_id": "576256b03990b480617b4063"} + +At the same time, If the subscriber sample service is not start by +"--auto confirm", you will receive a POST request in the subscriber sample +service, the request is like:: + + WARNING:root:{"UnsubscribeBody": {"confirmed": false}, "URL-Methods": "PUT", + "X-Project-ID": "51be2c72393e457ebf0a22a668e10a64", + "URL-Paths": "/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm", + "URL-Expires": "2016-07-06T04:35:56", "queue_name": "test", + "SubscribeURL": ["/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm"], + "SubscribeBody": {"confirmed": true}, + "URL-Signature": "d4038a40589cdb61cd13d5a6997472f5be779db441dd8fe0c597a6e465f30c41", + "Message": "You have chosen to subscribe to the queue: test", + "Message_Type": "SubscriptionConfirmation"} + 10.229.47.217 - - [06/Jul/2016 11:35:56] "POST / HTTP/1.1" 200 - +If you start the sample service with "--auto confirm", please go to step 6 +directly, because the step 5 will be done by the service automatically. + +3. Get the subscription. +The request:: + + curl -i -X GET http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063 \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" + +The response:: + + HTTP/1.1 200 OK + content-length: 154 + content-type: application/json; charset=UTF-8 + Connection: close + {"confirmed": false, "age": 73, "id": "576256b03990b480617b4063", + "subscriber": "http://10.229.47.217:5678", "source": "test", "ttl": 3600, "options": {}} + +You can find that the "confirmed" property is false by default. + +4. Post a message to the subscription's queue +The request:: + + curl -i -X POST http://10.229.47.217:8888/v2/queues/test/messages \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" \ + -d '{"messages": [{"ttl": 3600,"body": "test123"}]}' + +The response:: + + HTTP/1.1 201 Created + content-length: 68 + content-type: application/json; charset=UTF-8 + location: http://10.229.47.217:8888/v2/queues/test/messages?ids=57624dee3990b4634d71bb4a + Connection: close + {"resources": ["/v2/queues/test/messages/57624dee3990b4634d71bb4a"]} + +The subscriber received nothing and you will find a log info in zaqar-wsgi.:: + + 2016-07-06 11:37:57.929 98400 INFO zaqar.notification.notifier + [(None,)2473911afe2642c0b74d7e1200d9bba7 51be2c72393e457ebf0a22a668e10a64 - - -] + The subscriber http://10.229.47.217:5678 is not confirmed. + +5. Use the information showed in step3 to confirm the subscription +The request:: + + curl -i -X PUT http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "URL-Methods: PUT" -H "X-Project-ID: 51be2c72393e457ebf0a22a668e10a64" \ + -H "URL-Signature: d28dced4eabbb09878a73d9a7a651df3a3ce5434fcdb6c3727decf6c7078b282" \ + -H "URL-Paths: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm" \ + -H "URL-Expires: 2016-06-16T08:35:12" -d '{"confirmed": true}' + +The response:: + + HTTP/1.1 204 No Content + location: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm + Connection: close + +6. Repeat step3 to get the subscription +The request:: + + curl -i -X GET http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063 \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" + +The response:: + + HTTP/1.1 200 OK + content-length: 155 + content-type: application/json; charset=UTF-8 + Connection: close + {"confirmed": true, "age": 1370, "id": "576256b03990b480617b4063", + "subscriber": "http://10.229.47.217:5678", "source": "test", "ttl": 3600, + "options": {}} + +The subscription is confirmed now. + +7. Repeat step4 to post a new message. +The request:: + + curl -i -X POST http://10.229.47.217:8888/v2/queues/test/messages \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" \ + -d '{"messages": [{"ttl": 3600,"body": "test123"}]}' + +The response:: + + HTTP/1.1 201 Created + content-length: 68 + content-type: application/json; charset=UTF-8 + location: http://10.229.47.217:8888/v2/queues/test/messages?ids=5762526d3990b474c80d5483 + Connection: close + {"resources": ["/v2/queues/test/messages/5762526d3990b474c80d5483"]} + +Then in subscriber sample service, you will receive a request:: + + WARNING:root:{"body": {"event": "BackupStarted"}, "queue_name": "test", + "Message_Type": "Notification", "ttl": 3600} + 10.229.47.217 - - [06/Jul/2016 13:19:07] "POST / HTTP/1.1" 200 - + +8. Unsubscription. +The request:: + + curl -i -X PUT http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm \ + -H "Content-type: application/json" \ + -H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \ + -H "URL-Methods: PUT" -H "X-Project-ID: 51be2c72393e457ebf0a22a668e10a64" \ + -H "URL-Signature: d28dced4eabbb09878a73d9a7a651df3a3ce5434fcdb6c3727decf6c7078b282" \ + -H "URL-Paths: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm" \ + -H "URL-Expires: 2016-06-16T08:35:12" -d '{"confirmed": false}' + +The response:: + + HTTP/1.1 204 No Content + location: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm + Connection: close + +Then try to post a message. The subscriber will not receive the notification +any more. diff --git a/etc/policy.json.sample b/etc/policy.json.sample index 96d673376..89d507675 100644 --- a/etc/policy.json.sample +++ b/etc/policy.json.sample @@ -27,6 +27,7 @@ "subscription:get": "", "subscription:delete": "", "subscription:update": "", + "subscription:confirm": "", "pools:get_all": "rule:context_is_admin", "pools:create": "rule:context_is_admin", diff --git a/releasenotes/notes/webhook_subscription_confirmation-883cb7f325885ef0.yaml b/releasenotes/notes/webhook_subscription_confirmation-883cb7f325885ef0.yaml new file mode 100644 index 000000000..05e190007 --- /dev/null +++ b/releasenotes/notes/webhook_subscription_confirmation-883cb7f325885ef0.yaml @@ -0,0 +1,9 @@ +--- +features: + - Now before users send messages to subscribers through a queue, the + subscribers should be confirmed first. Zaqar only sends messages to the + confirmed subscribers. This feature supports "webhook" and "mailto" + subscribers with mongoDB or redis backend. The "mailto" part will be done + in O cycle. Set "require_confirmation = True" to enable this feature. The + default value is "False" now and we will enable it by default after one or + two cycles. diff --git a/samples/zaqar/subscriber_service_sample.py b/samples/zaqar/subscriber_service_sample.py new file mode 100644 index 000000000..58127cd6e --- /dev/null +++ b/samples/zaqar/subscriber_service_sample.py @@ -0,0 +1,79 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy +# of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +import json +import logging +import requests +import sys +import uuid + +try: + import SimpleHTTPServer + import SocketServer +except Exception: + from http import server as SimpleHTTPServer + import socketserver as SocketServer + + +_AUTO_CONFIRM = False +for arg in sys.argv: + if arg == '--auto-confirm': + _AUTO_CONFIRM = True + sys.argv.remove(arg) + break + +if len(sys.argv) > 2: + PORT = int(sys.argv[2]) +elif len(sys.argv) > 1: + PORT = int(sys.argv[1]) +else: + PORT = 5678 + + +class ServerHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): + """This is the sample service for wsgi subscription. + + """ + + # TODO(wangxiyuan): support websocket. + def do_POST(self): + logging.warning('=================== POST =====================') + data_string = str( + self.rfile.read(int(self.headers['Content-Length']))) + self.data = json.loads(data_string) + if _AUTO_CONFIRM: + self._send_confirm_request() + message = 'OK' + self.send_response(200) + self.end_headers() + self.wfile.write(message) + logging.warning(self.headers) + logging.warning(self.data) + return + + def _send_confirm_request(self): + url = self.data['WSGISubscribeURL'] + headers = { + 'Accept': 'application/json', + 'Content-Type': 'application/json', + 'X-Project-ID': self.data['X-Project-ID'], + 'Client-ID': str(uuid.uuid4()), + 'URL-Methods': self.data['URL-Methods'], + 'URL-Signature': self.data['URL-Signature'], + 'URL-Paths': self.data['URL-Paths'], + 'URL-Expires': self.data['URL-Expires'], + } + data = {'confirmed': True} + requests.put(url=url, data=json.dumps(data), headers=headers) + +Handler = ServerHandler +httpd = SocketServer.TCPServer(("", PORT), Handler) +httpd.serve_forever() diff --git a/zaqar/common/auth.py b/zaqar/common/auth.py index 3ab9196ab..c593b0419 100644 --- a/zaqar/common/auth.py +++ b/zaqar/common/auth.py @@ -21,7 +21,10 @@ from oslo_config import cfg PASSWORD_PLUGIN = 'password' TRUSTEE_CONF_GROUP = 'trustee' +KEYSTONE_AUTHTOKEN_GROUP = 'keystone_authtoken' loading.register_auth_conf_options(cfg.CONF, TRUSTEE_CONF_GROUP) +loading.register_auth_conf_options(cfg.CONF, KEYSTONE_AUTHTOKEN_GROUP) +_ZAQAR_ENDPOINTS = {} def _config_options(): @@ -39,9 +42,9 @@ def get_trusted_token(trust_id): return trust_session.auth.get_access(trust_session).auth_token -def _get_admin_session(): +def _get_admin_session(conf_group): auth_plugin = loading.load_auth_from_conf_options( - cfg.CONF, TRUSTEE_CONF_GROUP) + cfg.CONF, conf_group) return session.Session(auth=auth_plugin) @@ -53,7 +56,7 @@ def _get_user_client(auth_plugin): def create_trust_id(auth_plugin, trustor_user_id, trustor_project_id, roles, expires_at): """Create a trust with the given user for the configured trustee user.""" - admin_session = _get_admin_session() + admin_session = _get_admin_session(TRUSTEE_CONF_GROUP) trustee_user_id = admin_session.get_user_id() client = _get_user_client(auth_plugin) @@ -64,3 +67,29 @@ def create_trust_id(auth_plugin, trustor_user_id, trustor_project_id, roles, role_names=roles, expires_at=expires_at) return trust.id + + +def get_public_endpoint(): + """Get Zaqar's public endpoint from keystone""" + global _ZAQAR_ENDPOINTS + + if _ZAQAR_ENDPOINTS: + return _ZAQAR_ENDPOINTS + + zaqar_session = _get_admin_session(KEYSTONE_AUTHTOKEN_GROUP) + auth = zaqar_session.auth + if not auth: + return _ZAQAR_ENDPOINTS + + catalogs = auth.get_auth_ref(zaqar_session).service_catalog + try: + _ZAQAR_ENDPOINTS['zaqar'] = catalogs.url_for(service_name='zaqar') + except Exception: + pass + try: + _ZAQAR_ENDPOINTS['zaqar-websocket'] = catalogs.url_for( + service_name='zaqar-websocket') + except Exception: + pass + + return _ZAQAR_ENDPOINTS diff --git a/zaqar/common/configs.py b/zaqar/common/configs.py index 4550acb36..204f7282e 100644 --- a/zaqar/common/configs.py +++ b/zaqar/common/configs.py @@ -59,7 +59,10 @@ _NOTIFICATION_OPTIONS = ( help=('The command of smtp to send email. The format is ' '"command_name arg1 arg2".')), cfg.IntOpt('max_notifier_workers', default=10, - help='The max amount of the notification workers.') + help='The max amount of the notification workers.'), + cfg.BoolOpt('require_confirmation', default=False, + help='Whether the http/https/email subscription need to be ' + 'confirmed before notification.'), ) _NOTIFICATION_GROUP = 'notification' diff --git a/zaqar/notification/notifier.py b/zaqar/notification/notifier.py index 45238da99..de9283631 100644 --- a/zaqar/notification/notifier.py +++ b/zaqar/notification/notifier.py @@ -13,18 +13,30 @@ # See the License for the specific language governing permissions and # limitations under the License. +import enum from stevedore import driver import futurist from oslo_log import log as logging from six.moves import urllib_parse +from zaqar.common import auth +from zaqar.common import urls from zaqar.i18n import _LE +from zaqar.i18n import _LI from zaqar.storage import pooling LOG = logging.getLogger(__name__) +@enum.unique +class MessageType(enum.IntEnum): + """Enum of message type.""" + SubscriptionConfirmation = 1 + UnsubscribeConfirmation = 2 + Notification = 3 + + class NotifierDriver(object): """Notifier which is responsible for sending messages to subscribers. @@ -34,6 +46,7 @@ class NotifierDriver(object): self.subscription_controller = kwargs.get('subscription_controller') max_workers = kwargs.get('max_notifier_workers', 10) self.executor = futurist.ThreadPoolExecutor(max_workers=max_workers) + self.require_confirmation = kwargs.get('require_confirmation', False) def post(self, queue_name, messages, client_uuid, project=None): """Send messages to the subscribers.""" @@ -48,14 +61,76 @@ class NotifierDriver(object): LOG.debug("Notifying subscriber %r" % (sub,)) s_type = urllib_parse.urlparse( sub['subscriber']).scheme - data_driver = self.subscription_controller.driver - mgr = driver.DriverManager('zaqar.notification.tasks', - s_type, - invoke_on_load=True) - self.executor.submit(mgr.driver.execute, sub, messages, - conf=data_driver.conf) + # If the subscriber doesn't contain 'confirmed', it + # means that this kind of subscriber was created before + # the confirm feature be introduced into Zaqar. We + # should allow them be subscribed. + if (self.require_confirmation and + not sub.get('confirmed', True)): + LOG.info(_LI('The subscriber %s is not ' + 'confirmed.'), sub['subscriber']) + continue + for msg in messages: + msg['Message_Type'] = MessageType.Notification.name + self._execute(s_type, sub, messages) marker = next(subscribers) if not marker: break else: LOG.error(_LE('Failed to get subscription controller.')) + + def send_confirm_notification(self, queue, subscription, conf, + project=None, expires=None, + api_version=None): + key = conf.signed_url.secret_key + if not key: + LOG.error(_LE("Can't send confirm notification due to the value of" + " secret_key option is None")) + return + url = "/%s/queues/%s/subscriptions/%s/confirm" % (api_version, queue, + subscription['id']) + pre_url = urls.create_signed_url(key, [url], project=project, + expires=expires, methods=['PUT']) + message_type = MessageType.SubscriptionConfirmation.name + + messages = {} + endpoint_dict = auth.get_public_endpoint() + if endpoint_dict: + wsgi_endpoint = endpoint_dict.get('zaqar', None) + if wsgi_endpoint: + wsgi_subscribe_url = urllib_parse.urljoin( + wsgi_endpoint, url) + messages['WSGISubscribeURL'] = wsgi_subscribe_url + websocket_endpoint = endpoint_dict.get('zaqar-websocket', None) + if websocket_endpoint: + websocket_subscribe_url = urllib_parse.urljoin( + websocket_endpoint, url) + messages['WebSocketSubscribeURL'] = websocket_subscribe_url + messages.update({'Message_Type': message_type, + 'Message': 'You have chosen to subscribe to the ' + 'queue: %s' % queue, + 'URL-Signature': pre_url['signature'], + 'URL-Methods': pre_url['methods'][0], + 'URL-Paths': pre_url['paths'][0], + 'X-Project-ID': pre_url['project'], + 'URL-Expires': pre_url['expires'], + 'SubscribeBody': {'confirmed': True}, + 'UnsubscribeBody': {'confirmed': False}}) + s_type = urllib_parse.urlparse(subscription['subscriber']).scheme + LOG.info(_LI('Begin to send %(type)s confirm notification. The request' + 'body is %(messages)s'), + {'type': s_type, 'messages': messages}) + + self._execute(s_type, subscription, [messages], conf) + + def _execute(self, s_type, subscription, messages, conf=None): + if self.subscription_controller: + data_driver = self.subscription_controller.driver + conf = data_driver.conf + else: + conf = conf + mgr = driver.DriverManager('zaqar.notification.tasks', + s_type, + invoke_on_load=True) + self.executor.submit(mgr.driver.execute, subscription, messages, + conf=conf) diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index b8c01da0d..d9f872e00 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -48,6 +48,7 @@ class SubscriptionController(base.Subscription): 'e': expires: datetime.datetime 'o': options :: dict 'p': project :: six.text_type + 'c': confirmed :: boolean """ def __init__(self, *args, **kwargs): @@ -70,7 +71,7 @@ class SubscriptionController(base.Subscription): if marker is not None: query['_id'] = {'$gt': utils.to_oid(marker)} - projection = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1} + projection = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1, 'c': 1} cursor = self._collection.find(query, projection=projection) cursor = cursor.limit(limit).sort('_id') @@ -103,6 +104,7 @@ class SubscriptionController(base.Subscription): now = timeutils.utcnow_ts() now_dt = datetime.datetime.utcfromtimestamp(now) expires = now_dt + datetime.timedelta(seconds=ttl) + confirmed = False try: subscription_id = self._collection.insert({'s': source, @@ -110,7 +112,8 @@ class SubscriptionController(base.Subscription): 't': ttl, 'e': expires, 'o': options, - 'p': project}) + 'p': project, + 'c': confirmed}) return subscription_id except pymongo.errors.DuplicateKeyError: return None @@ -153,6 +156,22 @@ class SubscriptionController(base.Subscription): self._collection.remove({'_id': utils.to_oid(subscription_id), 'p': project}, w=0) + @utils.raises_conn_error + def get_with_subscriber(self, queue, subscriber, project=None): + res = self._collection.find_one({'u': subscriber, + 'p': project}) + now = timeutils.utcnow_ts() + return _basic_subscription(res, now) + + @utils.raises_conn_error + def confirm(self, queue, subscription_id, project=None, confirm=True): + + res = self._collection.update({'_id': utils.to_oid(subscription_id), + 'p': project}, {'$set': {'c': confirm}}, + upsert=False) + if not res['updatedExisting']: + raise errors.SubscriptionDoesNotExist(subscription_id) + def _basic_subscription(record, now): # NOTE(Eva-i): unused here record's field 'e' (expires) has changed it's @@ -161,11 +180,13 @@ def _basic_subscription(record, now): # starting using 'e' field should make sure support both of the formats. oid = record['_id'] age = now - utils.oid_ts(oid) + confirmed = record.get('c', True) return { 'id': str(oid), 'source': record['s'], 'subscriber': record['u'], 'ttl': record['t'], 'age': int(age), - 'options': record['o'] + 'options': record['o'], + 'confirmed': confirmed, } diff --git a/zaqar/storage/pipeline.py b/zaqar/storage/pipeline.py index 15e3417d7..55d4af08c 100644 --- a/zaqar/storage/pipeline.py +++ b/zaqar/storage/pipeline.py @@ -150,7 +150,9 @@ class DataDriver(base.DataDriverBase): kwargs = {'subscription_controller': self._storage.subscription_controller, 'max_notifier_workers': - self.conf.notification.max_notifier_workers} + self.conf.notification.max_notifier_workers, + 'require_confirmation': + self.conf.notification.require_confirmation} stages.extend(_get_storage_pipeline('message', self.conf, **kwargs)) stages.append(self._storage.message_controller) return common.Pipeline(stages) diff --git a/zaqar/storage/pooling.py b/zaqar/storage/pooling.py index a48f72833..bf771c6ed 100644 --- a/zaqar/storage/pooling.py +++ b/zaqar/storage/pooling.py @@ -409,6 +409,17 @@ class SubscriptionController(storage.Subscription): return control.exists(queue, subscription_id, project=project) + def confirm(self, queue, subscription_id, project=None, confirm=None): + control = self._get_controller(queue, project) + if control: + return control.confirm(queue, subscription_id, + project=project, confirm=confirm) + + def get_with_subscriber(self, queue, subscriber, project=None): + control = self._get_controller(queue, project) + if control: + return control.get_with_subscriber(queue, subscriber, project) + class Catalog(object): """Represents the mapping between queues and pool drivers.""" diff --git a/zaqar/tests/base.py b/zaqar/tests/base.py index 436fc5447..a4824f8db 100644 --- a/zaqar/tests/base.py +++ b/zaqar/tests/base.py @@ -58,6 +58,10 @@ class TestBase(testtools.TestCase): group=configs._DRIVER_GROUP) self.conf.register_opts(configs._NOTIFICATION_OPTIONS, group=configs._NOTIFICATION_GROUP) + self.conf.register_opts(configs._NOTIFICATION_OPTIONS, + group=configs._NOTIFICATION_GROUP) + self.conf.register_opts(configs._SIGNED_URL_OPTIONS, + group=configs._SIGNED_URL_GROUP) self.mongodb_url = os.environ.get('ZAQAR_TEST_MONGODB_URL', 'mongodb://127.0.0.1:27017') diff --git a/zaqar/tests/etc/policy.json b/zaqar/tests/etc/policy.json index 96d673376..89d507675 100644 --- a/zaqar/tests/etc/policy.json +++ b/zaqar/tests/etc/policy.json @@ -27,6 +27,7 @@ "subscription:get": "", "subscription:delete": "", "subscription:update": "", + "subscription:confirm": "", "pools:get_all": "rule:context_is_admin", "pools:create": "rule:context_is_admin", diff --git a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py index 303af388e..ea4232617 100644 --- a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py +++ b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions.py @@ -115,10 +115,15 @@ class TestSubscriptions(base.BaseV2MessagingTest): if not test.call_until_true( lambda: self.list_messages(sub_queue)[1]['messages'], 10, 1): self.fail("Couldn't get messages") - messages = self.list_messages(sub_queue) + _, body = self.list_messages(sub_queue) expected = message_body['messages'][0] expected['queue_name'] = self.queue_name - self.assertEqual(expected, messages[1]['messages'][0]['body']) + expected['Message_Type'] = 'Notification' + for message in body['messages']: + # There are two message in the queue. One is the confirm message, + # the other one is the notification. + if message['body']['Message_Type'] == 'Notification': + self.assertEqual(expected, message['body']) @classmethod def resource_cleanup(cls): diff --git a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions_negative.py b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions_negative.py index 2e623a667..1d6b09780 100644 --- a/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions_negative.py +++ b/zaqar/tests/tempest_plugin/tests/v2/test_subscriptions_negative.py @@ -17,6 +17,7 @@ import uuid from tempest import config from tempest.lib.common.utils import data_utils +from tempest.lib import decorators from tempest.lib import exceptions as lib_exc from tempest import test @@ -45,6 +46,10 @@ class TestSubscriptionsNegative(base.BaseV2MessagingTest): # Create Subscriptions + # TODO(wangxiyuan): Now the subscription confirmation feature only support + # mongoDB backend. Skip this test until the feature support the redis + # backend. Then rewrite it. + @decorators.skip_because(bug='1609596') @test.attr(type=['negative']) @test.idempotent_id('fe0d8ec1-1a64-4490-8869-e821b2252e74') def test_create_subscriptions_with_duplicate_subscriber(self): diff --git a/zaqar/tests/unit/notification/test_notifier.py b/zaqar/tests/unit/notification/test_notifier.py index eb11ce647..efc12f137 100644 --- a/zaqar/tests/unit/notification/test_notifier.py +++ b/zaqar/tests/unit/notification/test_notifier.py @@ -43,14 +43,17 @@ class NotifierTest(testing.TestBase): "body": {"event": "BackupStarted", "backup_id": "c378813c-3f0b-11e2-ad92"}, - "queue_name": "fake_queue" + "queue_name": "fake_queue", + "Message_Type": "Notification" }, {"body": {"event": "BackupProgress", "current_bytes": "0", "total_bytes": "99614720"}, - "queue_name": "fake_queue" + "queue_name": "fake_queue", + "Message_Type": "Notification" } ] + self.api_version = 'v2' def test_webhook(self): subscription = [{'subscriber': 'http://trigger_me', @@ -257,3 +260,41 @@ class NotifierTest(testing.TestBase): self.assertEqual(2, mock_post.call_count) self.assertEqual(self.notifications[1], json.loads(mock_post.call_args[1]['data'])) + + @mock.patch('requests.post') + def test_send_confirm_notification(self, mock_request): + subscription = {'id': '5760c9fb3990b42e8b7c20bd', + 'subscriber': 'http://trigger_me', + 'source': 'fake_queue', + 'options': {}} + ctlr = mock.MagicMock() + ctlr.list = mock.Mock(return_value=subscription) + driver = notifier.NotifierDriver(subscription_controller=ctlr) + self.conf.signed_url.secret_key = 'test_key' + driver.send_confirm_notification('test_queue', subscription, self.conf, + str(self.project), + api_version=self.api_version) + driver.executor.shutdown() + + self.assertEqual(1, mock_request.call_count) + expect_args = ['SubscribeBody', 'queue_name', 'URL-Methods', + 'X-Project-ID', 'URL-Signature', 'URL-Paths', 'Message', + 'URL-Expires', 'Message_Type', 'WSGISubscribeURL', + 'WebSocketSubscribeURL' 'UnsubscribeBody'] + actual_args = json.loads(mock_request.call_args[1]['data']).keys() + self.assertEqual(expect_args.sort(), + list(actual_args).sort()) + + @mock.patch('requests.post') + def test_send_confirm_notification_without_signed_url(self, mock_request): + subscription = [{'subscriber': 'http://trigger_me', + 'source': 'fake_queue', 'options': {}}] + ctlr = mock.MagicMock() + ctlr.list = mock.Mock(return_value=iter([subscription, {}])) + driver = notifier.NotifierDriver(subscription_controller=ctlr) + + driver.send_confirm_notification('test_queue', subscription, self.conf, + str(self.project), self.api_version) + driver.executor.shutdown() + + self.assertEqual(0, mock_request.call_count) diff --git a/zaqar/tests/unit/storage/test_impl_mongodb.py b/zaqar/tests/unit/storage/test_impl_mongodb.py index cb340d6d4..abc635931 100644 --- a/zaqar/tests/unit/storage/test_impl_mongodb.py +++ b/zaqar/tests/unit/storage/test_impl_mongodb.py @@ -495,6 +495,34 @@ class MongodbSubscriptionTests(MongodbSetupMixin, controller_class = controllers.SubscriptionController control_driver_class = mongodb.ControlDriver + def test_confirm(self): + s_id = self.subscription_controller.create(self.source, + self.subscriber, + self.ttl, + self.options, + project=self.project) + self.addCleanup(self.subscription_controller.delete, self.source, + s_id, self.project) + subscription = self.subscription_controller.get(self.source, s_id, + project=self.project) + + self.assertEqual(False, subscription['confirmed']) + + self.subscription_controller.confirm(self.source, s_id, + project=self.project, + confirm=True) + subscription = self.subscription_controller.get(self.source, s_id, + project=self.project) + + self.assertEqual(True, subscription['confirmed']) + + def test_confirm_with_nonexist_subscription(self): + s_id = 'fake-id' + self.assertRaises(errors.SubscriptionDoesNotExist, + self.subscription_controller.confirm, + self.source, s_id, project=self.project, confirm=True + ) + # # TODO(kgriffs): Do these need database purges as well as those above? diff --git a/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py b/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py index 5ac39b81d..7e400439b 100644 --- a/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py +++ b/zaqar/tests/unit/transport/websocket/v2/test_subscriptions.py @@ -236,7 +236,8 @@ class SubscriptionTest(base.V1_1Base): 'source': 'kitkat', 'options': {}, 'id': str(sub), - 'ttl': 600}, + 'ttl': 600, + 'confirmed': False}, 'headers': {'status': 200}, 'request': {'action': 'subscription_get', 'body': {'queue_name': 'kitkat', @@ -273,7 +274,8 @@ class SubscriptionTest(base.V1_1Base): 'source': 'kitkat', 'options': {}, 'id': str(sub), - 'ttl': 600}]}, + 'ttl': 600, + 'confirmed': False}]}, 'headers': {'status': 200}, 'request': {'action': 'subscription_list', 'body': {'queue_name': 'kitkat'}, @@ -346,7 +348,8 @@ class SubscriptionTest(base.V1_1Base): ws_notification = msgpack.unpackb(sender.call_args_list[2][0][0], encoding='utf-8') self.assertEqual({'body': {'status': 'disco queen'}, 'ttl': 60, - 'queue_name': 'kitkat'}, ws_notification) + 'queue_name': 'kitkat', + 'Message_Type': u'Notification'}, ws_notification) def test_list_returns_503_on_nopoolfound_exception(self): sub = self.boot.storage.subscription_controller.create( diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py index 0f929601f..65eb690de 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_subscriptions.py @@ -20,6 +20,7 @@ import mock from oslo_serialization import jsonutils from zaqar.common import auth +from zaqar.notification import notifier from zaqar.storage import errors as storage_errors from zaqar import tests as testing from zaqar.tests.unit.transport.wsgi import base @@ -57,6 +58,11 @@ class TestSubscriptionsMongoDB(base.V2Base): self.subscription_path = (self.url_prefix + '/queues/' + self.queue + '/subscriptions') + self.subscription = 'fake-id' + self.confirm_path = (self.url_prefix + '/queues/' + self.queue + + '/subscriptions/' + self.subscription + + '/confirm') + self.conf.signed_url.secret_key = 'test_key' def tearDown(self): resp = self.simulate_get(self.subscription_path, @@ -95,6 +101,44 @@ class TestSubscriptionsMongoDB(base.V2Base): self._create_subscription(subscriber='http://CCC.com') self.assertEqual(falcon.HTTP_201, self.srmock.status) + # the subscription is not confirmed, So the second request will + # retry confirm and return 201 again. + self._create_subscription(subscriber='http://CCC.com') + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + @mock.patch.object(notifier.NotifierDriver, 'send_confirm_notification') + def test_create_and_send_notification(self, mock_send_confirm): + self._create_subscription(subscriber='http://CCC.com') + self.assertEqual(1, mock_send_confirm.call_count) + + @mock.patch.object(notifier.NotifierDriver, 'send_confirm_notification') + def test_recreate(self, mock_send_confirm): + resp = self._create_subscription(subscriber='http://CCC.com') + resp_doc = jsonutils.loads(resp[0]) + s_id1 = resp_doc['subscription_id'] + self.assertEqual(1, mock_send_confirm.call_count) + + resp = self._create_subscription(subscriber='http://CCC.com') + resp_doc = jsonutils.loads(resp[0]) + s_id2 = resp_doc['subscription_id'] + self.assertEqual(2, mock_send_confirm.call_count) + + self.assertEqual(s_id1, s_id2) + + @mock.patch.object(notifier.NotifierDriver, 'send_confirm_notification') + def test_recreate_after_confirmed(self, mock_send_confirm): + resp = self._create_subscription(subscriber='http://CCC.com') + self.assertEqual(falcon.HTTP_201, self.srmock.status) + + doc = '{"confirmed": true}' + resp_doc = jsonutils.loads(resp[0]) + confirm_path = (self.url_prefix + '/queues/' + self.queue + + '/subscriptions/' + resp_doc['subscription_id'] + + '/confirm') + self.simulate_put(confirm_path, body=doc, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + self.assertEqual(1, mock_send_confirm.call_count) + self._create_subscription(subscriber='http://CCC.com') self.assertEqual(falcon.HTTP_409, self.srmock.status) @@ -353,3 +397,35 @@ class TestSubscriptionsMongoDB(base.V2Base): options = resp_list_doc['subscriptions'][0]['options'] self.assertEqual({'a': 1, 'trust_id': 'trust_id'}, options) + + def test_confirm(self): + doc = '{"confirmed": true}' + resp = self._create_subscription() + resp_doc = jsonutils.loads(resp[0]) + confirm_path = (self.url_prefix + '/queues/' + self.queue + + '/subscriptions/' + resp_doc['subscription_id'] + + '/confirm') + self.simulate_put(confirm_path, body=doc, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + def test_confirm_with_invalid_body(self): + doc = '{confirmed:123}' + resp = self.simulate_put(self.confirm_path, body=doc, + headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + resp_doc = jsonutils.loads(resp[0]) + self.assertIn('body could not be parsed', resp_doc['description']) + + def test_confirm_without_boolean_body(self): + doc = '{"confirmed":123}' + resp = self.simulate_put(self.confirm_path, body=doc, + headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + resp_doc = jsonutils.loads(resp[0]) + self.assertEqual("The 'confirmed' should be boolean.", + resp_doc['description']) + + def test_confirm_with_non_subscription(self): + doc = '{"confirmed": true}' + self.simulate_put(self.confirm_path, body=doc, headers=self.headers) + self.assertEqual(falcon.HTTP_404, self.srmock.status) diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index dd539f3aa..75d1bb76e 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -540,6 +540,12 @@ class Validator(object): except OverflowError: raise ValidationFailed(msg, datetime.datetime.max) + def subscription_confirming(self, confirm): + confirm = confirm.get('confirmed', None) + if not isinstance(confirm, bool): + msg = _(u"The 'confirmed' should be boolean.") + raise ValidationFailed(msg) + def subscription_listing(self, limit=None, **kwargs): """Restrictions involving a list of subscriptions. diff --git a/zaqar/transport/wsgi/v2_0/__init__.py b/zaqar/transport/wsgi/v2_0/__init__.py index f5c3eec07..310326584 100644 --- a/zaqar/transport/wsgi/v2_0/__init__.py +++ b/zaqar/transport/wsgi/v2_0/__init__.py @@ -101,12 +101,17 @@ def public_endpoints(driver, conf): subscriptions.CollectionResource(driver._validate, subscription_controller, defaults.subscription_ttl, - queue_controller)), + queue_controller, + conf)), ('/queues/{queue_name}/subscriptions/{subscription_id}', subscriptions.ItemResource(driver._validate, subscription_controller)), + ('/queues/{queue_name}/subscriptions/{subscription_id}/confirm', + subscriptions.ConfirmResource(driver._validate, + subscription_controller)), + # Pre-Signed URL Endpoint ('/queues/{queue_name}/share', urls.Resource(driver)), ] diff --git a/zaqar/transport/wsgi/v2_0/subscriptions.py b/zaqar/transport/wsgi/v2_0/subscriptions.py index 6b65ecd51..7690a47cc 100644 --- a/zaqar/transport/wsgi/v2_0/subscriptions.py +++ b/zaqar/transport/wsgi/v2_0/subscriptions.py @@ -12,15 +12,18 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +import datetime import falcon from oslo_log import log as logging from oslo_utils import netutils +from oslo_utils import timeutils import six from stevedore import driver from zaqar.common import decorators from zaqar.i18n import _ +from zaqar.notification import notifier from zaqar.storage import errors as storage_errors from zaqar.transport import acl from zaqar.transport import utils @@ -111,14 +114,17 @@ class ItemResource(object): class CollectionResource(object): __slots__ = ('_subscription_controller', '_validate', - '_default_subscription_ttl', '_queue_controller') + '_default_subscription_ttl', '_queue_controller', + '_conf', '_notification') def __init__(self, validate, subscription_controller, - default_subscription_ttl, queue_controller): + default_subscription_ttl, queue_controller, conf): self._subscription_controller = subscription_controller self._validate = validate self._default_subscription_ttl = default_subscription_ttl self._queue_controller = queue_controller + self._conf = conf + self._notification = notifier.NotifierDriver() @decorators.TransportLog("Subscription collection") @acl.enforce("subscription:get_all") @@ -192,7 +198,6 @@ class CollectionResource(object): ttl, options, project=project_id) - except validation.ValidationFailed as ex: LOG.debug(ex) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) @@ -201,13 +206,80 @@ class CollectionResource(object): description = _(u'Subscription could not be created.') raise wsgi_errors.HTTPServiceUnavailable(description) + now = timeutils.utcnow_ts() + now_dt = datetime.datetime.utcfromtimestamp(now) + expires = now_dt + datetime.timedelta(seconds=ttl) + api_version = req.path.split('/')[1] if created: + subscription = self._subscription_controller.get(queue_name, + created, + project_id) + # send confirm notification + self._notification.send_confirm_notification( + queue_name, subscription, self._conf, project_id, + str(expires), api_version) + resp.location = req.path resp.status = falcon.HTTP_201 resp.body = utils.to_json( {'subscription_id': six.text_type(created)}) else: - description = _(u'Such subscription already exists. Subscriptions ' - u'are unique by project + queue + subscriber URI.') - raise wsgi_errors.HTTPConflict(description, headers={'location': - req.path}) + subscription = self._subscription_controller.get_with_subscriber( + queue_name, subscriber, project_id) + confirmed = subscription.get('confirmed', True) + if confirmed: + description = _(u'Such subscription already exists.' + u'Subscriptions are unique by project + queue ' + u'+ subscriber URI.') + raise wsgi_errors.HTTPConflict(description, + headers={'location': req.path}) + else: + # The subscription is not confirmed, re-send confirm + # notification + self._notification.send_confirm_notification( + queue_name, subscription, self._conf, project_id, + str(expires), api_version) + + resp.location = req.path + resp.status = falcon.HTTP_201 + resp.body = utils.to_json( + {'subscription_id': six.text_type(subscription['id'])}) + + +class ConfirmResource(object): + + __slots__ = ('_subscription_controller', '_validate') + + def __init__(self, validate, subscription_controller): + self._subscription_controller = subscription_controller + self._validate = validate + + @decorators.TransportLog("Confirm Subscription") + @acl.enforce("subscription:confirm") + def on_put(self, req, resp, project_id, queue_name, subscription_id): + if req.content_length: + document = wsgi_utils.deserialize(req.stream, req.content_length) + else: + document = {} + + try: + self._validate.subscription_confirming(document) + confirm = document.get('confirmed', None) + self._subscription_controller.confirm(queue_name, subscription_id, + project=project_id, + confirm=confirm) + resp.status = falcon.HTTP_204 + resp.location = req.path + except storage_errors.SubscriptionDoesNotExist as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPNotFound(six.text_type(ex)) + except validation.ValidationFailed as ex: + LOG.debug(ex) + raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) + except Exception as ex: + LOG.exception(ex) + description = (_(u'Subscription %(subscription_id)s could not be' + ' confirmed.') % + dict(subscription_id=subscription_id)) + raise falcon.HTTPBadRequest(_('Unable to confirm subscription'), + description)