From b4c395c79a77988a4b19fdefaa8c26676848b5c6 Mon Sep 17 00:00:00 2001 From: wanghao Date: Thu, 4 Jan 2018 15:55:08 +0800 Subject: [PATCH] Support query filter in queue. This patch will support to query queues filtered by name and metadata in mongodb backend. Other backends will support in following patchs. Co-Authored-By: gecong Change-Id: I5fc6a5959e5d94942aebce9cedb22666e5577cb8 Partial-implements: blueprint queue-filter-support --- api-ref/source/queues.inc | 4 ++ ...queue-filter-support-b704a1c27f7473b9.yaml | 6 ++ zaqar/storage/base.py | 6 +- zaqar/storage/mongodb/queues.py | 7 ++- zaqar/storage/mongodb/utils.py | 22 ++++++-- zaqar/storage/pooling.py | 9 ++- zaqar/storage/redis/queues.py | 5 +- zaqar/storage/sqlalchemy/queues.py | 5 +- zaqar/tests/unit/storage/test_pool_catalog.py | 3 +- .../unit/storage/test_pool_catalog_new.py | 3 +- .../wsgi/v2_0/test_queue_lifecycle.py | 51 +++++++++++++++++ zaqar/transport/wsgi/v2_0/queues.py | 55 ++++++++++++++----- 12 files changed, 144 insertions(+), 32 deletions(-) create mode 100644 releasenotes/notes/queue-filter-support-b704a1c27f7473b9.yaml diff --git a/api-ref/source/queues.inc b/api-ref/source/queues.inc index 5b5764e61..e7037f1aa 100644 --- a/api-ref/source/queues.inc +++ b/api-ref/source/queues.inc @@ -23,6 +23,10 @@ instead of 200, because there was no information to send back. This operation lists queues for the project. The queues are sorted alphabetically by name. +When queue listing , we can add filter in query string parameter +to filter queue, like name and metadata. If metadata or name of queue is +consistent with the filter,the queue will be listed to the user, +otherwise the queue will be filtered. Normal response codes: 200 diff --git a/releasenotes/notes/queue-filter-support-b704a1c27f7473b9.yaml b/releasenotes/notes/queue-filter-support-b704a1c27f7473b9.yaml new file mode 100644 index 000000000..7ed277c9a --- /dev/null +++ b/releasenotes/notes/queue-filter-support-b704a1c27f7473b9.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Support for queue filter when queue listing. With this feature, users can + add filter of name or metadata in query string parameters in queue list + to filter queues. diff --git a/zaqar/storage/base.py b/zaqar/storage/base.py index e8080d2e1..e8b6970e9 100644 --- a/zaqar/storage/base.py +++ b/zaqar/storage/base.py @@ -309,8 +309,8 @@ class Queue(ControllerBase): numbers of queues. """ - def list(self, project=None, marker=None, - limit=DEFAULT_QUEUES_PER_PAGE, detailed=False): + def list(self, project=None, kfilter={}, marker=None, + limit=DEFAULT_QUEUES_PER_PAGE, detailed=False, name=None): """Base method for listing queues. :param project: Project id @@ -321,7 +321,7 @@ class Queue(ControllerBase): :returns: An iterator giving a sequence of queues and the marker of the next page. """ - return self._list(project, marker, limit, detailed) + return self._list(project, kfilter, marker, limit, detailed, name) _list = abc.abstractmethod(lambda x: None) diff --git a/zaqar/storage/mongodb/queues.py b/zaqar/storage/mongodb/queues.py index f0abb788d..9938896d6 100644 --- a/zaqar/storage/mongodb/queues.py +++ b/zaqar/storage/mongodb/queues.py @@ -196,10 +196,11 @@ class QueueController(storage.Queue): except errors.QueueDoesNotExist: return {} - def _list(self, project=None, marker=None, - limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False): + def _list(self, project=None, kfilter={}, marker=None, + limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False, + name=None): - query = utils.scoped_query(marker, project) + query = utils.scoped_query(marker, project, name, kfilter) projection = {'p_q': 1, '_id': 0} if detailed: diff --git a/zaqar/storage/mongodb/utils.py b/zaqar/storage/mongodb/utils.py index 600dde443..58c9682e1 100644 --- a/zaqar/storage/mongodb/utils.py +++ b/zaqar/storage/mongodb/utils.py @@ -191,7 +191,7 @@ def parse_scoped_project_queue(scoped_name): return scoped_name.split('/') -def scoped_query(queue, project): +def scoped_query(queue, project, name=None, kfilter={}): """Returns a dict usable for querying for scoped project/queues. :param queue: name of queue to seek @@ -207,14 +207,28 @@ def scoped_query(queue, project): if not scoped_name.startswith('/'): # NOTE(kgriffs): scoped queue, e.g., 'project-id/queue-name' - project_prefix = '^' + project + '/' + if name: + project_prefix = '^' + project + '/.*' + name + '.*' + else: + project_prefix = '^' + project + '/' query[key] = {'$regex': project_prefix, '$gt': scoped_name} elif scoped_name == '/': # NOTE(kgriffs): list global queues, but exclude scoped ones - query[key] = {'$regex': '^/'} + if name: + query[key] = {'$regex': '^/.*' + name + '.*'} + else: + query[key] = {'$regex': '^/'} else: # NOTE(kgriffs): unscoped queue, e.g., '/my-global-queue' - query[key] = {'$regex': '^/', '$gt': scoped_name} + if name: + query[key] = {'$regex': '^/.*' + name + '.*', '$gt': scoped_name} + else: + query[key] = {'$regex': '^/', '$gt': scoped_name} + + # Handler the metadata filter in request. + for key, value in kfilter.items(): + key = 'm.' + key + query[key] = {'$eq': value} return query diff --git a/zaqar/storage/pooling.py b/zaqar/storage/pooling.py index 55d554943..3dacc59a9 100644 --- a/zaqar/storage/pooling.py +++ b/zaqar/storage/pooling.py @@ -172,15 +172,18 @@ class QueueController(storage.Queue): self._mgt_queue_ctrl = self._pool_catalog.control.queue_controller self._get_controller = self._pool_catalog.get_queue_controller - def _list(self, project=None, marker=None, - limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False): + def _list(self, project=None, kfilter={}, marker=None, + limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False, + name=None): def all_pages(): yield next(self._mgt_queue_ctrl.list( project=project, + kfilter=kfilter, marker=marker, limit=limit, - detailed=detailed)) + detailed=detailed, + name=name)) # make a heap compared with 'name' ls = heapq.merge(*[ diff --git a/zaqar/storage/redis/queues.py b/zaqar/storage/redis/queues.py index 1af454e98..4746edb35 100644 --- a/zaqar/storage/redis/queues.py +++ b/zaqar/storage/redis/queues.py @@ -83,8 +83,9 @@ class QueueController(storage.Queue): @utils.raises_conn_error @utils.retries_on_connection_error - def _list(self, project=None, marker=None, - limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False): + def _list(self, project=None, kfilter={}, marker=None, + limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False, + name=None): client = self._client qset_key = utils.scope_queue_name(QUEUES_SET_STORE_NAME, project) marker = utils.scope_queue_name(marker, project) diff --git a/zaqar/storage/sqlalchemy/queues.py b/zaqar/storage/sqlalchemy/queues.py index ea7e962cd..5ebd0def4 100644 --- a/zaqar/storage/sqlalchemy/queues.py +++ b/zaqar/storage/sqlalchemy/queues.py @@ -23,8 +23,9 @@ from zaqar.storage.sqlalchemy import utils class QueueController(storage.Queue): - def _list(self, project, marker=None, - limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False): + def _list(self, project, kfilter={}, marker=None, + limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False, + name=None): if project is None: project = '' diff --git a/zaqar/tests/unit/storage/test_pool_catalog.py b/zaqar/tests/unit/storage/test_pool_catalog.py index 876e02a73..618f10a0d 100644 --- a/zaqar/tests/unit/storage/test_pool_catalog.py +++ b/zaqar/tests/unit/storage/test_pool_catalog.py @@ -109,7 +109,8 @@ class PoolCatalogTest(testing.TestBase): flavor='fake') def test_queues_list_on_multi_pools(self): - def fake_list(project=None, marker=None, limit=10, detailed=False): + def fake_list(project=None, kfilter={}, marker=None, limit=10, + detailed=False, name=None): yield iter([{'name': 'fake_queue'}]) list_str = 'zaqar.storage.mongodb.queues.QueueController.list' diff --git a/zaqar/tests/unit/storage/test_pool_catalog_new.py b/zaqar/tests/unit/storage/test_pool_catalog_new.py index 567cc7b24..5cee0e384 100644 --- a/zaqar/tests/unit/storage/test_pool_catalog_new.py +++ b/zaqar/tests/unit/storage/test_pool_catalog_new.py @@ -108,7 +108,8 @@ class PoolCatalogTest(testing.TestBase): flavor='fake') def test_queues_list_on_multi_pools(self): - def fake_list(project=None, marker=None, limit=10, detailed=False): + def fake_list(project=None, kfilter={}, marker=None, limit=10, + detailed=False, name=None): yield iter([{'name': 'fake_queue'}]) list_str = 'zaqar.storage.mongodb.queues.QueueController.list' diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py index 645a5b17a..b230abe35 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_queue_lifecycle.py @@ -455,6 +455,13 @@ class TestQueueLifecycleMongoDB(base.V2Base): '_dead_letter_queue_messages_ttl': None, '_max_claim_count': None}, result_doc) + # queue filter + result = self.simulate_get(self.queue_path, headers=header, + query_string='node=34') + self.assertEqual(falcon.HTTP_200, self.srmock.status) + result_doc = jsonutils.loads(result[0]) + self.assertEqual(0, len(result_doc['queues'])) + # List tail self.simulate_get(target, headers=header, query_string=params) self.assertEqual(falcon.HTTP_200, self.srmock.status) @@ -488,6 +495,50 @@ class TestQueueLifecycleMongoDB(base.V2Base): self.simulate_get(self.queue_path, headers=header) self.assertEqual(falcon.HTTP_503, self.srmock.status) + def test_list_with_filter(self): + arbitrary_number = 644079696574693 + project_id = str(arbitrary_number) + client_id = uuidutils.generate_uuid() + header = { + 'X-Project-ID': project_id, + 'Client-ID': client_id + } + + # Create some + def create_queue(name, project_id, body): + altheader = {'Client-ID': client_id} + if project_id is not None: + altheader['X-Project-ID'] = project_id + uri = self.queue_path + '/' + name + self.simulate_put(uri, headers=altheader, body=body) + + create_queue('q1', project_id, '{"test_metadata_key1": "value1"}') + create_queue('q2', project_id, '{"_max_messages_post_size": 2000}') + create_queue('q3', project_id, '{"test_metadata_key2": 30}') + + # List (filter query) + result = self.simulate_get(self.queue_path, headers=header, + query_string='name=q&test_metadata_key2=30') + + result_doc = jsonutils.loads(result[0]) + self.assertEqual(1, len(result_doc['queues'])) + self.assertEqual('q3', result_doc['queues'][0]['name']) + + # List (filter query) + result = self.simulate_get(self.queue_path, headers=header, + query_string='_max_messages_post_size=2000') + + result_doc = jsonutils.loads(result[0]) + self.assertEqual(1, len(result_doc['queues'])) + self.assertEqual('q2', result_doc['queues'][0]['name']) + + # List (filter query) + result = self.simulate_get(self.queue_path, headers=header, + query_string='name=q') + + result_doc = jsonutils.loads(result[0]) + self.assertEqual(3, len(result_doc['queues'])) + class TestQueueLifecycleFaultyDriver(base.V2BaseFaulty): diff --git a/zaqar/transport/wsgi/v2_0/queues.py b/zaqar/transport/wsgi/v2_0/queues.py index aad940106..59f322cdb 100644 --- a/zaqar/transport/wsgi/v2_0/queues.py +++ b/zaqar/transport/wsgi/v2_0/queues.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy import falcon from oslo_log import log as logging import six @@ -252,20 +253,11 @@ class CollectionResource(object): self._queue_controller = queue_controller self._validate = validate - @decorators.TransportLog("Queues collection") - @acl.enforce("queues:get_all") - def on_get(self, req, resp, project_id): - kwargs = {} - - # NOTE(kgriffs): This syntax ensures that - # we don't clobber default values with None. - req.get_param('marker', store=kwargs) - req.get_param_as_int('limit', store=kwargs) - req.get_param_as_bool('detailed', store=kwargs) - + def _queue_list(self, project_id, path, kfilter, **kwargs): try: self._validate.queue_listing(**kwargs) - results = self._queue_controller.list(project=project_id, **kwargs) + results = self._queue_controller.list(project=project_id, + kfilter=kfilter, **kwargs) # Buffer list of queues queues = list(next(results)) @@ -283,13 +275,29 @@ class CollectionResource(object): kwargs['marker'] = next(results) or kwargs.get('marker', '') reserved_metadata = _get_reserved_metadata(self._validate).items() for each_queue in queues: - each_queue['href'] = req.path + '/' + each_queue['name'] + each_queue['href'] = path + '/' + each_queue['name'] if kwargs.get('detailed'): for meta, value in reserved_metadata: if not each_queue.get('metadata', {}).get(meta): each_queue['metadata'][meta] = value + return queues, kwargs['marker'] + + def _on_get_with_kfilter(self, req, resp, project_id, kfilter={}): + kwargs = {} + + # NOTE(kgriffs): This syntax ensures that + # we don't clobber default values with None. + req.get_param('marker', store=kwargs) + req.get_param_as_int('limit', store=kwargs) + req.get_param_as_bool('detailed', store=kwargs) + req.get_param('name', store=kwargs) + + queues, marker = self._queue_list(project_id, + req.path, kfilter, **kwargs) + links = [] + kwargs['marker'] = marker if queues: links = [ { @@ -305,3 +313,24 @@ class CollectionResource(object): resp.body = utils.to_json(response_body) # status defaults to 200 + + @decorators.TransportLog("Queues collection") + @acl.enforce("queues:get_all") + def on_get(self, req, resp, project_id): + field = ('marker', 'limit', 'detailed', 'name') + kfilter = copy.deepcopy(req.params) + + for key in req.params.keys(): + if key in field: + kfilter.pop(key) + + kfilter = kfilter if len(kfilter) > 0 else {} + for key in kfilter.keys(): + # Since we get the filter value from URL, so need to + # turn the string to integer if using integer filter value. + try: + kfilter[key] = int(kfilter[key]) + except ValueError: + continue + self._on_get_with_kfilter(req, resp, project_id, kfilter) + # status defaults to 200