Subscription Confirmation Support-1
The subscription confirmation feature will contain four patches: 1. webhook with mongoDB 2. email with mongoDB 3. webhook with redis 4. email with redis This patch is the first part of subscription confirmation feature for webhook with MongoDB. Others will be achieved in follow patches. This patch did: 1. Add v2/queue/<queue_name>/subscription/<subscription_id>/confirm endpoint. 2. Add a new config option: "require_confirmation". 3. Add a new property "confirmed" to subscription resource for MongoDB driver. 4. Add a new policy "subscription: confirm". 5. Add a new property "message type" for notification. 6. Use the pre-signed url in confirm request. 8. Re-use POST subscription to allow re-confirm. 9. Update notification for webhook subscription with mongoDB. 10. Support unsubscrib the subscription 11. Add tests for the feature. 12. Add doc and sample. Docimpact APIimpact Change-Id: Id38d4a5b4f9303b12e22e2b5c248facda4c00143 Implements: blueprint subscription-confirmation-support
This commit is contained in:
parent
77d0567dc5
commit
69c799734b
204
doc/source/subscription_confirm.rst
Normal file
204
doc/source/subscription_confirm.rst
Normal file
@ -0,0 +1,204 @@
|
|||||||
|
..
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
not use this file except in compliance with the License. You may obtain
|
||||||
|
a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
License for the specific language governing permissions and limitations
|
||||||
|
under the License.
|
||||||
|
|
||||||
|
==============================
|
||||||
|
The subscription Confirm Guide
|
||||||
|
==============================
|
||||||
|
|
||||||
|
The subscription confirm feature now only support webhook with mongoDB backend.
|
||||||
|
This guide shows how to use this feature:
|
||||||
|
|
||||||
|
1. Set the config option "require_confirmation" and add the policy to the
|
||||||
|
policy.json file. Then restart Zaqar-wsgi service::
|
||||||
|
|
||||||
|
In the config file:
|
||||||
|
[notification]
|
||||||
|
require_confirmation = True
|
||||||
|
|
||||||
|
In the policy.json file:
|
||||||
|
"subscription:confirm": "",
|
||||||
|
|
||||||
|
2. Create a subscription.
|
||||||
|
|
||||||
|
Here used zaqar/samples/zaqar/subscriber_service_sample.py be the subscriber
|
||||||
|
endpoint for example.So before the step 2, you should start the subscriber
|
||||||
|
service first.
|
||||||
|
The service could be started simply by the command::
|
||||||
|
|
||||||
|
python zaqar/samples/zaqar/subscriber_service_sample.py
|
||||||
|
The service's default port is 5678. If you want to use a new port, the command
|
||||||
|
will be like::
|
||||||
|
|
||||||
|
python zaqar/samples/zaqar/subscriber_service_sample.py new_port_number
|
||||||
|
The service will not confirm the subscription automatically by default. If you
|
||||||
|
want to do that, the command will be like::
|
||||||
|
|
||||||
|
python zaqar/samples/zaqar/subscriber_service_sample.py --auto-confirm
|
||||||
|
|
||||||
|
Then create a subscription::
|
||||||
|
|
||||||
|
curl -i -X POST http://10.229.47.217:8888/v2/queues/test/subscriptions \
|
||||||
|
-H "Content-type: application/json" \
|
||||||
|
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
|
||||||
|
-H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" \
|
||||||
|
-d '{"subscriber":"http://10.229.47.217:5678", "ttl":3600, "options":{}}'
|
||||||
|
|
||||||
|
The response::
|
||||||
|
|
||||||
|
HTTP/1.1 201 Created
|
||||||
|
content-length: 47
|
||||||
|
content-type: application/json; charset=UTF-8
|
||||||
|
location: http://10.229.47.217:8888/v2/queues/test/subscriptions
|
||||||
|
Connection: close
|
||||||
|
{"subscription_id": "576256b03990b480617b4063"}
|
||||||
|
|
||||||
|
At the same time, If the subscriber sample service is not start by
|
||||||
|
"--auto confirm", you will receive a POST request in the subscriber sample
|
||||||
|
service, the request is like::
|
||||||
|
|
||||||
|
WARNING:root:{"UnsubscribeBody": {"confirmed": false}, "URL-Methods": "PUT",
|
||||||
|
"X-Project-ID": "51be2c72393e457ebf0a22a668e10a64",
|
||||||
|
"URL-Paths": "/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm",
|
||||||
|
"URL-Expires": "2016-07-06T04:35:56", "queue_name": "test",
|
||||||
|
"SubscribeURL": ["/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm"],
|
||||||
|
"SubscribeBody": {"confirmed": true},
|
||||||
|
"URL-Signature": "d4038a40589cdb61cd13d5a6997472f5be779db441dd8fe0c597a6e465f30c41",
|
||||||
|
"Message": "You have chosen to subscribe to the queue: test",
|
||||||
|
"Message_Type": "SubscriptionConfirmation"}
|
||||||
|
10.229.47.217 - - [06/Jul/2016 11:35:56] "POST / HTTP/1.1" 200 -
|
||||||
|
If you start the sample service with "--auto confirm", please go to step 6
|
||||||
|
directly, because the step 5 will be done by the service automatically.
|
||||||
|
|
||||||
|
3. Get the subscription.
|
||||||
|
The request::
|
||||||
|
|
||||||
|
curl -i -X GET http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063 \
|
||||||
|
-H "Content-type: application/json" \
|
||||||
|
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
|
||||||
|
-H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4"
|
||||||
|
|
||||||
|
The response::
|
||||||
|
|
||||||
|
HTTP/1.1 200 OK
|
||||||
|
content-length: 154
|
||||||
|
content-type: application/json; charset=UTF-8
|
||||||
|
Connection: close
|
||||||
|
{"confirmed": false, "age": 73, "id": "576256b03990b480617b4063",
|
||||||
|
"subscriber": "http://10.229.47.217:5678", "source": "test", "ttl": 3600, "options": {}}
|
||||||
|
|
||||||
|
You can find that the "confirmed" property is false by default.
|
||||||
|
|
||||||
|
4. Post a message to the subscription's queue
|
||||||
|
The request::
|
||||||
|
|
||||||
|
curl -i -X POST http://10.229.47.217:8888/v2/queues/test/messages \
|
||||||
|
-H "Content-type: application/json" \
|
||||||
|
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
|
||||||
|
-H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" \
|
||||||
|
-d '{"messages": [{"ttl": 3600,"body": "test123"}]}'
|
||||||
|
|
||||||
|
The response::
|
||||||
|
|
||||||
|
HTTP/1.1 201 Created
|
||||||
|
content-length: 68
|
||||||
|
content-type: application/json; charset=UTF-8
|
||||||
|
location: http://10.229.47.217:8888/v2/queues/test/messages?ids=57624dee3990b4634d71bb4a
|
||||||
|
Connection: close
|
||||||
|
{"resources": ["/v2/queues/test/messages/57624dee3990b4634d71bb4a"]}
|
||||||
|
|
||||||
|
The subscriber received nothing and you will find a log info in zaqar-wsgi.::
|
||||||
|
|
||||||
|
2016-07-06 11:37:57.929 98400 INFO zaqar.notification.notifier
|
||||||
|
[(None,)2473911afe2642c0b74d7e1200d9bba7 51be2c72393e457ebf0a22a668e10a64 - - -]
|
||||||
|
The subscriber http://10.229.47.217:5678 is not confirmed.
|
||||||
|
|
||||||
|
5. Use the information showed in step3 to confirm the subscription
|
||||||
|
The request::
|
||||||
|
|
||||||
|
curl -i -X PUT http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm \
|
||||||
|
-H "Content-type: application/json" \
|
||||||
|
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
|
||||||
|
-H "URL-Methods: PUT" -H "X-Project-ID: 51be2c72393e457ebf0a22a668e10a64" \
|
||||||
|
-H "URL-Signature: d28dced4eabbb09878a73d9a7a651df3a3ce5434fcdb6c3727decf6c7078b282" \
|
||||||
|
-H "URL-Paths: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm" \
|
||||||
|
-H "URL-Expires: 2016-06-16T08:35:12" -d '{"confirmed": true}'
|
||||||
|
|
||||||
|
The response::
|
||||||
|
|
||||||
|
HTTP/1.1 204 No Content
|
||||||
|
location: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm
|
||||||
|
Connection: close
|
||||||
|
|
||||||
|
6. Repeat step3 to get the subscription
|
||||||
|
The request::
|
||||||
|
|
||||||
|
curl -i -X GET http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063 \
|
||||||
|
-H "Content-type: application/json" \
|
||||||
|
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
|
||||||
|
-H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4"
|
||||||
|
|
||||||
|
The response::
|
||||||
|
|
||||||
|
HTTP/1.1 200 OK
|
||||||
|
content-length: 155
|
||||||
|
content-type: application/json; charset=UTF-8
|
||||||
|
Connection: close
|
||||||
|
{"confirmed": true, "age": 1370, "id": "576256b03990b480617b4063",
|
||||||
|
"subscriber": "http://10.229.47.217:5678", "source": "test", "ttl": 3600,
|
||||||
|
"options": {}}
|
||||||
|
|
||||||
|
The subscription is confirmed now.
|
||||||
|
|
||||||
|
7. Repeat step4 to post a new message.
|
||||||
|
The request::
|
||||||
|
|
||||||
|
curl -i -X POST http://10.229.47.217:8888/v2/queues/test/messages \
|
||||||
|
-H "Content-type: application/json" \
|
||||||
|
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
|
||||||
|
-H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" \
|
||||||
|
-d '{"messages": [{"ttl": 3600,"body": "test123"}]}'
|
||||||
|
|
||||||
|
The response::
|
||||||
|
|
||||||
|
HTTP/1.1 201 Created
|
||||||
|
content-length: 68
|
||||||
|
content-type: application/json; charset=UTF-8
|
||||||
|
location: http://10.229.47.217:8888/v2/queues/test/messages?ids=5762526d3990b474c80d5483
|
||||||
|
Connection: close
|
||||||
|
{"resources": ["/v2/queues/test/messages/5762526d3990b474c80d5483"]}
|
||||||
|
|
||||||
|
Then in subscriber sample service, you will receive a request::
|
||||||
|
|
||||||
|
WARNING:root:{"body": {"event": "BackupStarted"}, "queue_name": "test",
|
||||||
|
"Message_Type": "Notification", "ttl": 3600}
|
||||||
|
10.229.47.217 - - [06/Jul/2016 13:19:07] "POST / HTTP/1.1" 200 -
|
||||||
|
|
||||||
|
8. Unsubscription.
|
||||||
|
The request::
|
||||||
|
|
||||||
|
curl -i -X PUT http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm \
|
||||||
|
-H "Content-type: application/json" \
|
||||||
|
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
|
||||||
|
-H "URL-Methods: PUT" -H "X-Project-ID: 51be2c72393e457ebf0a22a668e10a64" \
|
||||||
|
-H "URL-Signature: d28dced4eabbb09878a73d9a7a651df3a3ce5434fcdb6c3727decf6c7078b282" \
|
||||||
|
-H "URL-Paths: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm" \
|
||||||
|
-H "URL-Expires: 2016-06-16T08:35:12" -d '{"confirmed": false}'
|
||||||
|
|
||||||
|
The response::
|
||||||
|
|
||||||
|
HTTP/1.1 204 No Content
|
||||||
|
location: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm
|
||||||
|
Connection: close
|
||||||
|
|
||||||
|
Then try to post a message. The subscriber will not receive the notification
|
||||||
|
any more.
|
@ -27,6 +27,7 @@
|
|||||||
"subscription:get": "",
|
"subscription:get": "",
|
||||||
"subscription:delete": "",
|
"subscription:delete": "",
|
||||||
"subscription:update": "",
|
"subscription:update": "",
|
||||||
|
"subscription:confirm": "",
|
||||||
|
|
||||||
"pools:get_all": "rule:context_is_admin",
|
"pools:get_all": "rule:context_is_admin",
|
||||||
"pools:create": "rule:context_is_admin",
|
"pools:create": "rule:context_is_admin",
|
||||||
|
@ -0,0 +1,9 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- Now before users send messages to subscribers through a queue, the
|
||||||
|
subscribers should be confirmed first. Zaqar only sends messages to the
|
||||||
|
confirmed subscribers. This feature supports "webhook" and "mailto"
|
||||||
|
subscribers with mongoDB or redis backend. The "mailto" part will be done
|
||||||
|
in O cycle. Set "require_confirmation = True" to enable this feature. The
|
||||||
|
default value is "False" now and we will enable it by default after one or
|
||||||
|
two cycles.
|
79
samples/zaqar/subscriber_service_sample.py
Normal file
79
samples/zaqar/subscriber_service_sample.py
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||||
|
# use this file except in compliance with the License. You may obtain a copy
|
||||||
|
# of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations under
|
||||||
|
# the License.
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import requests
|
||||||
|
import sys
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
try:
|
||||||
|
import SimpleHTTPServer
|
||||||
|
import SocketServer
|
||||||
|
except Exception:
|
||||||
|
from http import server as SimpleHTTPServer
|
||||||
|
import socketserver as SocketServer
|
||||||
|
|
||||||
|
|
||||||
|
_AUTO_CONFIRM = False
|
||||||
|
for arg in sys.argv:
|
||||||
|
if arg == '--auto-confirm':
|
||||||
|
_AUTO_CONFIRM = True
|
||||||
|
sys.argv.remove(arg)
|
||||||
|
break
|
||||||
|
|
||||||
|
if len(sys.argv) > 2:
|
||||||
|
PORT = int(sys.argv[2])
|
||||||
|
elif len(sys.argv) > 1:
|
||||||
|
PORT = int(sys.argv[1])
|
||||||
|
else:
|
||||||
|
PORT = 5678
|
||||||
|
|
||||||
|
|
||||||
|
class ServerHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
|
||||||
|
"""This is the sample service for wsgi subscription.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# TODO(wangxiyuan): support websocket.
|
||||||
|
def do_POST(self):
|
||||||
|
logging.warning('=================== POST =====================')
|
||||||
|
data_string = str(
|
||||||
|
self.rfile.read(int(self.headers['Content-Length'])))
|
||||||
|
self.data = json.loads(data_string)
|
||||||
|
if _AUTO_CONFIRM:
|
||||||
|
self._send_confirm_request()
|
||||||
|
message = 'OK'
|
||||||
|
self.send_response(200)
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(message)
|
||||||
|
logging.warning(self.headers)
|
||||||
|
logging.warning(self.data)
|
||||||
|
return
|
||||||
|
|
||||||
|
def _send_confirm_request(self):
|
||||||
|
url = self.data['WSGISubscribeURL']
|
||||||
|
headers = {
|
||||||
|
'Accept': 'application/json',
|
||||||
|
'Content-Type': 'application/json',
|
||||||
|
'X-Project-ID': self.data['X-Project-ID'],
|
||||||
|
'Client-ID': str(uuid.uuid4()),
|
||||||
|
'URL-Methods': self.data['URL-Methods'],
|
||||||
|
'URL-Signature': self.data['URL-Signature'],
|
||||||
|
'URL-Paths': self.data['URL-Paths'],
|
||||||
|
'URL-Expires': self.data['URL-Expires'],
|
||||||
|
}
|
||||||
|
data = {'confirmed': True}
|
||||||
|
requests.put(url=url, data=json.dumps(data), headers=headers)
|
||||||
|
|
||||||
|
Handler = ServerHandler
|
||||||
|
httpd = SocketServer.TCPServer(("", PORT), Handler)
|
||||||
|
httpd.serve_forever()
|
@ -21,7 +21,10 @@ from oslo_config import cfg
|
|||||||
|
|
||||||
PASSWORD_PLUGIN = 'password'
|
PASSWORD_PLUGIN = 'password'
|
||||||
TRUSTEE_CONF_GROUP = 'trustee'
|
TRUSTEE_CONF_GROUP = 'trustee'
|
||||||
|
KEYSTONE_AUTHTOKEN_GROUP = 'keystone_authtoken'
|
||||||
loading.register_auth_conf_options(cfg.CONF, TRUSTEE_CONF_GROUP)
|
loading.register_auth_conf_options(cfg.CONF, TRUSTEE_CONF_GROUP)
|
||||||
|
loading.register_auth_conf_options(cfg.CONF, KEYSTONE_AUTHTOKEN_GROUP)
|
||||||
|
_ZAQAR_ENDPOINTS = {}
|
||||||
|
|
||||||
|
|
||||||
def _config_options():
|
def _config_options():
|
||||||
@ -39,9 +42,9 @@ def get_trusted_token(trust_id):
|
|||||||
return trust_session.auth.get_access(trust_session).auth_token
|
return trust_session.auth.get_access(trust_session).auth_token
|
||||||
|
|
||||||
|
|
||||||
def _get_admin_session():
|
def _get_admin_session(conf_group):
|
||||||
auth_plugin = loading.load_auth_from_conf_options(
|
auth_plugin = loading.load_auth_from_conf_options(
|
||||||
cfg.CONF, TRUSTEE_CONF_GROUP)
|
cfg.CONF, conf_group)
|
||||||
return session.Session(auth=auth_plugin)
|
return session.Session(auth=auth_plugin)
|
||||||
|
|
||||||
|
|
||||||
@ -53,7 +56,7 @@ def _get_user_client(auth_plugin):
|
|||||||
def create_trust_id(auth_plugin, trustor_user_id, trustor_project_id, roles,
|
def create_trust_id(auth_plugin, trustor_user_id, trustor_project_id, roles,
|
||||||
expires_at):
|
expires_at):
|
||||||
"""Create a trust with the given user for the configured trustee user."""
|
"""Create a trust with the given user for the configured trustee user."""
|
||||||
admin_session = _get_admin_session()
|
admin_session = _get_admin_session(TRUSTEE_CONF_GROUP)
|
||||||
trustee_user_id = admin_session.get_user_id()
|
trustee_user_id = admin_session.get_user_id()
|
||||||
|
|
||||||
client = _get_user_client(auth_plugin)
|
client = _get_user_client(auth_plugin)
|
||||||
@ -64,3 +67,29 @@ def create_trust_id(auth_plugin, trustor_user_id, trustor_project_id, roles,
|
|||||||
role_names=roles,
|
role_names=roles,
|
||||||
expires_at=expires_at)
|
expires_at=expires_at)
|
||||||
return trust.id
|
return trust.id
|
||||||
|
|
||||||
|
|
||||||
|
def get_public_endpoint():
|
||||||
|
"""Get Zaqar's public endpoint from keystone"""
|
||||||
|
global _ZAQAR_ENDPOINTS
|
||||||
|
|
||||||
|
if _ZAQAR_ENDPOINTS:
|
||||||
|
return _ZAQAR_ENDPOINTS
|
||||||
|
|
||||||
|
zaqar_session = _get_admin_session(KEYSTONE_AUTHTOKEN_GROUP)
|
||||||
|
auth = zaqar_session.auth
|
||||||
|
if not auth:
|
||||||
|
return _ZAQAR_ENDPOINTS
|
||||||
|
|
||||||
|
catalogs = auth.get_auth_ref(zaqar_session).service_catalog
|
||||||
|
try:
|
||||||
|
_ZAQAR_ENDPOINTS['zaqar'] = catalogs.url_for(service_name='zaqar')
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
_ZAQAR_ENDPOINTS['zaqar-websocket'] = catalogs.url_for(
|
||||||
|
service_name='zaqar-websocket')
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return _ZAQAR_ENDPOINTS
|
||||||
|
@ -59,7 +59,10 @@ _NOTIFICATION_OPTIONS = (
|
|||||||
help=('The command of smtp to send email. The format is '
|
help=('The command of smtp to send email. The format is '
|
||||||
'"command_name arg1 arg2".')),
|
'"command_name arg1 arg2".')),
|
||||||
cfg.IntOpt('max_notifier_workers', default=10,
|
cfg.IntOpt('max_notifier_workers', default=10,
|
||||||
help='The max amount of the notification workers.')
|
help='The max amount of the notification workers.'),
|
||||||
|
cfg.BoolOpt('require_confirmation', default=False,
|
||||||
|
help='Whether the http/https/email subscription need to be '
|
||||||
|
'confirmed before notification.'),
|
||||||
)
|
)
|
||||||
|
|
||||||
_NOTIFICATION_GROUP = 'notification'
|
_NOTIFICATION_GROUP = 'notification'
|
||||||
|
@ -13,18 +13,30 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import enum
|
||||||
from stevedore import driver
|
from stevedore import driver
|
||||||
|
|
||||||
import futurist
|
import futurist
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from six.moves import urllib_parse
|
from six.moves import urllib_parse
|
||||||
|
|
||||||
|
from zaqar.common import auth
|
||||||
|
from zaqar.common import urls
|
||||||
from zaqar.i18n import _LE
|
from zaqar.i18n import _LE
|
||||||
|
from zaqar.i18n import _LI
|
||||||
from zaqar.storage import pooling
|
from zaqar.storage import pooling
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@enum.unique
|
||||||
|
class MessageType(enum.IntEnum):
|
||||||
|
"""Enum of message type."""
|
||||||
|
SubscriptionConfirmation = 1
|
||||||
|
UnsubscribeConfirmation = 2
|
||||||
|
Notification = 3
|
||||||
|
|
||||||
|
|
||||||
class NotifierDriver(object):
|
class NotifierDriver(object):
|
||||||
"""Notifier which is responsible for sending messages to subscribers.
|
"""Notifier which is responsible for sending messages to subscribers.
|
||||||
|
|
||||||
@ -34,6 +46,7 @@ class NotifierDriver(object):
|
|||||||
self.subscription_controller = kwargs.get('subscription_controller')
|
self.subscription_controller = kwargs.get('subscription_controller')
|
||||||
max_workers = kwargs.get('max_notifier_workers', 10)
|
max_workers = kwargs.get('max_notifier_workers', 10)
|
||||||
self.executor = futurist.ThreadPoolExecutor(max_workers=max_workers)
|
self.executor = futurist.ThreadPoolExecutor(max_workers=max_workers)
|
||||||
|
self.require_confirmation = kwargs.get('require_confirmation', False)
|
||||||
|
|
||||||
def post(self, queue_name, messages, client_uuid, project=None):
|
def post(self, queue_name, messages, client_uuid, project=None):
|
||||||
"""Send messages to the subscribers."""
|
"""Send messages to the subscribers."""
|
||||||
@ -48,14 +61,76 @@ class NotifierDriver(object):
|
|||||||
LOG.debug("Notifying subscriber %r" % (sub,))
|
LOG.debug("Notifying subscriber %r" % (sub,))
|
||||||
s_type = urllib_parse.urlparse(
|
s_type = urllib_parse.urlparse(
|
||||||
sub['subscriber']).scheme
|
sub['subscriber']).scheme
|
||||||
data_driver = self.subscription_controller.driver
|
# If the subscriber doesn't contain 'confirmed', it
|
||||||
mgr = driver.DriverManager('zaqar.notification.tasks',
|
# means that this kind of subscriber was created before
|
||||||
s_type,
|
# the confirm feature be introduced into Zaqar. We
|
||||||
invoke_on_load=True)
|
# should allow them be subscribed.
|
||||||
self.executor.submit(mgr.driver.execute, sub, messages,
|
if (self.require_confirmation and
|
||||||
conf=data_driver.conf)
|
not sub.get('confirmed', True)):
|
||||||
|
LOG.info(_LI('The subscriber %s is not '
|
||||||
|
'confirmed.'), sub['subscriber'])
|
||||||
|
continue
|
||||||
|
for msg in messages:
|
||||||
|
msg['Message_Type'] = MessageType.Notification.name
|
||||||
|
self._execute(s_type, sub, messages)
|
||||||
marker = next(subscribers)
|
marker = next(subscribers)
|
||||||
if not marker:
|
if not marker:
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
LOG.error(_LE('Failed to get subscription controller.'))
|
LOG.error(_LE('Failed to get subscription controller.'))
|
||||||
|
|
||||||
|
def send_confirm_notification(self, queue, subscription, conf,
|
||||||
|
project=None, expires=None,
|
||||||
|
api_version=None):
|
||||||
|
key = conf.signed_url.secret_key
|
||||||
|
if not key:
|
||||||
|
LOG.error(_LE("Can't send confirm notification due to the value of"
|
||||||
|
" secret_key option is None"))
|
||||||
|
return
|
||||||
|
url = "/%s/queues/%s/subscriptions/%s/confirm" % (api_version, queue,
|
||||||
|
subscription['id'])
|
||||||
|
pre_url = urls.create_signed_url(key, [url], project=project,
|
||||||
|
expires=expires, methods=['PUT'])
|
||||||
|
message_type = MessageType.SubscriptionConfirmation.name
|
||||||
|
|
||||||
|
messages = {}
|
||||||
|
endpoint_dict = auth.get_public_endpoint()
|
||||||
|
if endpoint_dict:
|
||||||
|
wsgi_endpoint = endpoint_dict.get('zaqar', None)
|
||||||
|
if wsgi_endpoint:
|
||||||
|
wsgi_subscribe_url = urllib_parse.urljoin(
|
||||||
|
wsgi_endpoint, url)
|
||||||
|
messages['WSGISubscribeURL'] = wsgi_subscribe_url
|
||||||
|
websocket_endpoint = endpoint_dict.get('zaqar-websocket', None)
|
||||||
|
if websocket_endpoint:
|
||||||
|
websocket_subscribe_url = urllib_parse.urljoin(
|
||||||
|
websocket_endpoint, url)
|
||||||
|
messages['WebSocketSubscribeURL'] = websocket_subscribe_url
|
||||||
|
messages.update({'Message_Type': message_type,
|
||||||
|
'Message': 'You have chosen to subscribe to the '
|
||||||
|
'queue: %s' % queue,
|
||||||
|
'URL-Signature': pre_url['signature'],
|
||||||
|
'URL-Methods': pre_url['methods'][0],
|
||||||
|
'URL-Paths': pre_url['paths'][0],
|
||||||
|
'X-Project-ID': pre_url['project'],
|
||||||
|
'URL-Expires': pre_url['expires'],
|
||||||
|
'SubscribeBody': {'confirmed': True},
|
||||||
|
'UnsubscribeBody': {'confirmed': False}})
|
||||||
|
s_type = urllib_parse.urlparse(subscription['subscriber']).scheme
|
||||||
|
LOG.info(_LI('Begin to send %(type)s confirm notification. The request'
|
||||||
|
'body is %(messages)s'),
|
||||||
|
{'type': s_type, 'messages': messages})
|
||||||
|
|
||||||
|
self._execute(s_type, subscription, [messages], conf)
|
||||||
|
|
||||||
|
def _execute(self, s_type, subscription, messages, conf=None):
|
||||||
|
if self.subscription_controller:
|
||||||
|
data_driver = self.subscription_controller.driver
|
||||||
|
conf = data_driver.conf
|
||||||
|
else:
|
||||||
|
conf = conf
|
||||||
|
mgr = driver.DriverManager('zaqar.notification.tasks',
|
||||||
|
s_type,
|
||||||
|
invoke_on_load=True)
|
||||||
|
self.executor.submit(mgr.driver.execute, subscription, messages,
|
||||||
|
conf=conf)
|
||||||
|
@ -48,6 +48,7 @@ class SubscriptionController(base.Subscription):
|
|||||||
'e': expires: datetime.datetime
|
'e': expires: datetime.datetime
|
||||||
'o': options :: dict
|
'o': options :: dict
|
||||||
'p': project :: six.text_type
|
'p': project :: six.text_type
|
||||||
|
'c': confirmed :: boolean
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
@ -70,7 +71,7 @@ class SubscriptionController(base.Subscription):
|
|||||||
if marker is not None:
|
if marker is not None:
|
||||||
query['_id'] = {'$gt': utils.to_oid(marker)}
|
query['_id'] = {'$gt': utils.to_oid(marker)}
|
||||||
|
|
||||||
projection = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1}
|
projection = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1, 'c': 1}
|
||||||
|
|
||||||
cursor = self._collection.find(query, projection=projection)
|
cursor = self._collection.find(query, projection=projection)
|
||||||
cursor = cursor.limit(limit).sort('_id')
|
cursor = cursor.limit(limit).sort('_id')
|
||||||
@ -103,6 +104,7 @@ class SubscriptionController(base.Subscription):
|
|||||||
now = timeutils.utcnow_ts()
|
now = timeutils.utcnow_ts()
|
||||||
now_dt = datetime.datetime.utcfromtimestamp(now)
|
now_dt = datetime.datetime.utcfromtimestamp(now)
|
||||||
expires = now_dt + datetime.timedelta(seconds=ttl)
|
expires = now_dt + datetime.timedelta(seconds=ttl)
|
||||||
|
confirmed = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
subscription_id = self._collection.insert({'s': source,
|
subscription_id = self._collection.insert({'s': source,
|
||||||
@ -110,7 +112,8 @@ class SubscriptionController(base.Subscription):
|
|||||||
't': ttl,
|
't': ttl,
|
||||||
'e': expires,
|
'e': expires,
|
||||||
'o': options,
|
'o': options,
|
||||||
'p': project})
|
'p': project,
|
||||||
|
'c': confirmed})
|
||||||
return subscription_id
|
return subscription_id
|
||||||
except pymongo.errors.DuplicateKeyError:
|
except pymongo.errors.DuplicateKeyError:
|
||||||
return None
|
return None
|
||||||
@ -153,6 +156,22 @@ class SubscriptionController(base.Subscription):
|
|||||||
self._collection.remove({'_id': utils.to_oid(subscription_id),
|
self._collection.remove({'_id': utils.to_oid(subscription_id),
|
||||||
'p': project}, w=0)
|
'p': project}, w=0)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
def get_with_subscriber(self, queue, subscriber, project=None):
|
||||||
|
res = self._collection.find_one({'u': subscriber,
|
||||||
|
'p': project})
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
return _basic_subscription(res, now)
|
||||||
|
|
||||||
|
@utils.raises_conn_error
|
||||||
|
def confirm(self, queue, subscription_id, project=None, confirm=True):
|
||||||
|
|
||||||
|
res = self._collection.update({'_id': utils.to_oid(subscription_id),
|
||||||
|
'p': project}, {'$set': {'c': confirm}},
|
||||||
|
upsert=False)
|
||||||
|
if not res['updatedExisting']:
|
||||||
|
raise errors.SubscriptionDoesNotExist(subscription_id)
|
||||||
|
|
||||||
|
|
||||||
def _basic_subscription(record, now):
|
def _basic_subscription(record, now):
|
||||||
# NOTE(Eva-i): unused here record's field 'e' (expires) has changed it's
|
# NOTE(Eva-i): unused here record's field 'e' (expires) has changed it's
|
||||||
@ -161,11 +180,13 @@ def _basic_subscription(record, now):
|
|||||||
# starting using 'e' field should make sure support both of the formats.
|
# starting using 'e' field should make sure support both of the formats.
|
||||||
oid = record['_id']
|
oid = record['_id']
|
||||||
age = now - utils.oid_ts(oid)
|
age = now - utils.oid_ts(oid)
|
||||||
|
confirmed = record.get('c', True)
|
||||||
return {
|
return {
|
||||||
'id': str(oid),
|
'id': str(oid),
|
||||||
'source': record['s'],
|
'source': record['s'],
|
||||||
'subscriber': record['u'],
|
'subscriber': record['u'],
|
||||||
'ttl': record['t'],
|
'ttl': record['t'],
|
||||||
'age': int(age),
|
'age': int(age),
|
||||||
'options': record['o']
|
'options': record['o'],
|
||||||
|
'confirmed': confirmed,
|
||||||
}
|
}
|
||||||
|
@ -150,7 +150,9 @@ class DataDriver(base.DataDriverBase):
|
|||||||
kwargs = {'subscription_controller':
|
kwargs = {'subscription_controller':
|
||||||
self._storage.subscription_controller,
|
self._storage.subscription_controller,
|
||||||
'max_notifier_workers':
|
'max_notifier_workers':
|
||||||
self.conf.notification.max_notifier_workers}
|
self.conf.notification.max_notifier_workers,
|
||||||
|
'require_confirmation':
|
||||||
|
self.conf.notification.require_confirmation}
|
||||||
stages.extend(_get_storage_pipeline('message', self.conf, **kwargs))
|
stages.extend(_get_storage_pipeline('message', self.conf, **kwargs))
|
||||||
stages.append(self._storage.message_controller)
|
stages.append(self._storage.message_controller)
|
||||||
return common.Pipeline(stages)
|
return common.Pipeline(stages)
|
||||||
|
@ -409,6 +409,17 @@ class SubscriptionController(storage.Subscription):
|
|||||||
return control.exists(queue, subscription_id,
|
return control.exists(queue, subscription_id,
|
||||||
project=project)
|
project=project)
|
||||||
|
|
||||||
|
def confirm(self, queue, subscription_id, project=None, confirm=None):
|
||||||
|
control = self._get_controller(queue, project)
|
||||||
|
if control:
|
||||||
|
return control.confirm(queue, subscription_id,
|
||||||
|
project=project, confirm=confirm)
|
||||||
|
|
||||||
|
def get_with_subscriber(self, queue, subscriber, project=None):
|
||||||
|
control = self._get_controller(queue, project)
|
||||||
|
if control:
|
||||||
|
return control.get_with_subscriber(queue, subscriber, project)
|
||||||
|
|
||||||
|
|
||||||
class Catalog(object):
|
class Catalog(object):
|
||||||
"""Represents the mapping between queues and pool drivers."""
|
"""Represents the mapping between queues and pool drivers."""
|
||||||
|
@ -58,6 +58,10 @@ class TestBase(testtools.TestCase):
|
|||||||
group=configs._DRIVER_GROUP)
|
group=configs._DRIVER_GROUP)
|
||||||
self.conf.register_opts(configs._NOTIFICATION_OPTIONS,
|
self.conf.register_opts(configs._NOTIFICATION_OPTIONS,
|
||||||
group=configs._NOTIFICATION_GROUP)
|
group=configs._NOTIFICATION_GROUP)
|
||||||
|
self.conf.register_opts(configs._NOTIFICATION_OPTIONS,
|
||||||
|
group=configs._NOTIFICATION_GROUP)
|
||||||
|
self.conf.register_opts(configs._SIGNED_URL_OPTIONS,
|
||||||
|
group=configs._SIGNED_URL_GROUP)
|
||||||
|
|
||||||
self.mongodb_url = os.environ.get('ZAQAR_TEST_MONGODB_URL',
|
self.mongodb_url = os.environ.get('ZAQAR_TEST_MONGODB_URL',
|
||||||
'mongodb://127.0.0.1:27017')
|
'mongodb://127.0.0.1:27017')
|
||||||
|
@ -27,6 +27,7 @@
|
|||||||
"subscription:get": "",
|
"subscription:get": "",
|
||||||
"subscription:delete": "",
|
"subscription:delete": "",
|
||||||
"subscription:update": "",
|
"subscription:update": "",
|
||||||
|
"subscription:confirm": "",
|
||||||
|
|
||||||
"pools:get_all": "rule:context_is_admin",
|
"pools:get_all": "rule:context_is_admin",
|
||||||
"pools:create": "rule:context_is_admin",
|
"pools:create": "rule:context_is_admin",
|
||||||
|
@ -115,10 +115,15 @@ class TestSubscriptions(base.BaseV2MessagingTest):
|
|||||||
if not test.call_until_true(
|
if not test.call_until_true(
|
||||||
lambda: self.list_messages(sub_queue)[1]['messages'], 10, 1):
|
lambda: self.list_messages(sub_queue)[1]['messages'], 10, 1):
|
||||||
self.fail("Couldn't get messages")
|
self.fail("Couldn't get messages")
|
||||||
messages = self.list_messages(sub_queue)
|
_, body = self.list_messages(sub_queue)
|
||||||
expected = message_body['messages'][0]
|
expected = message_body['messages'][0]
|
||||||
expected['queue_name'] = self.queue_name
|
expected['queue_name'] = self.queue_name
|
||||||
self.assertEqual(expected, messages[1]['messages'][0]['body'])
|
expected['Message_Type'] = 'Notification'
|
||||||
|
for message in body['messages']:
|
||||||
|
# There are two message in the queue. One is the confirm message,
|
||||||
|
# the other one is the notification.
|
||||||
|
if message['body']['Message_Type'] == 'Notification':
|
||||||
|
self.assertEqual(expected, message['body'])
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def resource_cleanup(cls):
|
def resource_cleanup(cls):
|
||||||
|
@ -17,6 +17,7 @@ import uuid
|
|||||||
|
|
||||||
from tempest import config
|
from tempest import config
|
||||||
from tempest.lib.common.utils import data_utils
|
from tempest.lib.common.utils import data_utils
|
||||||
|
from tempest.lib import decorators
|
||||||
from tempest.lib import exceptions as lib_exc
|
from tempest.lib import exceptions as lib_exc
|
||||||
from tempest import test
|
from tempest import test
|
||||||
|
|
||||||
@ -45,6 +46,10 @@ class TestSubscriptionsNegative(base.BaseV2MessagingTest):
|
|||||||
|
|
||||||
# Create Subscriptions
|
# Create Subscriptions
|
||||||
|
|
||||||
|
# TODO(wangxiyuan): Now the subscription confirmation feature only support
|
||||||
|
# mongoDB backend. Skip this test until the feature support the redis
|
||||||
|
# backend. Then rewrite it.
|
||||||
|
@decorators.skip_because(bug='1609596')
|
||||||
@test.attr(type=['negative'])
|
@test.attr(type=['negative'])
|
||||||
@test.idempotent_id('fe0d8ec1-1a64-4490-8869-e821b2252e74')
|
@test.idempotent_id('fe0d8ec1-1a64-4490-8869-e821b2252e74')
|
||||||
def test_create_subscriptions_with_duplicate_subscriber(self):
|
def test_create_subscriptions_with_duplicate_subscriber(self):
|
||||||
|
@ -43,14 +43,17 @@ class NotifierTest(testing.TestBase):
|
|||||||
"body": {"event": "BackupStarted",
|
"body": {"event": "BackupStarted",
|
||||||
"backup_id":
|
"backup_id":
|
||||||
"c378813c-3f0b-11e2-ad92"},
|
"c378813c-3f0b-11e2-ad92"},
|
||||||
"queue_name": "fake_queue"
|
"queue_name": "fake_queue",
|
||||||
|
"Message_Type": "Notification"
|
||||||
},
|
},
|
||||||
{"body": {"event": "BackupProgress",
|
{"body": {"event": "BackupProgress",
|
||||||
"current_bytes": "0",
|
"current_bytes": "0",
|
||||||
"total_bytes": "99614720"},
|
"total_bytes": "99614720"},
|
||||||
"queue_name": "fake_queue"
|
"queue_name": "fake_queue",
|
||||||
|
"Message_Type": "Notification"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
self.api_version = 'v2'
|
||||||
|
|
||||||
def test_webhook(self):
|
def test_webhook(self):
|
||||||
subscription = [{'subscriber': 'http://trigger_me',
|
subscription = [{'subscriber': 'http://trigger_me',
|
||||||
@ -257,3 +260,41 @@ class NotifierTest(testing.TestBase):
|
|||||||
self.assertEqual(2, mock_post.call_count)
|
self.assertEqual(2, mock_post.call_count)
|
||||||
self.assertEqual(self.notifications[1],
|
self.assertEqual(self.notifications[1],
|
||||||
json.loads(mock_post.call_args[1]['data']))
|
json.loads(mock_post.call_args[1]['data']))
|
||||||
|
|
||||||
|
@mock.patch('requests.post')
|
||||||
|
def test_send_confirm_notification(self, mock_request):
|
||||||
|
subscription = {'id': '5760c9fb3990b42e8b7c20bd',
|
||||||
|
'subscriber': 'http://trigger_me',
|
||||||
|
'source': 'fake_queue',
|
||||||
|
'options': {}}
|
||||||
|
ctlr = mock.MagicMock()
|
||||||
|
ctlr.list = mock.Mock(return_value=subscription)
|
||||||
|
driver = notifier.NotifierDriver(subscription_controller=ctlr)
|
||||||
|
self.conf.signed_url.secret_key = 'test_key'
|
||||||
|
driver.send_confirm_notification('test_queue', subscription, self.conf,
|
||||||
|
str(self.project),
|
||||||
|
api_version=self.api_version)
|
||||||
|
driver.executor.shutdown()
|
||||||
|
|
||||||
|
self.assertEqual(1, mock_request.call_count)
|
||||||
|
expect_args = ['SubscribeBody', 'queue_name', 'URL-Methods',
|
||||||
|
'X-Project-ID', 'URL-Signature', 'URL-Paths', 'Message',
|
||||||
|
'URL-Expires', 'Message_Type', 'WSGISubscribeURL',
|
||||||
|
'WebSocketSubscribeURL' 'UnsubscribeBody']
|
||||||
|
actual_args = json.loads(mock_request.call_args[1]['data']).keys()
|
||||||
|
self.assertEqual(expect_args.sort(),
|
||||||
|
list(actual_args).sort())
|
||||||
|
|
||||||
|
@mock.patch('requests.post')
|
||||||
|
def test_send_confirm_notification_without_signed_url(self, mock_request):
|
||||||
|
subscription = [{'subscriber': 'http://trigger_me',
|
||||||
|
'source': 'fake_queue', 'options': {}}]
|
||||||
|
ctlr = mock.MagicMock()
|
||||||
|
ctlr.list = mock.Mock(return_value=iter([subscription, {}]))
|
||||||
|
driver = notifier.NotifierDriver(subscription_controller=ctlr)
|
||||||
|
|
||||||
|
driver.send_confirm_notification('test_queue', subscription, self.conf,
|
||||||
|
str(self.project), self.api_version)
|
||||||
|
driver.executor.shutdown()
|
||||||
|
|
||||||
|
self.assertEqual(0, mock_request.call_count)
|
||||||
|
@ -495,6 +495,34 @@ class MongodbSubscriptionTests(MongodbSetupMixin,
|
|||||||
controller_class = controllers.SubscriptionController
|
controller_class = controllers.SubscriptionController
|
||||||
control_driver_class = mongodb.ControlDriver
|
control_driver_class = mongodb.ControlDriver
|
||||||
|
|
||||||
|
def test_confirm(self):
|
||||||
|
s_id = self.subscription_controller.create(self.source,
|
||||||
|
self.subscriber,
|
||||||
|
self.ttl,
|
||||||
|
self.options,
|
||||||
|
project=self.project)
|
||||||
|
self.addCleanup(self.subscription_controller.delete, self.source,
|
||||||
|
s_id, self.project)
|
||||||
|
subscription = self.subscription_controller.get(self.source, s_id,
|
||||||
|
project=self.project)
|
||||||
|
|
||||||
|
self.assertEqual(False, subscription['confirmed'])
|
||||||
|
|
||||||
|
self.subscription_controller.confirm(self.source, s_id,
|
||||||
|
project=self.project,
|
||||||
|
confirm=True)
|
||||||
|
subscription = self.subscription_controller.get(self.source, s_id,
|
||||||
|
project=self.project)
|
||||||
|
|
||||||
|
self.assertEqual(True, subscription['confirmed'])
|
||||||
|
|
||||||
|
def test_confirm_with_nonexist_subscription(self):
|
||||||
|
s_id = 'fake-id'
|
||||||
|
self.assertRaises(errors.SubscriptionDoesNotExist,
|
||||||
|
self.subscription_controller.confirm,
|
||||||
|
self.source, s_id, project=self.project, confirm=True
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# TODO(kgriffs): Do these need database purges as well as those above?
|
# TODO(kgriffs): Do these need database purges as well as those above?
|
||||||
|
@ -236,7 +236,8 @@ class SubscriptionTest(base.V1_1Base):
|
|||||||
'source': 'kitkat',
|
'source': 'kitkat',
|
||||||
'options': {},
|
'options': {},
|
||||||
'id': str(sub),
|
'id': str(sub),
|
||||||
'ttl': 600},
|
'ttl': 600,
|
||||||
|
'confirmed': False},
|
||||||
'headers': {'status': 200},
|
'headers': {'status': 200},
|
||||||
'request': {'action': 'subscription_get',
|
'request': {'action': 'subscription_get',
|
||||||
'body': {'queue_name': 'kitkat',
|
'body': {'queue_name': 'kitkat',
|
||||||
@ -273,7 +274,8 @@ class SubscriptionTest(base.V1_1Base):
|
|||||||
'source': 'kitkat',
|
'source': 'kitkat',
|
||||||
'options': {},
|
'options': {},
|
||||||
'id': str(sub),
|
'id': str(sub),
|
||||||
'ttl': 600}]},
|
'ttl': 600,
|
||||||
|
'confirmed': False}]},
|
||||||
'headers': {'status': 200},
|
'headers': {'status': 200},
|
||||||
'request': {'action': 'subscription_list',
|
'request': {'action': 'subscription_list',
|
||||||
'body': {'queue_name': 'kitkat'},
|
'body': {'queue_name': 'kitkat'},
|
||||||
@ -346,7 +348,8 @@ class SubscriptionTest(base.V1_1Base):
|
|||||||
ws_notification = msgpack.unpackb(sender.call_args_list[2][0][0],
|
ws_notification = msgpack.unpackb(sender.call_args_list[2][0][0],
|
||||||
encoding='utf-8')
|
encoding='utf-8')
|
||||||
self.assertEqual({'body': {'status': 'disco queen'}, 'ttl': 60,
|
self.assertEqual({'body': {'status': 'disco queen'}, 'ttl': 60,
|
||||||
'queue_name': 'kitkat'}, ws_notification)
|
'queue_name': 'kitkat',
|
||||||
|
'Message_Type': u'Notification'}, ws_notification)
|
||||||
|
|
||||||
def test_list_returns_503_on_nopoolfound_exception(self):
|
def test_list_returns_503_on_nopoolfound_exception(self):
|
||||||
sub = self.boot.storage.subscription_controller.create(
|
sub = self.boot.storage.subscription_controller.create(
|
||||||
|
@ -20,6 +20,7 @@ import mock
|
|||||||
from oslo_serialization import jsonutils
|
from oslo_serialization import jsonutils
|
||||||
|
|
||||||
from zaqar.common import auth
|
from zaqar.common import auth
|
||||||
|
from zaqar.notification import notifier
|
||||||
from zaqar.storage import errors as storage_errors
|
from zaqar.storage import errors as storage_errors
|
||||||
from zaqar import tests as testing
|
from zaqar import tests as testing
|
||||||
from zaqar.tests.unit.transport.wsgi import base
|
from zaqar.tests.unit.transport.wsgi import base
|
||||||
@ -57,6 +58,11 @@ class TestSubscriptionsMongoDB(base.V2Base):
|
|||||||
|
|
||||||
self.subscription_path = (self.url_prefix + '/queues/' + self.queue +
|
self.subscription_path = (self.url_prefix + '/queues/' + self.queue +
|
||||||
'/subscriptions')
|
'/subscriptions')
|
||||||
|
self.subscription = 'fake-id'
|
||||||
|
self.confirm_path = (self.url_prefix + '/queues/' + self.queue +
|
||||||
|
'/subscriptions/' + self.subscription +
|
||||||
|
'/confirm')
|
||||||
|
self.conf.signed_url.secret_key = 'test_key'
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
resp = self.simulate_get(self.subscription_path,
|
resp = self.simulate_get(self.subscription_path,
|
||||||
@ -95,6 +101,44 @@ class TestSubscriptionsMongoDB(base.V2Base):
|
|||||||
self._create_subscription(subscriber='http://CCC.com')
|
self._create_subscription(subscriber='http://CCC.com')
|
||||||
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||||
|
|
||||||
|
# the subscription is not confirmed, So the second request will
|
||||||
|
# retry confirm and return 201 again.
|
||||||
|
self._create_subscription(subscriber='http://CCC.com')
|
||||||
|
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||||
|
|
||||||
|
@mock.patch.object(notifier.NotifierDriver, 'send_confirm_notification')
|
||||||
|
def test_create_and_send_notification(self, mock_send_confirm):
|
||||||
|
self._create_subscription(subscriber='http://CCC.com')
|
||||||
|
self.assertEqual(1, mock_send_confirm.call_count)
|
||||||
|
|
||||||
|
@mock.patch.object(notifier.NotifierDriver, 'send_confirm_notification')
|
||||||
|
def test_recreate(self, mock_send_confirm):
|
||||||
|
resp = self._create_subscription(subscriber='http://CCC.com')
|
||||||
|
resp_doc = jsonutils.loads(resp[0])
|
||||||
|
s_id1 = resp_doc['subscription_id']
|
||||||
|
self.assertEqual(1, mock_send_confirm.call_count)
|
||||||
|
|
||||||
|
resp = self._create_subscription(subscriber='http://CCC.com')
|
||||||
|
resp_doc = jsonutils.loads(resp[0])
|
||||||
|
s_id2 = resp_doc['subscription_id']
|
||||||
|
self.assertEqual(2, mock_send_confirm.call_count)
|
||||||
|
|
||||||
|
self.assertEqual(s_id1, s_id2)
|
||||||
|
|
||||||
|
@mock.patch.object(notifier.NotifierDriver, 'send_confirm_notification')
|
||||||
|
def test_recreate_after_confirmed(self, mock_send_confirm):
|
||||||
|
resp = self._create_subscription(subscriber='http://CCC.com')
|
||||||
|
self.assertEqual(falcon.HTTP_201, self.srmock.status)
|
||||||
|
|
||||||
|
doc = '{"confirmed": true}'
|
||||||
|
resp_doc = jsonutils.loads(resp[0])
|
||||||
|
confirm_path = (self.url_prefix + '/queues/' + self.queue +
|
||||||
|
'/subscriptions/' + resp_doc['subscription_id'] +
|
||||||
|
'/confirm')
|
||||||
|
self.simulate_put(confirm_path, body=doc, headers=self.headers)
|
||||||
|
self.assertEqual(falcon.HTTP_204, self.srmock.status)
|
||||||
|
self.assertEqual(1, mock_send_confirm.call_count)
|
||||||
|
|
||||||
self._create_subscription(subscriber='http://CCC.com')
|
self._create_subscription(subscriber='http://CCC.com')
|
||||||
self.assertEqual(falcon.HTTP_409, self.srmock.status)
|
self.assertEqual(falcon.HTTP_409, self.srmock.status)
|
||||||
|
|
||||||
@ -353,3 +397,35 @@ class TestSubscriptionsMongoDB(base.V2Base):
|
|||||||
options = resp_list_doc['subscriptions'][0]['options']
|
options = resp_list_doc['subscriptions'][0]['options']
|
||||||
|
|
||||||
self.assertEqual({'a': 1, 'trust_id': 'trust_id'}, options)
|
self.assertEqual({'a': 1, 'trust_id': 'trust_id'}, options)
|
||||||
|
|
||||||
|
def test_confirm(self):
|
||||||
|
doc = '{"confirmed": true}'
|
||||||
|
resp = self._create_subscription()
|
||||||
|
resp_doc = jsonutils.loads(resp[0])
|
||||||
|
confirm_path = (self.url_prefix + '/queues/' + self.queue +
|
||||||
|
'/subscriptions/' + resp_doc['subscription_id'] +
|
||||||
|
'/confirm')
|
||||||
|
self.simulate_put(confirm_path, body=doc, headers=self.headers)
|
||||||
|
self.assertEqual(falcon.HTTP_204, self.srmock.status)
|
||||||
|
|
||||||
|
def test_confirm_with_invalid_body(self):
|
||||||
|
doc = '{confirmed:123}'
|
||||||
|
resp = self.simulate_put(self.confirm_path, body=doc,
|
||||||
|
headers=self.headers)
|
||||||
|
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||||
|
resp_doc = jsonutils.loads(resp[0])
|
||||||
|
self.assertIn('body could not be parsed', resp_doc['description'])
|
||||||
|
|
||||||
|
def test_confirm_without_boolean_body(self):
|
||||||
|
doc = '{"confirmed":123}'
|
||||||
|
resp = self.simulate_put(self.confirm_path, body=doc,
|
||||||
|
headers=self.headers)
|
||||||
|
self.assertEqual(falcon.HTTP_400, self.srmock.status)
|
||||||
|
resp_doc = jsonutils.loads(resp[0])
|
||||||
|
self.assertEqual("The 'confirmed' should be boolean.",
|
||||||
|
resp_doc['description'])
|
||||||
|
|
||||||
|
def test_confirm_with_non_subscription(self):
|
||||||
|
doc = '{"confirmed": true}'
|
||||||
|
self.simulate_put(self.confirm_path, body=doc, headers=self.headers)
|
||||||
|
self.assertEqual(falcon.HTTP_404, self.srmock.status)
|
||||||
|
@ -540,6 +540,12 @@ class Validator(object):
|
|||||||
except OverflowError:
|
except OverflowError:
|
||||||
raise ValidationFailed(msg, datetime.datetime.max)
|
raise ValidationFailed(msg, datetime.datetime.max)
|
||||||
|
|
||||||
|
def subscription_confirming(self, confirm):
|
||||||
|
confirm = confirm.get('confirmed', None)
|
||||||
|
if not isinstance(confirm, bool):
|
||||||
|
msg = _(u"The 'confirmed' should be boolean.")
|
||||||
|
raise ValidationFailed(msg)
|
||||||
|
|
||||||
def subscription_listing(self, limit=None, **kwargs):
|
def subscription_listing(self, limit=None, **kwargs):
|
||||||
"""Restrictions involving a list of subscriptions.
|
"""Restrictions involving a list of subscriptions.
|
||||||
|
|
||||||
|
@ -101,12 +101,17 @@ def public_endpoints(driver, conf):
|
|||||||
subscriptions.CollectionResource(driver._validate,
|
subscriptions.CollectionResource(driver._validate,
|
||||||
subscription_controller,
|
subscription_controller,
|
||||||
defaults.subscription_ttl,
|
defaults.subscription_ttl,
|
||||||
queue_controller)),
|
queue_controller,
|
||||||
|
conf)),
|
||||||
|
|
||||||
('/queues/{queue_name}/subscriptions/{subscription_id}',
|
('/queues/{queue_name}/subscriptions/{subscription_id}',
|
||||||
subscriptions.ItemResource(driver._validate,
|
subscriptions.ItemResource(driver._validate,
|
||||||
subscription_controller)),
|
subscription_controller)),
|
||||||
|
|
||||||
|
('/queues/{queue_name}/subscriptions/{subscription_id}/confirm',
|
||||||
|
subscriptions.ConfirmResource(driver._validate,
|
||||||
|
subscription_controller)),
|
||||||
|
|
||||||
# Pre-Signed URL Endpoint
|
# Pre-Signed URL Endpoint
|
||||||
('/queues/{queue_name}/share', urls.Resource(driver)),
|
('/queues/{queue_name}/share', urls.Resource(driver)),
|
||||||
]
|
]
|
||||||
|
@ -12,15 +12,18 @@
|
|||||||
# implied.
|
# implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
import datetime
|
||||||
|
|
||||||
import falcon
|
import falcon
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from oslo_utils import netutils
|
from oslo_utils import netutils
|
||||||
|
from oslo_utils import timeutils
|
||||||
import six
|
import six
|
||||||
from stevedore import driver
|
from stevedore import driver
|
||||||
|
|
||||||
from zaqar.common import decorators
|
from zaqar.common import decorators
|
||||||
from zaqar.i18n import _
|
from zaqar.i18n import _
|
||||||
|
from zaqar.notification import notifier
|
||||||
from zaqar.storage import errors as storage_errors
|
from zaqar.storage import errors as storage_errors
|
||||||
from zaqar.transport import acl
|
from zaqar.transport import acl
|
||||||
from zaqar.transport import utils
|
from zaqar.transport import utils
|
||||||
@ -111,14 +114,17 @@ class ItemResource(object):
|
|||||||
class CollectionResource(object):
|
class CollectionResource(object):
|
||||||
|
|
||||||
__slots__ = ('_subscription_controller', '_validate',
|
__slots__ = ('_subscription_controller', '_validate',
|
||||||
'_default_subscription_ttl', '_queue_controller')
|
'_default_subscription_ttl', '_queue_controller',
|
||||||
|
'_conf', '_notification')
|
||||||
|
|
||||||
def __init__(self, validate, subscription_controller,
|
def __init__(self, validate, subscription_controller,
|
||||||
default_subscription_ttl, queue_controller):
|
default_subscription_ttl, queue_controller, conf):
|
||||||
self._subscription_controller = subscription_controller
|
self._subscription_controller = subscription_controller
|
||||||
self._validate = validate
|
self._validate = validate
|
||||||
self._default_subscription_ttl = default_subscription_ttl
|
self._default_subscription_ttl = default_subscription_ttl
|
||||||
self._queue_controller = queue_controller
|
self._queue_controller = queue_controller
|
||||||
|
self._conf = conf
|
||||||
|
self._notification = notifier.NotifierDriver()
|
||||||
|
|
||||||
@decorators.TransportLog("Subscription collection")
|
@decorators.TransportLog("Subscription collection")
|
||||||
@acl.enforce("subscription:get_all")
|
@acl.enforce("subscription:get_all")
|
||||||
@ -192,7 +198,6 @@ class CollectionResource(object):
|
|||||||
ttl,
|
ttl,
|
||||||
options,
|
options,
|
||||||
project=project_id)
|
project=project_id)
|
||||||
|
|
||||||
except validation.ValidationFailed as ex:
|
except validation.ValidationFailed as ex:
|
||||||
LOG.debug(ex)
|
LOG.debug(ex)
|
||||||
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||||
@ -201,13 +206,80 @@ class CollectionResource(object):
|
|||||||
description = _(u'Subscription could not be created.')
|
description = _(u'Subscription could not be created.')
|
||||||
raise wsgi_errors.HTTPServiceUnavailable(description)
|
raise wsgi_errors.HTTPServiceUnavailable(description)
|
||||||
|
|
||||||
|
now = timeutils.utcnow_ts()
|
||||||
|
now_dt = datetime.datetime.utcfromtimestamp(now)
|
||||||
|
expires = now_dt + datetime.timedelta(seconds=ttl)
|
||||||
|
api_version = req.path.split('/')[1]
|
||||||
if created:
|
if created:
|
||||||
|
subscription = self._subscription_controller.get(queue_name,
|
||||||
|
created,
|
||||||
|
project_id)
|
||||||
|
# send confirm notification
|
||||||
|
self._notification.send_confirm_notification(
|
||||||
|
queue_name, subscription, self._conf, project_id,
|
||||||
|
str(expires), api_version)
|
||||||
|
|
||||||
resp.location = req.path
|
resp.location = req.path
|
||||||
resp.status = falcon.HTTP_201
|
resp.status = falcon.HTTP_201
|
||||||
resp.body = utils.to_json(
|
resp.body = utils.to_json(
|
||||||
{'subscription_id': six.text_type(created)})
|
{'subscription_id': six.text_type(created)})
|
||||||
else:
|
else:
|
||||||
description = _(u'Such subscription already exists. Subscriptions '
|
subscription = self._subscription_controller.get_with_subscriber(
|
||||||
u'are unique by project + queue + subscriber URI.')
|
queue_name, subscriber, project_id)
|
||||||
raise wsgi_errors.HTTPConflict(description, headers={'location':
|
confirmed = subscription.get('confirmed', True)
|
||||||
req.path})
|
if confirmed:
|
||||||
|
description = _(u'Such subscription already exists.'
|
||||||
|
u'Subscriptions are unique by project + queue '
|
||||||
|
u'+ subscriber URI.')
|
||||||
|
raise wsgi_errors.HTTPConflict(description,
|
||||||
|
headers={'location': req.path})
|
||||||
|
else:
|
||||||
|
# The subscription is not confirmed, re-send confirm
|
||||||
|
# notification
|
||||||
|
self._notification.send_confirm_notification(
|
||||||
|
queue_name, subscription, self._conf, project_id,
|
||||||
|
str(expires), api_version)
|
||||||
|
|
||||||
|
resp.location = req.path
|
||||||
|
resp.status = falcon.HTTP_201
|
||||||
|
resp.body = utils.to_json(
|
||||||
|
{'subscription_id': six.text_type(subscription['id'])})
|
||||||
|
|
||||||
|
|
||||||
|
class ConfirmResource(object):
|
||||||
|
|
||||||
|
__slots__ = ('_subscription_controller', '_validate')
|
||||||
|
|
||||||
|
def __init__(self, validate, subscription_controller):
|
||||||
|
self._subscription_controller = subscription_controller
|
||||||
|
self._validate = validate
|
||||||
|
|
||||||
|
@decorators.TransportLog("Confirm Subscription")
|
||||||
|
@acl.enforce("subscription:confirm")
|
||||||
|
def on_put(self, req, resp, project_id, queue_name, subscription_id):
|
||||||
|
if req.content_length:
|
||||||
|
document = wsgi_utils.deserialize(req.stream, req.content_length)
|
||||||
|
else:
|
||||||
|
document = {}
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._validate.subscription_confirming(document)
|
||||||
|
confirm = document.get('confirmed', None)
|
||||||
|
self._subscription_controller.confirm(queue_name, subscription_id,
|
||||||
|
project=project_id,
|
||||||
|
confirm=confirm)
|
||||||
|
resp.status = falcon.HTTP_204
|
||||||
|
resp.location = req.path
|
||||||
|
except storage_errors.SubscriptionDoesNotExist as ex:
|
||||||
|
LOG.debug(ex)
|
||||||
|
raise wsgi_errors.HTTPNotFound(six.text_type(ex))
|
||||||
|
except validation.ValidationFailed as ex:
|
||||||
|
LOG.debug(ex)
|
||||||
|
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
|
||||||
|
except Exception as ex:
|
||||||
|
LOG.exception(ex)
|
||||||
|
description = (_(u'Subscription %(subscription_id)s could not be'
|
||||||
|
' confirmed.') %
|
||||||
|
dict(subscription_id=subscription_id))
|
||||||
|
raise falcon.HTTPBadRequest(_('Unable to confirm subscription'),
|
||||||
|
description)
|
||||||
|
Loading…
Reference in New Issue
Block a user