diff --git a/zaqar/storage/pooling.py b/zaqar/storage/pooling.py index c9846843d..79addabc9 100644 --- a/zaqar/storage/pooling.py +++ b/zaqar/storage/pooling.py @@ -159,7 +159,7 @@ class DataDriver(storage.DataDriverBase): class QueueController(storage.Queue): - """Routes operations to a queue controller in the appropriate pool. + """Routes operations to get the appropriate queue controller. :param pool_catalog: a catalog of available pools :type pool_catalog: queues.pooling.base.Catalog @@ -168,16 +168,14 @@ class QueueController(storage.Queue): def __init__(self, pool_catalog): super(QueueController, self).__init__(None) self._pool_catalog = pool_catalog + self._mgt_queue_ctrl = self._pool_catalog.control.queue_controller self._get_controller = self._pool_catalog.get_queue_controller def _list(self, project=None, marker=None, limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False): def all_pages(): - pool = self._pool_catalog.get_default_pool() - if pool is None: - raise errors.NoPoolFound() - yield next(pool.queue_controller.list( + yield next(self._mgt_queue_ctrl.list( project=project, marker=marker, limit=limit, @@ -218,17 +216,15 @@ class QueueController(storage.Queue): # however. If between the time we register a queue and go to # look it up, the queue is deleted, then this assertion will # fail. - control = self._get_controller(name, project) - if not control: + pool = self._pool_catalog.lookup(name, project) + if not pool: raise RuntimeError('Failed to register queue') - return control.create(name, metadata=metadata, project=project) + return self._mgt_queue_ctrl.create(name, metadata=metadata, + project=project) def _delete(self, name, project=None): - # NOTE(cpp-cabrera): If we fail to find a project/queue in the - # catalogue for a delete, just ignore it. - control = self._get_controller(name, project) - if control: - + mqHandler = self._get_controller(name, project) + if mqHandler: # NOTE(cpp-cabrera): delete from the catalogue first. If # zaqar crashes in the middle of these two operations, # it is desirable that the entry be missing from the @@ -239,34 +235,24 @@ class QueueController(storage.Queue): # latter case is more difficult to reason about, and may # yield 500s in some operations. self._pool_catalog.deregister(name, project) - ret = control.delete(name, project) - return ret + mqHandler.delete(name, project) - return None + return self._mgt_queue_ctrl.delete(name, project) def _exists(self, name, project=None): - control = self._get_controller(name, project) - if control: - return control.exists(name, project=project) - return False + return self._mgt_queue_ctrl.exists(name, project=project) def get_metadata(self, name, project=None): - control = self._get_controller(name, project) - if control: - return control.get_metadata(name, project=project) - raise errors.QueueDoesNotExist(name, project) + return self._mgt_queue_ctrl.get_metadata(name, project=project) def set_metadata(self, name, metadata, project=None): - control = self._get_controller(name, project) - if control: - return control.set_metadata(name, metadata=metadata, - project=project) - raise errors.QueueDoesNotExist(name, project) + return self._mgt_queue_ctrl.set_metadata(name, metadata=metadata, + project=project) def _stats(self, name, project=None): - control = self._get_controller(name, project) - if control: - return control.stats(name, project=project) + mqHandler = self._get_controller(name, project) + if mqHandler: + return mqHandler.stats(name, project=project) raise errors.QueueDoesNotExist(name, project) diff --git a/zaqar/storage/swift/claims.py b/zaqar/storage/swift/claims.py index b5e4ffbd6..c3daf5da3 100644 --- a/zaqar/storage/swift/claims.py +++ b/zaqar/storage/swift/claims.py @@ -145,6 +145,9 @@ class ClaimController(storage.Claim): return claim_id, claimed def update(self, queue, claim_id, metadata, project=None): + if not self._queue_ctrl.exists(queue, project): + raise errors.QueueDoesNotExist(queue, project) + container = utils._claim_container(queue, project) try: headers, obj = self._client.get_object(container, claim_id) diff --git a/zaqar/storage/swift/messages.py b/zaqar/storage/swift/messages.py index cd18550a4..6f73dea2b 100644 --- a/zaqar/storage/swift/messages.py +++ b/zaqar/storage/swift/messages.py @@ -323,7 +323,13 @@ class MessageQueueHandler(object): total = 0 claimed = 0 container = utils._message_container(name, project) - _, objects = self._client.get_container(container) + + try: + _, objects = self._client.get_container(container) + except swiftclient.ClientException as exc: + if exc.http_status == 404: + raise errors.QueueIsEmpty(name, project) + newest = None oldest = None now = timeutils.utcnow_ts(True) diff --git a/zaqar/tests/etc/wsgi_mongodb_pooled.conf b/zaqar/tests/etc/wsgi_mongodb_pooled.conf index ffc2aadaf..cae62c0bc 100644 --- a/zaqar/tests/etc/wsgi_mongodb_pooled.conf +++ b/zaqar/tests/etc/wsgi_mongodb_pooled.conf @@ -15,3 +15,6 @@ database = zaqar_test_pooled [drivers:management_store:mongodb] uri = mongodb://127.0.0.1:27017 database = zaqar_test + +[pooling:catalog] +enable_virtual_pool = True \ No newline at end of file diff --git a/zaqar/tests/etc/wsgi_mongodb_pooled_disable_virtual_pool.conf b/zaqar/tests/etc/wsgi_mongodb_pooled_disable_virtual_pool.conf new file mode 100644 index 000000000..94aa9363f --- /dev/null +++ b/zaqar/tests/etc/wsgi_mongodb_pooled_disable_virtual_pool.conf @@ -0,0 +1,20 @@ +[DEFAULT] +pooling = True +admin_mode = True +unreliable = True +enable_deprecated_api_versions = 1,1.1 + +[drivers] +transport = wsgi +message_store = mongodb + +[drivers:message_store:mongodb] +uri = mongodb://127.0.0.1:27017 +database = zaqar_test_pooled + +[drivers:management_store:mongodb] +uri = mongodb://127.0.0.1:27017 +database = zaqar_test + +[pooling:catalog] +enable_virtual_pool = False \ No newline at end of file diff --git a/zaqar/tests/etc/wsgi_redis_pooled.conf b/zaqar/tests/etc/wsgi_redis_pooled.conf index 99045b1c6..2bfd1061f 100644 --- a/zaqar/tests/etc/wsgi_redis_pooled.conf +++ b/zaqar/tests/etc/wsgi_redis_pooled.conf @@ -14,4 +14,7 @@ reconnect_sleep = 1 [drivers:management_store:redis] uri = redis://127.0.0.1:6379 max_reconnect_attempts = 3 -reconnect_sleep = 1 \ No newline at end of file +reconnect_sleep = 1 + +[pooling:catalog] +enable_virtual_pool = True \ No newline at end of file diff --git a/zaqar/tests/etc/wsgi_sqlalchemy_pooled.conf b/zaqar/tests/etc/wsgi_sqlalchemy_pooled.conf index 5c961cc2f..7b5473268 100644 --- a/zaqar/tests/etc/wsgi_sqlalchemy_pooled.conf +++ b/zaqar/tests/etc/wsgi_sqlalchemy_pooled.conf @@ -12,3 +12,5 @@ bind = 0.0.0.0 port = 8888 workers = 20 +[pooling:catalog] +enable_virtual_pool = True \ No newline at end of file diff --git a/zaqar/tests/unit/storage/test_pool_catalog.py b/zaqar/tests/unit/storage/test_pool_catalog.py index a7df390d3..876e02a73 100644 --- a/zaqar/tests/unit/storage/test_pool_catalog.py +++ b/zaqar/tests/unit/storage/test_pool_catalog.py @@ -29,7 +29,7 @@ from zaqar import tests as testing @testing.requires_mongodb class PoolCatalogTest(testing.TestBase): - config_file = 'wsgi_mongodb_pooled.conf' + config_file = 'wsgi_mongodb_pooled_disable_virtual_pool.conf' def setUp(self): super(PoolCatalogTest, self).setUp() diff --git a/zaqar/transport/wsgi/v1_0/stats.py b/zaqar/transport/wsgi/v1_0/stats.py index 3780757a5..a296c8945 100644 --- a/zaqar/transport/wsgi/v1_0/stats.py +++ b/zaqar/transport/wsgi/v1_0/stats.py @@ -54,6 +54,15 @@ class Resource(object): resp.body = utils.to_json(resp_dict) # status defaults to 200 + except storage_errors.QueueIsEmpty as ex: + resp_dict = { + 'messages': { + 'claimed': 0, + 'free': 0, + 'total': 0 + } + } + resp.body = utils.to_json(resp_dict) except storage_errors.DoesNotExist as ex: LOG.debug(ex) raise wsgi_errors.HTTPNotFound(six.text_type(ex)) diff --git a/zaqar/transport/wsgi/v1_1/stats.py b/zaqar/transport/wsgi/v1_1/stats.py index 9f2366628..f816072e5 100644 --- a/zaqar/transport/wsgi/v1_1/stats.py +++ b/zaqar/transport/wsgi/v1_1/stats.py @@ -53,7 +53,8 @@ class Resource(object): resp.body = utils.to_json(resp_dict) # status defaults to 200 - except storage_errors.QueueDoesNotExist as ex: + except (storage_errors.QueueDoesNotExist, + storage_errors.QueueIsEmpty) as ex: resp_dict = { 'messages': { 'claimed': 0, diff --git a/zaqar/transport/wsgi/v2_0/stats.py b/zaqar/transport/wsgi/v2_0/stats.py index 51a98f53b..2258ee8cc 100644 --- a/zaqar/transport/wsgi/v2_0/stats.py +++ b/zaqar/transport/wsgi/v2_0/stats.py @@ -57,7 +57,8 @@ class Resource(object): resp.body = utils.to_json(resp_dict) # status defaults to 200 - except storage_errors.QueueDoesNotExist as ex: + except (storage_errors.QueueDoesNotExist, + storage_errors.QueueIsEmpty) as ex: resp_dict = { 'messages': { 'claimed': 0,