Merge "don't allow get/update/delete subscirtions with wrong queue"
This commit is contained in:
commit
12091072a3
@ -90,7 +90,8 @@ class SubscriptionController(base.Subscription):
|
||||
@utils.raises_conn_error
|
||||
def get(self, queue, subscription_id, project=None):
|
||||
res = self._collection.find_one({'_id': utils.to_oid(subscription_id),
|
||||
'p': project})
|
||||
'p': project,
|
||||
's': queue})
|
||||
|
||||
if not res:
|
||||
raise errors.SubscriptionDoesNotExist(subscription_id)
|
||||
@ -143,7 +144,8 @@ class SubscriptionController(base.Subscription):
|
||||
try:
|
||||
res = self._collection.update(
|
||||
{'_id': utils.to_oid(subscription_id),
|
||||
'p': project},
|
||||
'p': project,
|
||||
's': queue},
|
||||
{'$set': fields},
|
||||
upsert=False)
|
||||
except pymongo.errors.DuplicateKeyError:
|
||||
@ -154,7 +156,8 @@ class SubscriptionController(base.Subscription):
|
||||
@utils.raises_conn_error
|
||||
def delete(self, queue, subscription_id, project=None):
|
||||
self._collection.remove({'_id': utils.to_oid(subscription_id),
|
||||
'p': project}, w=0)
|
||||
'p': project,
|
||||
's': queue}, w=0)
|
||||
|
||||
@utils.raises_conn_error
|
||||
def get_with_subscriber(self, queue, subscriber, project=None):
|
||||
|
@ -93,8 +93,10 @@ class SubscriptionController(base.Subscription):
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
def get(self, queue, subscription_id, project=None):
|
||||
subscription = SubscriptionEnvelope.from_redis(subscription_id,
|
||||
self._client)
|
||||
subscription = None
|
||||
if self.exists(queue, subscription_id, project):
|
||||
subscription = SubscriptionEnvelope.from_redis(subscription_id,
|
||||
self._client)
|
||||
if subscription:
|
||||
now = timeutils.utcnow_ts()
|
||||
return subscription.to_basic(now)
|
||||
@ -230,11 +232,13 @@ class SubscriptionController(base.Subscription):
|
||||
def delete(self, queue, subscription_id, project=None):
|
||||
subset_key = utils.scope_subscription_ids_set(queue, project,
|
||||
SUBSCRIPTION_IDS_SUFFIX)
|
||||
# NOTE(prashanthr_): Pipelining is used to mitigate race conditions
|
||||
with self._client.pipeline() as pipe:
|
||||
pipe.zrem(subset_key, subscription_id)
|
||||
pipe.delete(subscription_id)
|
||||
pipe.execute()
|
||||
|
||||
if self._client.zrank(subset_key, subscription_id) is not None:
|
||||
# NOTE(prashanthr_): Pipelining is used to mitigate race conditions
|
||||
with self._client.pipeline() as pipe:
|
||||
pipe.zrem(subset_key, subscription_id)
|
||||
pipe.delete(subscription_id)
|
||||
pipe.execute()
|
||||
|
||||
@utils.raises_conn_error
|
||||
@utils.retries_on_connection_error
|
||||
|
@ -239,10 +239,9 @@ class TestSubscriptionsNegative(base.BaseV2MessagingTest):
|
||||
subscription_id = results[0][1]["subscription_id"]
|
||||
non_existent_queue = data_utils.rand_name('rand_queuename')
|
||||
update_rbody = {'ttl': 1000}
|
||||
resp, _ = self.client.update_subscription(non_existent_queue,
|
||||
subscription_id,
|
||||
update_rbody)
|
||||
self.assertEqual('204', resp['status'])
|
||||
self.assertRaises(lib_exc.NotFound, self.client.update_subscription,
|
||||
non_existent_queue, subscription_id, update_rbody)
|
||||
|
||||
for result in results:
|
||||
subscription_id = result[1]["subscription_id"]
|
||||
self.delete_subscription(self.queue_name, subscription_id)
|
||||
|
@ -1175,6 +1175,39 @@ class SubscriptionControllerTest(ControllerBaseTest):
|
||||
project=self.project)
|
||||
self.assertIsNone(s_id)
|
||||
|
||||
def test_get_update_delete_on_non_existing_queue(self):
|
||||
self._precreate_queue(precreate_queue=True)
|
||||
s_id = self.subscription_controller.create(
|
||||
self.source,
|
||||
self.subscriber,
|
||||
self.ttl,
|
||||
self.options,
|
||||
project=self.project)
|
||||
self.addCleanup(self.subscription_controller.delete, self.source, s_id,
|
||||
self.project)
|
||||
self.assertIsNotNone(s_id)
|
||||
non_existing_queue = "fake_name"
|
||||
# get
|
||||
self.assertRaises(errors.SubscriptionDoesNotExist,
|
||||
self.subscription_controller.get,
|
||||
non_existing_queue, s_id, project=self.project)
|
||||
# update
|
||||
body = {
|
||||
"subscriber": self.subscriber,
|
||||
"ttl": self.ttl,
|
||||
"options": self.options
|
||||
}
|
||||
self.assertRaises(errors.SubscriptionDoesNotExist,
|
||||
self.subscription_controller.update,
|
||||
non_existing_queue, s_id, project=self.project,
|
||||
**body)
|
||||
# delete
|
||||
self.subscription_controller.delete(non_existing_queue, s_id,
|
||||
project=self.project)
|
||||
s_id = self.subscription_controller.get(self.queue_name, s_id,
|
||||
project=self.project)
|
||||
self.assertIsNotNone(s_id)
|
||||
|
||||
def test_nonexist_source(self):
|
||||
try:
|
||||
s_id = self.subscription_controller.create('fake_queue_name',
|
||||
|
Loading…
x
Reference in New Issue
Block a user