Merge "Subscription Confirmation Support-2"

This commit is contained in:
Jenkins 2016-10-10 02:04:48 +00:00 committed by Gerrit Code Review
commit 8493c232b2
12 changed files with 138 additions and 66 deletions

View File

@ -699,6 +699,37 @@ class Subscription(ControllerBase):
"""
raise NotImplementedError
@abc.abstractmethod
def get_with_subscriber(self, queue, subscriber, project=None):
"""Base method for get a subscription with the subscriber.
:param queue: Name of the queue subscription belongs to.
:type queue: six.text_type
:param subscriber: link of the subscription to be notified.
:type subscriber: six.text_type
:param project: Project id
:type project: six.text_type
:returns: Dictionary containing subscription data
:rtype: dict
"""
raise NotImplementedError
@abc.abstractmethod
def confirm(self, queue, subscription_id, project=None, confirmed=True):
"""Base method for confirming a subscription.
:param queue: Name of the queue subscription belongs to.
:type queue: six.text_type
:param subscription_id: ID of the subscription to be deleted.
:type subscription_id: six.text_type
:param project: Project id
:type project: six.text_type
:param confirmed: Confirm a subscription or cancel the confirmation of
a subscription.
:type confirmed: boolean
"""
raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class PoolsBase(ControllerBase):

View File

@ -164,10 +164,11 @@ class SubscriptionController(base.Subscription):
return _basic_subscription(res, now)
@utils.raises_conn_error
def confirm(self, queue, subscription_id, project=None, confirm=True):
def confirm(self, queue, subscription_id, project=None, confirmed=True):
res = self._collection.update({'_id': utils.to_oid(subscription_id),
'p': project}, {'$set': {'c': confirm}},
'p': project},
{'$set': {'c': confirmed}},
upsert=False)
if not res['updatedExisting']:
raise errors.SubscriptionDoesNotExist(subscription_id)

View File

@ -409,11 +409,11 @@ class SubscriptionController(storage.Subscription):
return control.exists(queue, subscription_id,
project=project)
def confirm(self, queue, subscription_id, project=None, confirm=None):
def confirm(self, queue, subscription_id, project=None, confirmed=None):
control = self._get_controller(queue, project)
if control:
return control.confirm(queue, subscription_id,
project=project, confirm=confirm)
project=project, confirmed=confirmed)
def get_with_subscriber(self, queue, subscriber, project=None):
control = self._get_controller(queue, project)

View File

@ -22,7 +22,7 @@ from oslo_utils import encodeutils
from oslo_utils import timeutils
MSGENV_FIELD_KEYS = (b'id', b't', b'cr', b'e', b'u', b'c', b'c.e')
SUBENV_FIELD_KEYS = (b'id', b's', b'u', b't', b'e', b'o', b'p')
SUBENV_FIELD_KEYS = (b'id', b's', b'u', b't', b'e', b'o', b'p', b'c')
# TODO(kgriffs): Make similar classes for claims and queues
@ -115,6 +115,7 @@ class SubscriptionEnvelope(object):
'expires',
'options',
'project',
'confirmed',
]
def __init__(self, **kwargs):
@ -124,6 +125,7 @@ class SubscriptionEnvelope(object):
self.ttl = kwargs['ttl']
self.expires = kwargs.get('expires', float('inf'))
self.options = kwargs['options']
self.confirmed = kwargs.get('confirmed', 'True')
@staticmethod
def from_redis(sid, client):
@ -144,6 +146,7 @@ class SubscriptionEnvelope(object):
def to_basic(self, now):
created = self.expires - self.ttl
is_confirmed = self.confirmed == str(True)
basic_msg = {
'id': self.id,
'source': self.source,
@ -151,6 +154,7 @@ class SubscriptionEnvelope(object):
'ttl': self.ttl,
'age': now - created,
'options': self.options,
'confirmed': is_confirmed,
}
return basic_msg
@ -294,7 +298,8 @@ def _hmap_to_subenv_kwargs(hmap):
'subscriber': hmap[b'u'],
'ttl': int(hmap[b't']),
'expires': int(hmap[b'e']),
'options': _unpack(hmap[b'o'])
'options': _unpack(hmap[b'o']),
'confirmed': hmap[b'c']
}

View File

@ -71,6 +71,9 @@ class SubscriptionController(base.Subscription):
ttl = int(record[2])
expires = int(record[3])
created = expires - ttl
is_confirmed = True
if len(record) == 6:
is_confirmed = record[5] == str(True)
ret = {
'id': sid,
'source': record[0],
@ -78,6 +81,7 @@ class SubscriptionController(base.Subscription):
'ttl': ttl,
'age': now - created,
'options': self._unpacker(record[4]),
'confirmed': is_confirmed,
}
marker_next['next'] = sid
@ -108,6 +112,7 @@ class SubscriptionController(base.Subscription):
source = queue
now = timeutils.utcnow_ts()
expires = now + ttl
confirmed = False
subscription = {'id': subscription_id,
's': source,
@ -115,7 +120,8 @@ class SubscriptionController(base.Subscription):
't': ttl,
'e': expires,
'o': self._packer(options),
'p': project}
'p': project,
'c': confirmed}
try:
# Pipeline ensures atomic inserts.
@ -150,8 +156,9 @@ class SubscriptionController(base.Subscription):
try:
sub_ids = (q for q in self._client.zrange(subset_key, 0, -1))
for s_id in sub_ids:
subscription = self._client.hmget(s_id, ['s', 'u', 't', 'o'])
if subscription == [None, None, None, None]:
subscription = self._client.hmget(s_id,
['s', 'u', 't', 'o', 'c'])
if subscription == [None, None, None, None, None]:
# NOTE(flwang): Under this check, that means the
# subscription has been expired. So redis can't get
# the subscription but the id is still there. So let's
@ -228,3 +235,31 @@ class SubscriptionController(base.Subscription):
pipe.zrem(subset_key, subscription_id)
pipe.delete(subscription_id)
pipe.execute()
@utils.raises_conn_error
@utils.retries_on_connection_error
def get_with_subscriber(self, queue, subscriber, project=None):
subset_key = utils.scope_subscription_ids_set(queue,
project,
SUBSCRIPTION_IDS_SUFFIX)
sub_ids = (q for q in self._client.zrange(subset_key, 0, -1))
for s_id in sub_ids:
subscription = self._client.hmget(s_id,
['s', 'u', 't', 'o', 'c'])
if subscription[1] == subscriber:
subscription = SubscriptionEnvelope.from_redis(s_id,
self._client)
now = timeutils.utcnow_ts()
return subscription.to_basic(now)
@utils.raises_conn_error
@utils.retries_on_connection_error
def confirm(self, queue, subscription_id, project=None, confirmed=True):
# Let's get our subscription by ID. If it does not exist,
# SubscriptionDoesNotExist error will be raised internally.
self.get(queue, subscription_id, project=project)
fields = {'c': confirmed}
with self._client.pipeline() as pipe:
pipe.hmset(subscription_id, fields)
pipe.execute()

View File

@ -266,7 +266,7 @@ class SubscriptionListCursor(object):
@raises_conn_error
def next(self):
curr = next(self.subscription_iter)
subscription = self.client.hmget(curr, ['s', 'u', 't', 'e', 'o'])
subscription = self.client.hmget(curr, ['s', 'u', 't', 'e', 'o', 'c'])
# NOTE(flwang): The expired subscription will be removed
# automatically, but the key can't be deleted automatically as well.
# Though we clean up those expired ids when create new subscription,

View File

@ -94,6 +94,28 @@ class TestSubscriptions(base.BaseV2MessagingTest):
subscription_id = result[1]["subscription_id"]
self.delete_subscription(self.queue_name, subscription_id)
@test.idempotent_id('fe0d8ec1-1a64-4490-8869-e821b2252e74')
def test_create_subscriptions_with_duplicate_subscriber(self):
# Adding subscriptions to the queue
results = self._create_subscriptions()
s_id1 = results[0][1]['subscription_id']
# Adding a subscription with duplicate subscriber, it will reconfirm
# the subscription and run well.
rbody = {'subscriber': 'http://fake:8080',
'options': {'MessagingKeyMsg': 'MessagingValueMsg'},
'ttl': 293305}
resp, body = self.create_subscription(self.queue_name, rbody)
s_id2 = body['subscription_id']
self.assertEqual('201', resp['status'])
self.assertEqual(s_id2, s_id1)
# Delete the subscriptions created
for result in results:
subscription_id = result[1]["subscription_id"]
self.delete_subscription(self.queue_name, subscription_id)
@decorators.idempotent_id('ff4344b4-ba78-44c5-9ffc-44e53e484f76')
def test_trust_subscription(self):
sub_queue = data_utils.rand_name('Queues-Test')

View File

@ -17,7 +17,6 @@ import uuid
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
@ -44,28 +43,6 @@ class TestSubscriptionsNegative(base.BaseV2MessagingTest):
results.append((resp, body))
return results
# Create Subscriptions
# TODO(wangxiyuan): Now the subscription confirmation feature only support
# mongoDB backend. Skip this test until the feature support the redis
# backend. Then rewrite it.
@decorators.skip_because(bug='1609596')
@test.attr(type=['negative'])
@test.idempotent_id('fe0d8ec1-1a64-4490-8869-e821b2252e74')
def test_create_subscriptions_with_duplicate_subscriber(self):
# Adding a subscription to the queue
results = self._create_subscriptions()
# Adding a duplicate subscriber
rbody = {'subscriber': 'http://fake:8080',
'options': {'MessagingKeyMsg': 'MessagingValueMsg'},
'ttl': 293305}
self.assertRaises(lib_exc.Conflict,
self.create_subscription, self.queue_name, rbody)
# Delete the subscriptions created
for result in results:
subscription_id = result[1]["subscription_id"]
self.delete_subscription(self.queue_name, subscription_id)
@test.attr(type=['negative'])
@test.idempotent_id('0bda2907-a783-4614-af16-23d7a7d53b72')
def test_create_subscriptions_with_invalid_body(self):

View File

@ -1236,6 +1236,35 @@ class SubscriptionControllerTest(ControllerBaseTest):
project=self.project,
**update_fields)
def test_confirm(self):
s_id = self.subscription_controller.create(self.source,
self.subscriber,
self.ttl,
self.options,
project=self.project)
self.addCleanup(self.subscription_controller.delete, self.source,
s_id, self.project)
subscription = self.subscription_controller.get(self.source, s_id,
project=self.project)
self.assertEqual(False, subscription['confirmed'])
self.subscription_controller.confirm(self.source, s_id,
project=self.project,
confirmed=True)
subscription = self.subscription_controller.get(self.source, s_id,
project=self.project)
self.assertEqual(True, subscription['confirmed'])
def test_confirm_with_nonexist_subscription(self):
s_id = 'fake-id'
self.assertRaises(errors.SubscriptionDoesNotExist,
self.subscription_controller.confirm,
self.source, s_id, project=self.project,
confirmed=True
)
class PoolsControllerTest(ControllerBaseTest):
"""Pools Controller base tests.

View File

@ -493,34 +493,6 @@ class MongodbSubscriptionTests(MongodbSetupMixin,
controller_class = controllers.SubscriptionController
control_driver_class = mongodb.ControlDriver
def test_confirm(self):
s_id = self.subscription_controller.create(self.source,
self.subscriber,
self.ttl,
self.options,
project=self.project)
self.addCleanup(self.subscription_controller.delete, self.source,
s_id, self.project)
subscription = self.subscription_controller.get(self.source, s_id,
project=self.project)
self.assertEqual(False, subscription['confirmed'])
self.subscription_controller.confirm(self.source, s_id,
project=self.project,
confirm=True)
subscription = self.subscription_controller.get(self.source, s_id,
project=self.project)
self.assertEqual(True, subscription['confirmed'])
def test_confirm_with_nonexist_subscription(self):
s_id = 'fake-id'
self.assertRaises(errors.SubscriptionDoesNotExist,
self.subscription_controller.confirm,
self.source, s_id, project=self.project, confirm=True
)
#
# TODO(kgriffs): Do these need database purges as well as those above?

View File

@ -540,9 +540,9 @@ class Validator(object):
except OverflowError:
raise ValidationFailed(msg, datetime.datetime.max)
def subscription_confirming(self, confirm):
confirm = confirm.get('confirmed', None)
if not isinstance(confirm, bool):
def subscription_confirming(self, confirmed):
confirmed = confirmed.get('confirmed', None)
if not isinstance(confirmed, bool):
msg = _(u"The 'confirmed' should be boolean.")
raise ValidationFailed(msg)

View File

@ -264,10 +264,10 @@ class ConfirmResource(object):
try:
self._validate.subscription_confirming(document)
confirm = document.get('confirmed', None)
confirmed = document.get('confirmed', None)
self._subscription_controller.confirm(queue_name, subscription_id,
project=project_id,
confirm=confirm)
confirmed=confirmed)
resp.status = falcon.HTTP_204
resp.location = req.path
except storage_errors.SubscriptionDoesNotExist as ex: