diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index 9d5ce0cdf..66447eaf1 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -136,6 +136,14 @@ class SubscriptionController(base.Subscription): key_transform=key_transform) assert fields, ('`subscriber`, `ttl`, ' 'or `options` not found in kwargs') + + new_ttl = fields.get('t', None) + if new_ttl is not None: + now = timeutils.utcnow_ts() + now_dt = datetime.datetime.utcfromtimestamp(now) + expires = now_dt + datetime.timedelta(seconds=new_ttl) + fields['e'] = expires + try: res = self._collection.update( {'_id': utils.to_oid(subscription_id), diff --git a/zaqar/storage/redis/subscriptions.py b/zaqar/storage/redis/subscriptions.py index acc91dba3..c8dd46771 100644 --- a/zaqar/storage/redis/subscriptions.py +++ b/zaqar/storage/redis/subscriptions.py @@ -207,9 +207,17 @@ class SubscriptionController(base.Subscription): if new_options is not None: fields['o'] = self._packer(new_options) + new_ttl = fields.get('t', None) + if new_ttl is not None: + now = timeutils.utcnow_ts() + expires = now + new_ttl + fields['e'] = expires + # Pipeline ensures atomic inserts. with self._client.pipeline() as pipe: pipe.hmset(subscription_id, fields) + if new_ttl is not None: + pipe.expire(subscription_id, new_ttl) pipe.execute() @utils.raises_conn_error diff --git a/zaqar/tests/functional/base.py b/zaqar/tests/functional/base.py index c1c244f76..c56ae8e0d 100644 --- a/zaqar/tests/functional/base.py +++ b/zaqar/tests/functional/base.py @@ -24,8 +24,10 @@ import six from zaqar.api.v1 import response as response_v1 from zaqar.api.v1_1 import response as response_v1_1 +from zaqar.api.v2 import response as response_v2 from zaqar import bootstrap from zaqar.storage import mongodb +from zaqar.storage.redis import driver as redis from zaqar import tests as testing from zaqar.tests.functional import config from zaqar.tests.functional import helpers @@ -52,6 +54,9 @@ class FunctionalTestBase(testing.TestBase): server_class = None config_file = None class_bootstrap = None + # NOTE(Eva-i): ttl_gc_interval is the known maximum time interval between + # automatic resource TTL expirations. Depends on message store back end. + class_ttl_gc_interval = None wipe_dbs_projects = set([]) def setUp(self): @@ -82,6 +87,13 @@ class FunctionalTestBase(testing.TestBase): self.__class__.class_bootstrap = bootstrap.Bootstrap(self.mconf) self.class_bootstrap.transport + datadriver = self.class_bootstrap.storage._storage + if isinstance(datadriver, redis.DataDriver): + self.__class__.class_ttl_gc_interval = 1 + if isinstance(datadriver, mongodb.DataDriver): + # NOTE(kgriffs): MongoDB's TTL scavenger only runs once a minute + self.__class__.class_ttl_gc_interval = 60 + if _TEST_INTEGRATION: # TODO(kgriffs): This code should be replaced to use # an external wsgi server instance. @@ -396,3 +408,9 @@ class V1_1FunctionalTestBase(FunctionalTestBase): def setUp(self): super(V1_1FunctionalTestBase, self).setUp() self.response = response_v1_1.ResponseSchema(self.limits) + + +class V2FunctionalTestBase(FunctionalTestBase): + def setUp(self): + super(V2FunctionalTestBase, self).setUp() + self.response = response_v2.ResponseSchema(self.limits) diff --git a/zaqar/tests/functional/helpers.py b/zaqar/tests/functional/helpers.py index 8a81d155d..2e4dd1788 100644 --- a/zaqar/tests/functional/helpers.py +++ b/zaqar/tests/functional/helpers.py @@ -129,3 +129,9 @@ def create_pool_body(**kwargs): } return pool_body + + +def create_subscription_body(subscriber='http://fake:8080', ttl=600, + options_key='funny', options_value='no'): + options = {options_key: options_value} + return {'subscriber': subscriber, 'options': options, 'ttl': ttl} diff --git a/zaqar/tests/functional/wsgi/v1_1/test_claims.py b/zaqar/tests/functional/wsgi/v1_1/test_claims.py index edcc29e8d..b2739e15d 100644 --- a/zaqar/tests/functional/wsgi/v1_1/test_claims.py +++ b/zaqar/tests/functional/wsgi/v1_1/test_claims.py @@ -29,6 +29,7 @@ class TestClaims(base.V1_1FunctionalTestBase): def setUp(self): super(TestClaims, self).setUp() + assert False, 'hehe' self.headers = helpers.create_zaqar_headers(self.cfg) self.client.headers = self.headers diff --git a/zaqar/tests/functional/wsgi/v2/__init__.py b/zaqar/tests/functional/wsgi/v2/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zaqar/tests/functional/wsgi/v2/test_subscriptions.py b/zaqar/tests/functional/wsgi/v2/test_subscriptions.py new file mode 100644 index 000000000..067da8d02 --- /dev/null +++ b/zaqar/tests/functional/wsgi/v2/test_subscriptions.py @@ -0,0 +1,123 @@ +# Copyright (c) 2013 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import division + +import time +import uuid + +import ddt + +from zaqar.tests.functional import base +from zaqar.tests.functional import helpers as func_helpers +from zaqar.tests import helpers + + +@ddt.ddt +class TestSubscriptions(base.V2FunctionalTestBase): + + """Tests for Subscriptions.""" + + server_class = base.ZaqarServer + + def setUp(self): + super(TestSubscriptions, self).setUp() + + self.queue_name = uuid.uuid1() + self.queue_url = ("{url}/{version}/queues/{queue}".format( + url=self.cfg.zaqar.url, + version="v2", + queue=self.queue_name)) + + self.client.put(self.queue_url) + + self.subscriptions_url = self.queue_url + '/subscriptions/' + self.client.set_base_url(self.subscriptions_url) + + def tearDown(self): + # Delete test queue subscriptions after each test case. + result = self.client.get(self.subscriptions_url) + subscriptions = result.json()['subscriptions'] + for sub in subscriptions: + sub_url = self.subscriptions_url + sub['id'] + self.client.delete(sub_url) + # Delete test queue. + self.client.delete(self.queue_url) + super(TestSubscriptions, self).tearDown() + + @helpers.is_slow(condition=lambda self: self.class_ttl_gc_interval > 1) + def test_expired_subscription(self): + # Default TTL value is 600. + doc = func_helpers.create_subscription_body() + result = self.client.post(data=doc) + self.assertEqual(201, result.status_code) + longlive_id = result.json()['subscription_id'] + + # This is a minimum TTL allowed by server. + ttl_for_shortlive = 60 + doc = func_helpers.create_subscription_body( + subscriber='http://expire.me', ttl=ttl_for_shortlive) + result = self.client.post(data=doc) + self.assertEqual(201, result.status_code) + shortlive_id = result.json()['subscription_id'] + shortlive_url = self.subscriptions_url + shortlive_id + + # Let's wait for subscription to expire. + for i in range(self.class_ttl_gc_interval + ttl_for_shortlive): + time.sleep(1) + result = self.client.get(shortlive_url) + if result.status_code == 404: + break + else: + self.fail("Didn't remove the subscription in time.") + + # Make sure the expired subscription is not returned when listing. + result = self.client.get(self.subscriptions_url) + self.assertEqual(200, result.status_code) + subscriptions = result.json()['subscriptions'] + self.assertEqual(1, len(subscriptions)) + self.assertEqual(longlive_id, subscriptions[0]['id']) + + @helpers.is_slow(condition=lambda self: self.class_ttl_gc_interval > 1) + def test_update_ttl(self): + # Default TTL value is 600. + doc = func_helpers.create_subscription_body() + result = self.client.post(data=doc) + self.assertEqual(201, result.status_code) + subscription_id = result.json()['subscription_id'] + subscription_url = self.subscriptions_url + subscription_id + + # This is a minimum TTL allowed by server. + updated_ttl = 60 + update_fields = { + 'ttl': updated_ttl + } + result = self.client.patch(subscription_url, data=update_fields) + self.assertEqual(204, result.status_code) + + # Let's wait for updated subscription to expire. + for i in range(self.class_ttl_gc_interval + updated_ttl): + time.sleep(1) + result = self.client.get(subscription_url) + if result.status_code == 404: + break + else: + self.fail("Didn't remove the subscription in time.") + + # Make sure the expired subscription is not returned when listing. + result = self.client.get(self.subscriptions_url) + self.assertEqual(200, result.status_code) + subscriptions = result.json()['subscriptions'] + self.assertEqual(0, len(subscriptions))