Merge "Subscription Confirmation Support-1"

This commit is contained in:
Jenkins 2016-08-18 07:51:37 +00:00 committed by Gerrit Code Review
commit 545da53500
21 changed files with 709 additions and 29 deletions

View File

@ -0,0 +1,204 @@
..
Licensed under the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
==============================
The subscription Confirm Guide
==============================
The subscription confirm feature now only support webhook with mongoDB backend.
This guide shows how to use this feature:
1. Set the config option "require_confirmation" and add the policy to the
policy.json file. Then restart Zaqar-wsgi service::
In the config file:
[notification]
require_confirmation = True
In the policy.json file:
"subscription:confirm": "",
2. Create a subscription.
Here used zaqar/samples/zaqar/subscriber_service_sample.py be the subscriber
endpoint for example.So before the step 2, you should start the subscriber
service first.
The service could be started simply by the command::
python zaqar/samples/zaqar/subscriber_service_sample.py
The service's default port is 5678. If you want to use a new port, the command
will be like::
python zaqar/samples/zaqar/subscriber_service_sample.py new_port_number
The service will not confirm the subscription automatically by default. If you
want to do that, the command will be like::
python zaqar/samples/zaqar/subscriber_service_sample.py --auto-confirm
Then create a subscription::
curl -i -X POST http://10.229.47.217:8888/v2/queues/test/subscriptions \
-H "Content-type: application/json" \
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
-H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" \
-d '{"subscriber":"http://10.229.47.217:5678", "ttl":3600, "options":{}}'
The response::
HTTP/1.1 201 Created
content-length: 47
content-type: application/json; charset=UTF-8
location: http://10.229.47.217:8888/v2/queues/test/subscriptions
Connection: close
{"subscription_id": "576256b03990b480617b4063"}
At the same time, If the subscriber sample service is not start by
"--auto confirm", you will receive a POST request in the subscriber sample
service, the request is like::
WARNING:root:{"UnsubscribeBody": {"confirmed": false}, "URL-Methods": "PUT",
"X-Project-ID": "51be2c72393e457ebf0a22a668e10a64",
"URL-Paths": "/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm",
"URL-Expires": "2016-07-06T04:35:56", "queue_name": "test",
"SubscribeURL": ["/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm"],
"SubscribeBody": {"confirmed": true},
"URL-Signature": "d4038a40589cdb61cd13d5a6997472f5be779db441dd8fe0c597a6e465f30c41",
"Message": "You have chosen to subscribe to the queue: test",
"Message_Type": "SubscriptionConfirmation"}
10.229.47.217 - - [06/Jul/2016 11:35:56] "POST / HTTP/1.1" 200 -
If you start the sample service with "--auto confirm", please go to step 6
directly, because the step 5 will be done by the service automatically.
3. Get the subscription.
The request::
curl -i -X GET http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063 \
-H "Content-type: application/json" \
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
-H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4"
The response::
HTTP/1.1 200 OK
content-length: 154
content-type: application/json; charset=UTF-8
Connection: close
{"confirmed": false, "age": 73, "id": "576256b03990b480617b4063",
"subscriber": "http://10.229.47.217:5678", "source": "test", "ttl": 3600, "options": {}}
You can find that the "confirmed" property is false by default.
4. Post a message to the subscription's queue
The request::
curl -i -X POST http://10.229.47.217:8888/v2/queues/test/messages \
-H "Content-type: application/json" \
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
-H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" \
-d '{"messages": [{"ttl": 3600,"body": "test123"}]}'
The response::
HTTP/1.1 201 Created
content-length: 68
content-type: application/json; charset=UTF-8
location: http://10.229.47.217:8888/v2/queues/test/messages?ids=57624dee3990b4634d71bb4a
Connection: close
{"resources": ["/v2/queues/test/messages/57624dee3990b4634d71bb4a"]}
The subscriber received nothing and you will find a log info in zaqar-wsgi.::
2016-07-06 11:37:57.929 98400 INFO zaqar.notification.notifier
[(None,)2473911afe2642c0b74d7e1200d9bba7 51be2c72393e457ebf0a22a668e10a64 - - -]
The subscriber http://10.229.47.217:5678 is not confirmed.
5. Use the information showed in step3 to confirm the subscription
The request::
curl -i -X PUT http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm \
-H "Content-type: application/json" \
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
-H "URL-Methods: PUT" -H "X-Project-ID: 51be2c72393e457ebf0a22a668e10a64" \
-H "URL-Signature: d28dced4eabbb09878a73d9a7a651df3a3ce5434fcdb6c3727decf6c7078b282" \
-H "URL-Paths: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm" \
-H "URL-Expires: 2016-06-16T08:35:12" -d '{"confirmed": true}'
The response::
HTTP/1.1 204 No Content
location: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm
Connection: close
6. Repeat step3 to get the subscription
The request::
curl -i -X GET http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063 \
-H "Content-type: application/json" \
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
-H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4"
The response::
HTTP/1.1 200 OK
content-length: 155
content-type: application/json; charset=UTF-8
Connection: close
{"confirmed": true, "age": 1370, "id": "576256b03990b480617b4063",
"subscriber": "http://10.229.47.217:5678", "source": "test", "ttl": 3600,
"options": {}}
The subscription is confirmed now.
7. Repeat step4 to post a new message.
The request::
curl -i -X POST http://10.229.47.217:8888/v2/queues/test/messages \
-H "Content-type: application/json" \
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
-H "X-Auth-Token: 440b677561454ea8a7f872201dd4e2c4" \
-d '{"messages": [{"ttl": 3600,"body": "test123"}]}'
The response::
HTTP/1.1 201 Created
content-length: 68
content-type: application/json; charset=UTF-8
location: http://10.229.47.217:8888/v2/queues/test/messages?ids=5762526d3990b474c80d5483
Connection: close
{"resources": ["/v2/queues/test/messages/5762526d3990b474c80d5483"]}
Then in subscriber sample service, you will receive a request::
WARNING:root:{"body": {"event": "BackupStarted"}, "queue_name": "test",
"Message_Type": "Notification", "ttl": 3600}
10.229.47.217 - - [06/Jul/2016 13:19:07] "POST / HTTP/1.1" 200 -
8. Unsubscription.
The request::
curl -i -X PUT http://10.229.47.217:8888/v2/queues/test/subscriptions/576256b03990b480617b4063/confirm \
-H "Content-type: application/json" \
-H "Client-ID: de305d54-75b4-431b-adb2-eb6b9e546014" \
-H "URL-Methods: PUT" -H "X-Project-ID: 51be2c72393e457ebf0a22a668e10a64" \
-H "URL-Signature: d28dced4eabbb09878a73d9a7a651df3a3ce5434fcdb6c3727decf6c7078b282" \
-H "URL-Paths: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm" \
-H "URL-Expires: 2016-06-16T08:35:12" -d '{"confirmed": false}'
The response::
HTTP/1.1 204 No Content
location: /v2/queues/test/subscriptions/576256b03990b480617b4063/confirm
Connection: close
Then try to post a message. The subscriber will not receive the notification
any more.

View File

@ -27,6 +27,7 @@
"subscription:get": "", "subscription:get": "",
"subscription:delete": "", "subscription:delete": "",
"subscription:update": "", "subscription:update": "",
"subscription:confirm": "",
"pools:get_all": "rule:context_is_admin", "pools:get_all": "rule:context_is_admin",
"pools:create": "rule:context_is_admin", "pools:create": "rule:context_is_admin",

View File

@ -0,0 +1,9 @@
---
features:
- Now before users send messages to subscribers through a queue, the
subscribers should be confirmed first. Zaqar only sends messages to the
confirmed subscribers. This feature supports "webhook" and "mailto"
subscribers with mongoDB or redis backend. The "mailto" part will be done
in O cycle. Set "require_confirmation = True" to enable this feature. The
default value is "False" now and we will enable it by default after one or
two cycles.

View File

@ -0,0 +1,79 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import json
import logging
import requests
import sys
import uuid
try:
import SimpleHTTPServer
import SocketServer
except Exception:
from http import server as SimpleHTTPServer
import socketserver as SocketServer
_AUTO_CONFIRM = False
for arg in sys.argv:
if arg == '--auto-confirm':
_AUTO_CONFIRM = True
sys.argv.remove(arg)
break
if len(sys.argv) > 2:
PORT = int(sys.argv[2])
elif len(sys.argv) > 1:
PORT = int(sys.argv[1])
else:
PORT = 5678
class ServerHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
"""This is the sample service for wsgi subscription.
"""
# TODO(wangxiyuan): support websocket.
def do_POST(self):
logging.warning('=================== POST =====================')
data_string = str(
self.rfile.read(int(self.headers['Content-Length'])))
self.data = json.loads(data_string)
if _AUTO_CONFIRM:
self._send_confirm_request()
message = 'OK'
self.send_response(200)
self.end_headers()
self.wfile.write(message)
logging.warning(self.headers)
logging.warning(self.data)
return
def _send_confirm_request(self):
url = self.data['WSGISubscribeURL']
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json',
'X-Project-ID': self.data['X-Project-ID'],
'Client-ID': str(uuid.uuid4()),
'URL-Methods': self.data['URL-Methods'],
'URL-Signature': self.data['URL-Signature'],
'URL-Paths': self.data['URL-Paths'],
'URL-Expires': self.data['URL-Expires'],
}
data = {'confirmed': True}
requests.put(url=url, data=json.dumps(data), headers=headers)
Handler = ServerHandler
httpd = SocketServer.TCPServer(("", PORT), Handler)
httpd.serve_forever()

View File

@ -21,7 +21,10 @@ from oslo_config import cfg
PASSWORD_PLUGIN = 'password' PASSWORD_PLUGIN = 'password'
TRUSTEE_CONF_GROUP = 'trustee' TRUSTEE_CONF_GROUP = 'trustee'
KEYSTONE_AUTHTOKEN_GROUP = 'keystone_authtoken'
loading.register_auth_conf_options(cfg.CONF, TRUSTEE_CONF_GROUP) loading.register_auth_conf_options(cfg.CONF, TRUSTEE_CONF_GROUP)
loading.register_auth_conf_options(cfg.CONF, KEYSTONE_AUTHTOKEN_GROUP)
_ZAQAR_ENDPOINTS = {}
def _config_options(): def _config_options():
@ -39,9 +42,9 @@ def get_trusted_token(trust_id):
return trust_session.auth.get_access(trust_session).auth_token return trust_session.auth.get_access(trust_session).auth_token
def _get_admin_session(): def _get_admin_session(conf_group):
auth_plugin = loading.load_auth_from_conf_options( auth_plugin = loading.load_auth_from_conf_options(
cfg.CONF, TRUSTEE_CONF_GROUP) cfg.CONF, conf_group)
return session.Session(auth=auth_plugin) return session.Session(auth=auth_plugin)
@ -53,7 +56,7 @@ def _get_user_client(auth_plugin):
def create_trust_id(auth_plugin, trustor_user_id, trustor_project_id, roles, def create_trust_id(auth_plugin, trustor_user_id, trustor_project_id, roles,
expires_at): expires_at):
"""Create a trust with the given user for the configured trustee user.""" """Create a trust with the given user for the configured trustee user."""
admin_session = _get_admin_session() admin_session = _get_admin_session(TRUSTEE_CONF_GROUP)
trustee_user_id = admin_session.get_user_id() trustee_user_id = admin_session.get_user_id()
client = _get_user_client(auth_plugin) client = _get_user_client(auth_plugin)
@ -64,3 +67,29 @@ def create_trust_id(auth_plugin, trustor_user_id, trustor_project_id, roles,
role_names=roles, role_names=roles,
expires_at=expires_at) expires_at=expires_at)
return trust.id return trust.id
def get_public_endpoint():
"""Get Zaqar's public endpoint from keystone"""
global _ZAQAR_ENDPOINTS
if _ZAQAR_ENDPOINTS:
return _ZAQAR_ENDPOINTS
zaqar_session = _get_admin_session(KEYSTONE_AUTHTOKEN_GROUP)
auth = zaqar_session.auth
if not auth:
return _ZAQAR_ENDPOINTS
catalogs = auth.get_auth_ref(zaqar_session).service_catalog
try:
_ZAQAR_ENDPOINTS['zaqar'] = catalogs.url_for(service_name='zaqar')
except Exception:
pass
try:
_ZAQAR_ENDPOINTS['zaqar-websocket'] = catalogs.url_for(
service_name='zaqar-websocket')
except Exception:
pass
return _ZAQAR_ENDPOINTS

View File

@ -59,7 +59,10 @@ _NOTIFICATION_OPTIONS = (
help=('The command of smtp to send email. The format is ' help=('The command of smtp to send email. The format is '
'"command_name arg1 arg2".')), '"command_name arg1 arg2".')),
cfg.IntOpt('max_notifier_workers', default=10, cfg.IntOpt('max_notifier_workers', default=10,
help='The max amount of the notification workers.') help='The max amount of the notification workers.'),
cfg.BoolOpt('require_confirmation', default=False,
help='Whether the http/https/email subscription need to be '
'confirmed before notification.'),
) )
_NOTIFICATION_GROUP = 'notification' _NOTIFICATION_GROUP = 'notification'

View File

@ -13,18 +13,30 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import enum
from stevedore import driver from stevedore import driver
import futurist import futurist
from oslo_log import log as logging from oslo_log import log as logging
from six.moves import urllib_parse from six.moves import urllib_parse
from zaqar.common import auth
from zaqar.common import urls
from zaqar.i18n import _LE from zaqar.i18n import _LE
from zaqar.i18n import _LI
from zaqar.storage import pooling from zaqar.storage import pooling
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@enum.unique
class MessageType(enum.IntEnum):
"""Enum of message type."""
SubscriptionConfirmation = 1
UnsubscribeConfirmation = 2
Notification = 3
class NotifierDriver(object): class NotifierDriver(object):
"""Notifier which is responsible for sending messages to subscribers. """Notifier which is responsible for sending messages to subscribers.
@ -34,6 +46,7 @@ class NotifierDriver(object):
self.subscription_controller = kwargs.get('subscription_controller') self.subscription_controller = kwargs.get('subscription_controller')
max_workers = kwargs.get('max_notifier_workers', 10) max_workers = kwargs.get('max_notifier_workers', 10)
self.executor = futurist.ThreadPoolExecutor(max_workers=max_workers) self.executor = futurist.ThreadPoolExecutor(max_workers=max_workers)
self.require_confirmation = kwargs.get('require_confirmation', False)
def post(self, queue_name, messages, client_uuid, project=None): def post(self, queue_name, messages, client_uuid, project=None):
"""Send messages to the subscribers.""" """Send messages to the subscribers."""
@ -48,14 +61,76 @@ class NotifierDriver(object):
LOG.debug("Notifying subscriber %r" % (sub,)) LOG.debug("Notifying subscriber %r" % (sub,))
s_type = urllib_parse.urlparse( s_type = urllib_parse.urlparse(
sub['subscriber']).scheme sub['subscriber']).scheme
data_driver = self.subscription_controller.driver # If the subscriber doesn't contain 'confirmed', it
mgr = driver.DriverManager('zaqar.notification.tasks', # means that this kind of subscriber was created before
s_type, # the confirm feature be introduced into Zaqar. We
invoke_on_load=True) # should allow them be subscribed.
self.executor.submit(mgr.driver.execute, sub, messages, if (self.require_confirmation and
conf=data_driver.conf) not sub.get('confirmed', True)):
LOG.info(_LI('The subscriber %s is not '
'confirmed.'), sub['subscriber'])
continue
for msg in messages:
msg['Message_Type'] = MessageType.Notification.name
self._execute(s_type, sub, messages)
marker = next(subscribers) marker = next(subscribers)
if not marker: if not marker:
break break
else: else:
LOG.error(_LE('Failed to get subscription controller.')) LOG.error(_LE('Failed to get subscription controller.'))
def send_confirm_notification(self, queue, subscription, conf,
project=None, expires=None,
api_version=None):
key = conf.signed_url.secret_key
if not key:
LOG.error(_LE("Can't send confirm notification due to the value of"
" secret_key option is None"))
return
url = "/%s/queues/%s/subscriptions/%s/confirm" % (api_version, queue,
subscription['id'])
pre_url = urls.create_signed_url(key, [url], project=project,
expires=expires, methods=['PUT'])
message_type = MessageType.SubscriptionConfirmation.name
messages = {}
endpoint_dict = auth.get_public_endpoint()
if endpoint_dict:
wsgi_endpoint = endpoint_dict.get('zaqar', None)
if wsgi_endpoint:
wsgi_subscribe_url = urllib_parse.urljoin(
wsgi_endpoint, url)
messages['WSGISubscribeURL'] = wsgi_subscribe_url
websocket_endpoint = endpoint_dict.get('zaqar-websocket', None)
if websocket_endpoint:
websocket_subscribe_url = urllib_parse.urljoin(
websocket_endpoint, url)
messages['WebSocketSubscribeURL'] = websocket_subscribe_url
messages.update({'Message_Type': message_type,
'Message': 'You have chosen to subscribe to the '
'queue: %s' % queue,
'URL-Signature': pre_url['signature'],
'URL-Methods': pre_url['methods'][0],
'URL-Paths': pre_url['paths'][0],
'X-Project-ID': pre_url['project'],
'URL-Expires': pre_url['expires'],
'SubscribeBody': {'confirmed': True},
'UnsubscribeBody': {'confirmed': False}})
s_type = urllib_parse.urlparse(subscription['subscriber']).scheme
LOG.info(_LI('Begin to send %(type)s confirm notification. The request'
'body is %(messages)s'),
{'type': s_type, 'messages': messages})
self._execute(s_type, subscription, [messages], conf)
def _execute(self, s_type, subscription, messages, conf=None):
if self.subscription_controller:
data_driver = self.subscription_controller.driver
conf = data_driver.conf
else:
conf = conf
mgr = driver.DriverManager('zaqar.notification.tasks',
s_type,
invoke_on_load=True)
self.executor.submit(mgr.driver.execute, subscription, messages,
conf=conf)

View File

@ -48,6 +48,7 @@ class SubscriptionController(base.Subscription):
'e': expires: datetime.datetime 'e': expires: datetime.datetime
'o': options :: dict 'o': options :: dict
'p': project :: six.text_type 'p': project :: six.text_type
'c': confirmed :: boolean
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -70,7 +71,7 @@ class SubscriptionController(base.Subscription):
if marker is not None: if marker is not None:
query['_id'] = {'$gt': utils.to_oid(marker)} query['_id'] = {'$gt': utils.to_oid(marker)}
projection = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1} projection = {'s': 1, 'u': 1, 't': 1, 'p': 1, 'o': 1, '_id': 1, 'c': 1}
cursor = self._collection.find(query, projection=projection) cursor = self._collection.find(query, projection=projection)
cursor = cursor.limit(limit).sort('_id') cursor = cursor.limit(limit).sort('_id')
@ -103,6 +104,7 @@ class SubscriptionController(base.Subscription):
now = timeutils.utcnow_ts() now = timeutils.utcnow_ts()
now_dt = datetime.datetime.utcfromtimestamp(now) now_dt = datetime.datetime.utcfromtimestamp(now)
expires = now_dt + datetime.timedelta(seconds=ttl) expires = now_dt + datetime.timedelta(seconds=ttl)
confirmed = False
try: try:
subscription_id = self._collection.insert({'s': source, subscription_id = self._collection.insert({'s': source,
@ -110,7 +112,8 @@ class SubscriptionController(base.Subscription):
't': ttl, 't': ttl,
'e': expires, 'e': expires,
'o': options, 'o': options,
'p': project}) 'p': project,
'c': confirmed})
return subscription_id return subscription_id
except pymongo.errors.DuplicateKeyError: except pymongo.errors.DuplicateKeyError:
return None return None
@ -153,6 +156,22 @@ class SubscriptionController(base.Subscription):
self._collection.remove({'_id': utils.to_oid(subscription_id), self._collection.remove({'_id': utils.to_oid(subscription_id),
'p': project}, w=0) 'p': project}, w=0)
@utils.raises_conn_error
def get_with_subscriber(self, queue, subscriber, project=None):
res = self._collection.find_one({'u': subscriber,
'p': project})
now = timeutils.utcnow_ts()
return _basic_subscription(res, now)
@utils.raises_conn_error
def confirm(self, queue, subscription_id, project=None, confirm=True):
res = self._collection.update({'_id': utils.to_oid(subscription_id),
'p': project}, {'$set': {'c': confirm}},
upsert=False)
if not res['updatedExisting']:
raise errors.SubscriptionDoesNotExist(subscription_id)
def _basic_subscription(record, now): def _basic_subscription(record, now):
# NOTE(Eva-i): unused here record's field 'e' (expires) has changed it's # NOTE(Eva-i): unused here record's field 'e' (expires) has changed it's
@ -161,11 +180,13 @@ def _basic_subscription(record, now):
# starting using 'e' field should make sure support both of the formats. # starting using 'e' field should make sure support both of the formats.
oid = record['_id'] oid = record['_id']
age = now - utils.oid_ts(oid) age = now - utils.oid_ts(oid)
confirmed = record.get('c', True)
return { return {
'id': str(oid), 'id': str(oid),
'source': record['s'], 'source': record['s'],
'subscriber': record['u'], 'subscriber': record['u'],
'ttl': record['t'], 'ttl': record['t'],
'age': int(age), 'age': int(age),
'options': record['o'] 'options': record['o'],
'confirmed': confirmed,
} }

View File

@ -150,7 +150,9 @@ class DataDriver(base.DataDriverBase):
kwargs = {'subscription_controller': kwargs = {'subscription_controller':
self._storage.subscription_controller, self._storage.subscription_controller,
'max_notifier_workers': 'max_notifier_workers':
self.conf.notification.max_notifier_workers} self.conf.notification.max_notifier_workers,
'require_confirmation':
self.conf.notification.require_confirmation}
stages.extend(_get_storage_pipeline('message', self.conf, **kwargs)) stages.extend(_get_storage_pipeline('message', self.conf, **kwargs))
stages.append(self._storage.message_controller) stages.append(self._storage.message_controller)
return common.Pipeline(stages) return common.Pipeline(stages)

View File

@ -409,6 +409,17 @@ class SubscriptionController(storage.Subscription):
return control.exists(queue, subscription_id, return control.exists(queue, subscription_id,
project=project) project=project)
def confirm(self, queue, subscription_id, project=None, confirm=None):
control = self._get_controller(queue, project)
if control:
return control.confirm(queue, subscription_id,
project=project, confirm=confirm)
def get_with_subscriber(self, queue, subscriber, project=None):
control = self._get_controller(queue, project)
if control:
return control.get_with_subscriber(queue, subscriber, project)
class Catalog(object): class Catalog(object):
"""Represents the mapping between queues and pool drivers.""" """Represents the mapping between queues and pool drivers."""

View File

@ -58,6 +58,10 @@ class TestBase(testtools.TestCase):
group=configs._DRIVER_GROUP) group=configs._DRIVER_GROUP)
self.conf.register_opts(configs._NOTIFICATION_OPTIONS, self.conf.register_opts(configs._NOTIFICATION_OPTIONS,
group=configs._NOTIFICATION_GROUP) group=configs._NOTIFICATION_GROUP)
self.conf.register_opts(configs._NOTIFICATION_OPTIONS,
group=configs._NOTIFICATION_GROUP)
self.conf.register_opts(configs._SIGNED_URL_OPTIONS,
group=configs._SIGNED_URL_GROUP)
self.mongodb_url = os.environ.get('ZAQAR_TEST_MONGODB_URL', self.mongodb_url = os.environ.get('ZAQAR_TEST_MONGODB_URL',
'mongodb://127.0.0.1:27017') 'mongodb://127.0.0.1:27017')

View File

@ -27,6 +27,7 @@
"subscription:get": "", "subscription:get": "",
"subscription:delete": "", "subscription:delete": "",
"subscription:update": "", "subscription:update": "",
"subscription:confirm": "",
"pools:get_all": "rule:context_is_admin", "pools:get_all": "rule:context_is_admin",
"pools:create": "rule:context_is_admin", "pools:create": "rule:context_is_admin",

View File

@ -115,10 +115,15 @@ class TestSubscriptions(base.BaseV2MessagingTest):
if not test.call_until_true( if not test.call_until_true(
lambda: self.list_messages(sub_queue)[1]['messages'], 10, 1): lambda: self.list_messages(sub_queue)[1]['messages'], 10, 1):
self.fail("Couldn't get messages") self.fail("Couldn't get messages")
messages = self.list_messages(sub_queue) _, body = self.list_messages(sub_queue)
expected = message_body['messages'][0] expected = message_body['messages'][0]
expected['queue_name'] = self.queue_name expected['queue_name'] = self.queue_name
self.assertEqual(expected, messages[1]['messages'][0]['body']) expected['Message_Type'] = 'Notification'
for message in body['messages']:
# There are two message in the queue. One is the confirm message,
# the other one is the notification.
if message['body']['Message_Type'] == 'Notification':
self.assertEqual(expected, message['body'])
@classmethod @classmethod
def resource_cleanup(cls): def resource_cleanup(cls):

View File

@ -17,6 +17,7 @@ import uuid
from tempest import config from tempest import config
from tempest.lib.common.utils import data_utils from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc from tempest.lib import exceptions as lib_exc
from tempest import test from tempest import test
@ -45,6 +46,10 @@ class TestSubscriptionsNegative(base.BaseV2MessagingTest):
# Create Subscriptions # Create Subscriptions
# TODO(wangxiyuan): Now the subscription confirmation feature only support
# mongoDB backend. Skip this test until the feature support the redis
# backend. Then rewrite it.
@decorators.skip_because(bug='1609596')
@test.attr(type=['negative']) @test.attr(type=['negative'])
@test.idempotent_id('fe0d8ec1-1a64-4490-8869-e821b2252e74') @test.idempotent_id('fe0d8ec1-1a64-4490-8869-e821b2252e74')
def test_create_subscriptions_with_duplicate_subscriber(self): def test_create_subscriptions_with_duplicate_subscriber(self):

View File

@ -43,14 +43,17 @@ class NotifierTest(testing.TestBase):
"body": {"event": "BackupStarted", "body": {"event": "BackupStarted",
"backup_id": "backup_id":
"c378813c-3f0b-11e2-ad92"}, "c378813c-3f0b-11e2-ad92"},
"queue_name": "fake_queue" "queue_name": "fake_queue",
"Message_Type": "Notification"
}, },
{"body": {"event": "BackupProgress", {"body": {"event": "BackupProgress",
"current_bytes": "0", "current_bytes": "0",
"total_bytes": "99614720"}, "total_bytes": "99614720"},
"queue_name": "fake_queue" "queue_name": "fake_queue",
"Message_Type": "Notification"
} }
] ]
self.api_version = 'v2'
def test_webhook(self): def test_webhook(self):
subscription = [{'subscriber': 'http://trigger_me', subscription = [{'subscriber': 'http://trigger_me',
@ -257,3 +260,41 @@ class NotifierTest(testing.TestBase):
self.assertEqual(2, mock_post.call_count) self.assertEqual(2, mock_post.call_count)
self.assertEqual(self.notifications[1], self.assertEqual(self.notifications[1],
json.loads(mock_post.call_args[1]['data'])) json.loads(mock_post.call_args[1]['data']))
@mock.patch('requests.post')
def test_send_confirm_notification(self, mock_request):
subscription = {'id': '5760c9fb3990b42e8b7c20bd',
'subscriber': 'http://trigger_me',
'source': 'fake_queue',
'options': {}}
ctlr = mock.MagicMock()
ctlr.list = mock.Mock(return_value=subscription)
driver = notifier.NotifierDriver(subscription_controller=ctlr)
self.conf.signed_url.secret_key = 'test_key'
driver.send_confirm_notification('test_queue', subscription, self.conf,
str(self.project),
api_version=self.api_version)
driver.executor.shutdown()
self.assertEqual(1, mock_request.call_count)
expect_args = ['SubscribeBody', 'queue_name', 'URL-Methods',
'X-Project-ID', 'URL-Signature', 'URL-Paths', 'Message',
'URL-Expires', 'Message_Type', 'WSGISubscribeURL',
'WebSocketSubscribeURL' 'UnsubscribeBody']
actual_args = json.loads(mock_request.call_args[1]['data']).keys()
self.assertEqual(expect_args.sort(),
list(actual_args).sort())
@mock.patch('requests.post')
def test_send_confirm_notification_without_signed_url(self, mock_request):
subscription = [{'subscriber': 'http://trigger_me',
'source': 'fake_queue', 'options': {}}]
ctlr = mock.MagicMock()
ctlr.list = mock.Mock(return_value=iter([subscription, {}]))
driver = notifier.NotifierDriver(subscription_controller=ctlr)
driver.send_confirm_notification('test_queue', subscription, self.conf,
str(self.project), self.api_version)
driver.executor.shutdown()
self.assertEqual(0, mock_request.call_count)

View File

@ -495,6 +495,34 @@ class MongodbSubscriptionTests(MongodbSetupMixin,
controller_class = controllers.SubscriptionController controller_class = controllers.SubscriptionController
control_driver_class = mongodb.ControlDriver control_driver_class = mongodb.ControlDriver
def test_confirm(self):
s_id = self.subscription_controller.create(self.source,
self.subscriber,
self.ttl,
self.options,
project=self.project)
self.addCleanup(self.subscription_controller.delete, self.source,
s_id, self.project)
subscription = self.subscription_controller.get(self.source, s_id,
project=self.project)
self.assertEqual(False, subscription['confirmed'])
self.subscription_controller.confirm(self.source, s_id,
project=self.project,
confirm=True)
subscription = self.subscription_controller.get(self.source, s_id,
project=self.project)
self.assertEqual(True, subscription['confirmed'])
def test_confirm_with_nonexist_subscription(self):
s_id = 'fake-id'
self.assertRaises(errors.SubscriptionDoesNotExist,
self.subscription_controller.confirm,
self.source, s_id, project=self.project, confirm=True
)
# #
# TODO(kgriffs): Do these need database purges as well as those above? # TODO(kgriffs): Do these need database purges as well as those above?

View File

@ -236,7 +236,8 @@ class SubscriptionTest(base.V1_1Base):
'source': 'kitkat', 'source': 'kitkat',
'options': {}, 'options': {},
'id': str(sub), 'id': str(sub),
'ttl': 600}, 'ttl': 600,
'confirmed': False},
'headers': {'status': 200}, 'headers': {'status': 200},
'request': {'action': 'subscription_get', 'request': {'action': 'subscription_get',
'body': {'queue_name': 'kitkat', 'body': {'queue_name': 'kitkat',
@ -273,7 +274,8 @@ class SubscriptionTest(base.V1_1Base):
'source': 'kitkat', 'source': 'kitkat',
'options': {}, 'options': {},
'id': str(sub), 'id': str(sub),
'ttl': 600}]}, 'ttl': 600,
'confirmed': False}]},
'headers': {'status': 200}, 'headers': {'status': 200},
'request': {'action': 'subscription_list', 'request': {'action': 'subscription_list',
'body': {'queue_name': 'kitkat'}, 'body': {'queue_name': 'kitkat'},
@ -346,7 +348,8 @@ class SubscriptionTest(base.V1_1Base):
ws_notification = msgpack.unpackb(sender.call_args_list[2][0][0], ws_notification = msgpack.unpackb(sender.call_args_list[2][0][0],
encoding='utf-8') encoding='utf-8')
self.assertEqual({'body': {'status': 'disco queen'}, 'ttl': 60, self.assertEqual({'body': {'status': 'disco queen'}, 'ttl': 60,
'queue_name': 'kitkat'}, ws_notification) 'queue_name': 'kitkat',
'Message_Type': u'Notification'}, ws_notification)
def test_list_returns_503_on_nopoolfound_exception(self): def test_list_returns_503_on_nopoolfound_exception(self):
sub = self.boot.storage.subscription_controller.create( sub = self.boot.storage.subscription_controller.create(

View File

@ -20,6 +20,7 @@ import mock
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
from zaqar.common import auth from zaqar.common import auth
from zaqar.notification import notifier
from zaqar.storage import errors as storage_errors from zaqar.storage import errors as storage_errors
from zaqar import tests as testing from zaqar import tests as testing
from zaqar.tests.unit.transport.wsgi import base from zaqar.tests.unit.transport.wsgi import base
@ -57,6 +58,11 @@ class TestSubscriptionsMongoDB(base.V2Base):
self.subscription_path = (self.url_prefix + '/queues/' + self.queue + self.subscription_path = (self.url_prefix + '/queues/' + self.queue +
'/subscriptions') '/subscriptions')
self.subscription = 'fake-id'
self.confirm_path = (self.url_prefix + '/queues/' + self.queue +
'/subscriptions/' + self.subscription +
'/confirm')
self.conf.signed_url.secret_key = 'test_key'
def tearDown(self): def tearDown(self):
resp = self.simulate_get(self.subscription_path, resp = self.simulate_get(self.subscription_path,
@ -95,6 +101,44 @@ class TestSubscriptionsMongoDB(base.V2Base):
self._create_subscription(subscriber='http://CCC.com') self._create_subscription(subscriber='http://CCC.com')
self.assertEqual(falcon.HTTP_201, self.srmock.status) self.assertEqual(falcon.HTTP_201, self.srmock.status)
# the subscription is not confirmed, So the second request will
# retry confirm and return 201 again.
self._create_subscription(subscriber='http://CCC.com')
self.assertEqual(falcon.HTTP_201, self.srmock.status)
@mock.patch.object(notifier.NotifierDriver, 'send_confirm_notification')
def test_create_and_send_notification(self, mock_send_confirm):
self._create_subscription(subscriber='http://CCC.com')
self.assertEqual(1, mock_send_confirm.call_count)
@mock.patch.object(notifier.NotifierDriver, 'send_confirm_notification')
def test_recreate(self, mock_send_confirm):
resp = self._create_subscription(subscriber='http://CCC.com')
resp_doc = jsonutils.loads(resp[0])
s_id1 = resp_doc['subscription_id']
self.assertEqual(1, mock_send_confirm.call_count)
resp = self._create_subscription(subscriber='http://CCC.com')
resp_doc = jsonutils.loads(resp[0])
s_id2 = resp_doc['subscription_id']
self.assertEqual(2, mock_send_confirm.call_count)
self.assertEqual(s_id1, s_id2)
@mock.patch.object(notifier.NotifierDriver, 'send_confirm_notification')
def test_recreate_after_confirmed(self, mock_send_confirm):
resp = self._create_subscription(subscriber='http://CCC.com')
self.assertEqual(falcon.HTTP_201, self.srmock.status)
doc = '{"confirmed": true}'
resp_doc = jsonutils.loads(resp[0])
confirm_path = (self.url_prefix + '/queues/' + self.queue +
'/subscriptions/' + resp_doc['subscription_id'] +
'/confirm')
self.simulate_put(confirm_path, body=doc, headers=self.headers)
self.assertEqual(falcon.HTTP_204, self.srmock.status)
self.assertEqual(1, mock_send_confirm.call_count)
self._create_subscription(subscriber='http://CCC.com') self._create_subscription(subscriber='http://CCC.com')
self.assertEqual(falcon.HTTP_409, self.srmock.status) self.assertEqual(falcon.HTTP_409, self.srmock.status)
@ -353,3 +397,35 @@ class TestSubscriptionsMongoDB(base.V2Base):
options = resp_list_doc['subscriptions'][0]['options'] options = resp_list_doc['subscriptions'][0]['options']
self.assertEqual({'a': 1, 'trust_id': 'trust_id'}, options) self.assertEqual({'a': 1, 'trust_id': 'trust_id'}, options)
def test_confirm(self):
doc = '{"confirmed": true}'
resp = self._create_subscription()
resp_doc = jsonutils.loads(resp[0])
confirm_path = (self.url_prefix + '/queues/' + self.queue +
'/subscriptions/' + resp_doc['subscription_id'] +
'/confirm')
self.simulate_put(confirm_path, body=doc, headers=self.headers)
self.assertEqual(falcon.HTTP_204, self.srmock.status)
def test_confirm_with_invalid_body(self):
doc = '{confirmed:123}'
resp = self.simulate_put(self.confirm_path, body=doc,
headers=self.headers)
self.assertEqual(falcon.HTTP_400, self.srmock.status)
resp_doc = jsonutils.loads(resp[0])
self.assertIn('body could not be parsed', resp_doc['description'])
def test_confirm_without_boolean_body(self):
doc = '{"confirmed":123}'
resp = self.simulate_put(self.confirm_path, body=doc,
headers=self.headers)
self.assertEqual(falcon.HTTP_400, self.srmock.status)
resp_doc = jsonutils.loads(resp[0])
self.assertEqual("The 'confirmed' should be boolean.",
resp_doc['description'])
def test_confirm_with_non_subscription(self):
doc = '{"confirmed": true}'
self.simulate_put(self.confirm_path, body=doc, headers=self.headers)
self.assertEqual(falcon.HTTP_404, self.srmock.status)

View File

@ -540,6 +540,12 @@ class Validator(object):
except OverflowError: except OverflowError:
raise ValidationFailed(msg, datetime.datetime.max) raise ValidationFailed(msg, datetime.datetime.max)
def subscription_confirming(self, confirm):
confirm = confirm.get('confirmed', None)
if not isinstance(confirm, bool):
msg = _(u"The 'confirmed' should be boolean.")
raise ValidationFailed(msg)
def subscription_listing(self, limit=None, **kwargs): def subscription_listing(self, limit=None, **kwargs):
"""Restrictions involving a list of subscriptions. """Restrictions involving a list of subscriptions.

View File

@ -101,12 +101,17 @@ def public_endpoints(driver, conf):
subscriptions.CollectionResource(driver._validate, subscriptions.CollectionResource(driver._validate,
subscription_controller, subscription_controller,
defaults.subscription_ttl, defaults.subscription_ttl,
queue_controller)), queue_controller,
conf)),
('/queues/{queue_name}/subscriptions/{subscription_id}', ('/queues/{queue_name}/subscriptions/{subscription_id}',
subscriptions.ItemResource(driver._validate, subscriptions.ItemResource(driver._validate,
subscription_controller)), subscription_controller)),
('/queues/{queue_name}/subscriptions/{subscription_id}/confirm',
subscriptions.ConfirmResource(driver._validate,
subscription_controller)),
# Pre-Signed URL Endpoint # Pre-Signed URL Endpoint
('/queues/{queue_name}/share', urls.Resource(driver)), ('/queues/{queue_name}/share', urls.Resource(driver)),
] ]

View File

@ -12,15 +12,18 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import datetime
import falcon import falcon
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import netutils from oslo_utils import netutils
from oslo_utils import timeutils
import six import six
from stevedore import driver from stevedore import driver
from zaqar.common import decorators from zaqar.common import decorators
from zaqar.i18n import _ from zaqar.i18n import _
from zaqar.notification import notifier
from zaqar.storage import errors as storage_errors from zaqar.storage import errors as storage_errors
from zaqar.transport import acl from zaqar.transport import acl
from zaqar.transport import utils from zaqar.transport import utils
@ -111,14 +114,17 @@ class ItemResource(object):
class CollectionResource(object): class CollectionResource(object):
__slots__ = ('_subscription_controller', '_validate', __slots__ = ('_subscription_controller', '_validate',
'_default_subscription_ttl', '_queue_controller') '_default_subscription_ttl', '_queue_controller',
'_conf', '_notification')
def __init__(self, validate, subscription_controller, def __init__(self, validate, subscription_controller,
default_subscription_ttl, queue_controller): default_subscription_ttl, queue_controller, conf):
self._subscription_controller = subscription_controller self._subscription_controller = subscription_controller
self._validate = validate self._validate = validate
self._default_subscription_ttl = default_subscription_ttl self._default_subscription_ttl = default_subscription_ttl
self._queue_controller = queue_controller self._queue_controller = queue_controller
self._conf = conf
self._notification = notifier.NotifierDriver()
@decorators.TransportLog("Subscription collection") @decorators.TransportLog("Subscription collection")
@acl.enforce("subscription:get_all") @acl.enforce("subscription:get_all")
@ -192,7 +198,6 @@ class CollectionResource(object):
ttl, ttl,
options, options,
project=project_id) project=project_id)
except validation.ValidationFailed as ex: except validation.ValidationFailed as ex:
LOG.debug(ex) LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex)) raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
@ -201,13 +206,80 @@ class CollectionResource(object):
description = _(u'Subscription could not be created.') description = _(u'Subscription could not be created.')
raise wsgi_errors.HTTPServiceUnavailable(description) raise wsgi_errors.HTTPServiceUnavailable(description)
now = timeutils.utcnow_ts()
now_dt = datetime.datetime.utcfromtimestamp(now)
expires = now_dt + datetime.timedelta(seconds=ttl)
api_version = req.path.split('/')[1]
if created: if created:
subscription = self._subscription_controller.get(queue_name,
created,
project_id)
# send confirm notification
self._notification.send_confirm_notification(
queue_name, subscription, self._conf, project_id,
str(expires), api_version)
resp.location = req.path resp.location = req.path
resp.status = falcon.HTTP_201 resp.status = falcon.HTTP_201
resp.body = utils.to_json( resp.body = utils.to_json(
{'subscription_id': six.text_type(created)}) {'subscription_id': six.text_type(created)})
else: else:
description = _(u'Such subscription already exists. Subscriptions ' subscription = self._subscription_controller.get_with_subscriber(
u'are unique by project + queue + subscriber URI.') queue_name, subscriber, project_id)
raise wsgi_errors.HTTPConflict(description, headers={'location': confirmed = subscription.get('confirmed', True)
req.path}) if confirmed:
description = _(u'Such subscription already exists.'
u'Subscriptions are unique by project + queue '
u'+ subscriber URI.')
raise wsgi_errors.HTTPConflict(description,
headers={'location': req.path})
else:
# The subscription is not confirmed, re-send confirm
# notification
self._notification.send_confirm_notification(
queue_name, subscription, self._conf, project_id,
str(expires), api_version)
resp.location = req.path
resp.status = falcon.HTTP_201
resp.body = utils.to_json(
{'subscription_id': six.text_type(subscription['id'])})
class ConfirmResource(object):
__slots__ = ('_subscription_controller', '_validate')
def __init__(self, validate, subscription_controller):
self._subscription_controller = subscription_controller
self._validate = validate
@decorators.TransportLog("Confirm Subscription")
@acl.enforce("subscription:confirm")
def on_put(self, req, resp, project_id, queue_name, subscription_id):
if req.content_length:
document = wsgi_utils.deserialize(req.stream, req.content_length)
else:
document = {}
try:
self._validate.subscription_confirming(document)
confirm = document.get('confirmed', None)
self._subscription_controller.confirm(queue_name, subscription_id,
project=project_id,
confirm=confirm)
resp.status = falcon.HTTP_204
resp.location = req.path
except storage_errors.SubscriptionDoesNotExist as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPNotFound(six.text_type(ex))
except validation.ValidationFailed as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPBadRequestAPI(six.text_type(ex))
except Exception as ex:
LOG.exception(ex)
description = (_(u'Subscription %(subscription_id)s could not be'
' confirmed.') %
dict(subscription_id=subscription_id))
raise falcon.HTTPBadRequest(_('Unable to confirm subscription'),
description)