From 13df51be82fcac8cdb0e8d4e77824a5ddb81fea7 Mon Sep 17 00:00:00 2001 From: Fei Long Wang Date: Tue, 18 Feb 2014 09:28:51 +0800 Subject: [PATCH] feat(v1.1): Implement new /health endpoint for v1.1 The new /health endpoint of v1.1 is different from the /health of v1.0. In v1.1, the /health endpoint returns a set of KPIs (Key Performance Indicators) for the target Marconi server. The API response looks like this: { "mongo_pool_1": { "message_volume": { "claimed": 0, "total": 0, "free": 0 }, "storage_reachable": true, "operation_status": { "create_queue": { "seconds": 0.0021300315856933594, "ref": null, "succeeded": true }, "post_messages": { "seconds": 0.033502817153930664, "ref": null, "succeeded": true }, "list_messages": { "seconds": 0.000013113021850585938, "ref": null, "succeeded": true }, "claim_messages": { "seconds": 0.0013759136199951172, "ref": "3f515f37-58a0-4c81-8214-3e92979b82e7", "succeeded": false }, "delete_queue": { "seconds": 0.0030739307403564453, "ref": null, "succeeded": true } } }, "mongo_pool_2": { "message_volume": { "claimed": 0, "total": 0, "free": 0 }, "storage_reachable": true, "operation_status": { "create_queue": { "seconds": 0.0011799335479736328, "ref": null, "succeeded": true }, "post_messages": { "seconds": 0.024316072463989258, "ref": null, "succeeded": true }, "list_messages": { "seconds": 0.000008106231689453125, "ref": null, "succeeded": true }, "claim_messages": { "seconds": 0.000576019287109375, "ref": "68629fda-b4ce-4cf9-978a-df0df8df36a7", "succeeded": false }, "delete_queue": { "seconds": 0.003300905227661133, "ref": null, "succeeded": true } } }, "catalog_reachable": true } DocImpact Implements: blueprint detailed-health Change-Id: I6b281132e2fef8ce65ce54b9d6be297571f8b170 --- tests/functional/wsgi/v1_1/test_health.py | 89 ++++++++++++++++ tests/functional/wsgi/v1_1/test_queues.py | 9 -- tests/unit/queues/transport/wsgi/test_v1_1.py | 15 +-- zaqar/queues/storage/base.py | 100 ++++++++++++++++++ 
zaqar/queues/storage/mongodb/driver.py | 18 ++++ zaqar/queues/storage/pipeline.py | 3 + zaqar/queues/storage/pooling.py | 14 +++ zaqar/queues/storage/sqlalchemy/driver.py | 23 ++++ zaqar/queues/transport/wsgi/v1_1/__init__.py | 7 +- zaqar/queues/transport/wsgi/v1_1/health.py | 22 ++-- zaqar/tests/faulty_storage.py | 3 + zaqar/tests/functional/base.py | 6 +- .../queues/transport/wsgi/v1_1/__init__.py | 3 + .../queues/transport/wsgi/v1_1/test_health.py | 98 +++++++++++++++++ 14 files changed, 379 insertions(+), 31 deletions(-) create mode 100644 tests/functional/wsgi/v1_1/test_health.py create mode 100644 zaqar/tests/queues/transport/wsgi/v1_1/test_health.py diff --git a/tests/functional/wsgi/v1_1/test_health.py b/tests/functional/wsgi/v1_1/test_health.py new file mode 100644 index 000000000..c74bc93a3 --- /dev/null +++ b/tests/functional/wsgi/v1_1/test_health.py @@ -0,0 +1,89 @@ +# Copyright (c) 2014 Catalyst IT Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import ddt + +from zaqar.tests.functional import base +from zaqar.tests.functional import helpers + + +@ddt.ddt +class TestHealth(base.V1_1FunctionalTestBase): + + server_class = base.ZaqarAdminServer + + def setUp(self): + super(TestHealth, self).setUp() + self.base_url = ("{url}/{version}".format( + url=self.cfg.zaqar.url, + version="v1.1" + )) + self.cfg.zaqar.version = "v1.1" + self.client.set_base_url(self.base_url) + + @ddt.data( + { + 'name': "pool_1", + 'weight': 10, + 'uri': "sqlite:///:memory:" + } + ) + def test_health_with_pool(self, params): + # FIXME(flwang): Please use mongodb after the sqlalchemy is disabled + # as pool node and the mongodb is working on gate successfully. + doc = helpers.create_pool_body( + weight=params.get('weight', 10), + uri=params.get('uri', "sqlite:///:memory:") + ) + + pool_name = params.get('name', "pool_1") + self.addCleanup(self.client.delete, url='/pools/' + pool_name) + + result = self.client.put('/pools/' + pool_name, data=doc) + self.assertEqual(result.status_code, 201) + + queue_name = 'fake_queue' + self.addCleanup(self.client.delete, url='/queues/' + queue_name) + result = self.client.put('/queues/' + queue_name) + self.assertEqual(result.status_code, 201) + + sample_messages = {'messages': [ + {'body': 239, 'ttl': 999}, + {'body': {'key': 'value'}, 'ttl': 888} + ]} + + result = self.client.post('/queues/%s/messages' % queue_name, + data=sample_messages) + self.assertEqual(result.status_code, 201) + + claim_metadata = {'ttl': 100, 'grace': 300} + + result = self.client.post('/queues/%s/claims' % queue_name, + data=claim_metadata) + self.assertEqual(result.status_code, 201) + + response = self.client.get('/health') + self.assertEqual(response.status_code, 200) + health = response.json() + + self.assertEqual(health['catalog_reachable'], True) + self.assertEqual(health[pool_name]['storage_reachable'], True) + op_status = health[pool_name]['operation_status'] + for op in op_status.keys(): + 
self.assertTrue(op_status[op]['succeeded']) + + message_volume = health[pool_name]['message_volume'] + self.assertEqual(message_volume['claimed'], 2) + self.assertEqual(message_volume['free'], 0) + self.assertEqual(message_volume['total'], 2) diff --git a/tests/functional/wsgi/v1_1/test_queues.py b/tests/functional/wsgi/v1_1/test_queues.py index 4ec156789..80b866100 100644 --- a/tests/functional/wsgi/v1_1/test_queues.py +++ b/tests/functional/wsgi/v1_1/test_queues.py @@ -235,15 +235,6 @@ class TestQueueMisc(base.V1_1FunctionalTestBase): test_list_queue_invalid_limit.tags = ['negative'] - def test_check_health(self): - """Test health endpoint.""" - - result = self.client.get('/{0}/health' - .format("v1.1")) - self.assertEqual(result.status_code, 204) - - test_check_health.tags = ['positive'] - def test_check_queue_exists(self): """Checks if queue exists.""" diff --git a/tests/unit/queues/transport/wsgi/test_v1_1.py b/tests/unit/queues/transport/wsgi/test_v1_1.py index d016f8cf1..2be97a08b 100644 --- a/tests/unit/queues/transport/wsgi/test_v1_1.py +++ b/tests/unit/queues/transport/wsgi/test_v1_1.py @@ -121,19 +121,12 @@ class TestPing(base.V1_1Base): self.assertEqual(response, []) -class TestHealth(base.V1_1Base): +class TestHealthMongoDB(v1_1.TestHealthMongoDB): + url_prefix = URL_PREFIX - config_file = 'wsgi_sqlalchemy.conf' - def test_get(self): - response = self.simulate_get('/v1.1/health') - self.assertEqual(self.srmock.status, falcon.HTTP_204) - self.assertEqual(response, []) - - def test_head(self): - response = self.simulate_head('/v1.1/health') - self.assertEqual(self.srmock.status, falcon.HTTP_204) - self.assertEqual(response, []) +class TestHealthFaultyDriver(v1_1.TestHealthFaultyDriver): + url_prefix = URL_PREFIX @ddt.ddt diff --git a/zaqar/queues/storage/base.py b/zaqar/queues/storage/base.py index 2b59b5121..d67b17b7f 100644 --- a/zaqar/queues/storage/base.py +++ b/zaqar/queues/storage/base.py @@ -1,4 +1,5 @@ # Copyright (c) 2013 Red Hat, Inc. 
+# Copyright 2014 Catalyst IT Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,15 +17,22 @@ """Implements the DriverBase abstract class for Zaqar storage drivers.""" import abc +import functools +import time +import uuid import six +import zaqar.openstack.common.log as logging + DEFAULT_QUEUES_PER_PAGE = 10 DEFAULT_MESSAGES_PER_PAGE = 10 DEFAULT_POOLS_PER_PAGE = 10 DEFAULT_MESSAGES_PER_CLAIM = 10 +LOG = logging.getLogger(__name__) + @six.add_metaclass(abc.ABCMeta) class DriverBase(object): @@ -66,6 +74,98 @@ class DataDriverBase(DriverBase): """Check whether the storage is ready.""" raise NotImplementedError + def health(self): + """Return the health status of service.""" + overall_health = {} + # NOTE(flwang): KPI extracted from different storage backends, + # _health() will be implemented by different storage drivers. + backend_health = self._health() + if backend_health: + overall_health.update(backend_health) + + return overall_health + + @abc.abstractmethod + def _health(self): + """Return the health status based on different backends.""" + raise NotImplementedError + + def _get_operation_status(self): + op_status = {} + status_template = lambda s, t, r: {'succeeded': s, + 'seconds': t, + 'ref': r} + project = str(uuid.uuid4()) + queue = str(uuid.uuid4()) + client = str(uuid.uuid4()) + msg_template = lambda s: {'ttl': 600, 'body': {'event': 'p_%s' % s}} + messages = [msg_template(i) for i in range(100)] + claim_metadata = {'ttl': 60, 'grace': 300} + + # NOTE (flwang): Using time.time() instead of timeit since timeit will + # make the method calling be complicated. 
+ def _handle_status(operation_type, callable_operation): + succeeded = True + ref = None + result = None + try: + start = time.time() + result = callable_operation() + except Exception as e: + ref = str(uuid.uuid4()) + LOG.exception(e, extra={'instance_uuid': ref}) + succeeded = False + status = status_template(succeeded, time.time() - start, ref) + op_status[operation_type] = status + return succeeded, result + + # create queue + func = functools.partial(self.queue_controller.create, + queue, project=project) + succeeded, _ = _handle_status('create_queue', func) + + # post messages + if succeeded: + func = functools.partial(self.message_controller.post, + queue, messages, client, project=project) + _, msg_ids = _handle_status('post_messages', func) + + # claim messages + if msg_ids: + func = functools.partial(self.claim_controller.create, + queue, claim_metadata, + project=project) + _, (claim_id, claim_msgs) = _handle_status('claim_messages', + func) + + # list messages + func = functools.partial(self.message_controller.list, + queue, project, echo=True, + client_uuid=client, + include_claimed=True) + _handle_status('list_messages', func) + + # delete messages + if claim_id and claim_msgs: + for message in claim_msgs: + func = functools.partial(self. 
+ message_controller.delete, + queue, message['id'], + project, claim=claim_id) + succeeded, _ = _handle_status('delete_messages', func) + if not succeeded: + break + # delete claim + func = functools.partial(self.claim_controller.delete, + queue, claim_id, project) + _handle_status('delete_claim', func) + + # delete queue + func = functools.partial(self.queue_controller.delete, + queue, project=project) + _handle_status('delete_queue', func) + return op_status + @abc.abstractproperty def queue_controller(self): """Returns the driver's queue controller.""" diff --git a/zaqar/queues/storage/mongodb/driver.py b/zaqar/queues/storage/mongodb/driver.py index cc9a0b866..35916c393 100644 --- a/zaqar/queues/storage/mongodb/driver.py +++ b/zaqar/queues/storage/mongodb/driver.py @@ -95,6 +95,24 @@ class DataDriver(storage.DataDriverBase): except pymongo.errors.PyMongoError: return False + def _health(self): + KPI = {} + KPI['storage_reachable'] = self.is_alive() + KPI['operation_status'] = self._get_operation_status() + message_volume = {'free': 0, 'claimed': 0, 'total': 0} + + for msg_col in [db.messages for db in self.message_databases]: + msg_count_claimed = msg_col.find({'c.id': {'$ne': None}}).count() + message_volume['claimed'] += msg_count_claimed + + msg_count_total = msg_col.find().count() + message_volume['total'] += msg_count_total + + message_volume['free'] = (message_volume['total'] - + message_volume['claimed']) + KPI['message_volume'] = message_volume + return KPI + @decorators.lazy_property(write=False) def queues_database(self): """Database dedicated to the "queues" collection. 
diff --git a/zaqar/queues/storage/pipeline.py b/zaqar/queues/storage/pipeline.py index 5daa737e1..292201033 100644 --- a/zaqar/queues/storage/pipeline.py +++ b/zaqar/queues/storage/pipeline.py @@ -99,6 +99,9 @@ class DataDriver(base.DataDriverBase): def is_alive(self): return self._storage.is_alive() + def _health(self): + return self._storage._health() + @decorators.lazy_property(write=False) def queue_controller(self): stages = _get_storage_pipeline('queue', self.conf) diff --git a/zaqar/queues/storage/pooling.py b/zaqar/queues/storage/pooling.py index 10a0f9b05..35966e01a 100644 --- a/zaqar/queues/storage/pooling.py +++ b/zaqar/queues/storage/pooling.py @@ -1,4 +1,5 @@ # Copyright (c) 2013 Rackspace, Inc. +# Copyright 2014 Catalyst IT Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy @@ -75,6 +76,19 @@ class DataDriver(storage.DataDriverBase): for pool in self._pool_catalog._pools_ctrl.list(limit=0)) + def _health(self): + KPI = {} + # Leverage the is_alive to indicate if the backend storage is + # reachable or not + KPI['catalog_reachable'] = self.is_alive() + + # Messages of each pool + for pool in self._pool_catalog._pools_ctrl.list(): + driver = self._pool_catalog.get_driver(pool['name']) + KPI[pool['name']] = driver._health() + + return KPI + @decorators.lazy_property(write=False) def queue_controller(self): return QueueController(self._pool_catalog) diff --git a/zaqar/queues/storage/sqlalchemy/driver.py b/zaqar/queues/storage/sqlalchemy/driver.py index e43efb878..20125f2d6 100644 --- a/zaqar/queues/storage/sqlalchemy/driver.py +++ b/zaqar/queues/storage/sqlalchemy/driver.py @@ -1,4 +1,5 @@ # Copyright (c) 2013 Red Hat, Inc. +# Copyright 2014 Catalyst IT Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. 
You may obtain a copy @@ -122,6 +123,28 @@ class DataDriver(storage.DataDriverBase): def is_alive(self): return True + def _health(self): + KPI = {} + # Leverage the is_alive to indicate if the backend storage is + # reachable or not + KPI['storage_reachable'] = self.is_alive() + KPI['operation_status'] = self._get_operation_status() + message_volume = {'free': 0, 'claimed': 0, 'total': 0} + + # NOTE(flwang): Using SQL directly to get better performance than + # sqlalchemy. + msg_count_claimed = self.get('SELECT COUNT(*) FROM MESSAGES' + ' WHERE CID IS NOT NULL') + message_volume['claimed'] = int(msg_count_claimed[0]) + + msg_count_total = self.get('SELECT COUNT(*) FROM MESSAGES') + message_volume['total'] = int(msg_count_total[0]) + + message_volume['free'] = (message_volume['total'] - + message_volume['claimed']) + KPI['message_volume'] = message_volume + return KPI + class ControlDriver(storage.ControlDriverBase): diff --git a/zaqar/queues/transport/wsgi/v1_1/__init__.py b/zaqar/queues/transport/wsgi/v1_1/__init__.py index 59d34d208..721c4ed8e 100644 --- a/zaqar/queues/transport/wsgi/v1_1/__init__.py +++ b/zaqar/queues/transport/wsgi/v1_1/__init__.py @@ -67,10 +67,6 @@ def public_endpoints(driver): driver._validate, claim_controller)), - # Health - ('/health', - health.Resource(driver._storage)), - # Ping ('/ping', ping.Resource(driver._storage)) @@ -85,4 +81,7 @@ def private_endpoints(driver): pools.Listing(pools_controller)), ('/pools/{pool}', pools.Resource(pools_controller)), + # Health + ('/health', + health.Resource(driver._storage)), ] diff --git a/zaqar/queues/transport/wsgi/v1_1/health.py b/zaqar/queues/transport/wsgi/v1_1/health.py index 03eac08b2..7ee29f7ba 100644 --- a/zaqar/queues/transport/wsgi/v1_1/health.py +++ b/zaqar/queues/transport/wsgi/v1_1/health.py @@ -1,4 +1,5 @@ -# Copyright (c) 2013 Rackspace, Inc. +# Copyright (c) 2014 Rackspace, Inc. +# Copyright 2014 Catalyst IT Ltd. 
# # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy @@ -12,7 +13,12 @@ # License for the specific language governing permissions and limitations under # the License. -import falcon +from zaqar.i18n import _ +from zaqar.openstack.common import log as logging +from zaqar.queues.transport import utils +from zaqar.queues.transport.wsgi import errors as wsgi_errors + +LOG = logging.getLogger(__name__) class Resource(object): @@ -23,8 +29,12 @@ class Resource(object): self.driver = driver def on_get(self, req, resp, **kwargs): - resp.status = (falcon.HTTP_204 if self.driver.is_alive() - else falcon.HTTP_503) + try: + resp_dict = self.driver.health() - def on_head(self, req, resp, **kwargs): - resp.status = falcon.HTTP_204 + resp.content_location = req.path + resp.body = utils.to_json(resp_dict) + except Exception as ex: + LOG.exception(ex) + description = _(u'Health status could not be read.') + raise wsgi_errors.HTTPServiceUnavailable(description) \ No newline at end of file diff --git a/zaqar/tests/faulty_storage.py b/zaqar/tests/faulty_storage.py index 995feaa9c..0a26a2692 100644 --- a/zaqar/tests/faulty_storage.py +++ b/zaqar/tests/faulty_storage.py @@ -27,6 +27,9 @@ class DataDriver(storage.DataDriverBase): def is_alive(self): raise NotImplementedError() + def _health(self): + raise NotImplementedError() + @property def queue_controller(self): return QueueController(self) diff --git a/zaqar/tests/functional/base.py b/zaqar/tests/functional/base.py index 95764019d..289f7dc42 100644 --- a/zaqar/tests/functional/base.py +++ b/zaqar/tests/functional/base.py @@ -81,8 +81,12 @@ class FunctionalTestBase(testing.TestBase): self.client = http.Client() else: + if self.server_class == ZaqarAdminServer: + self.mconf.pooling = True + self.mconf.admin_mode = True + self.client = http.WSGIClient( - bootstrap.Bootstrap(config.cfg.CONF).transport.app) + 
bootstrap.Bootstrap(self.mconf).transport.app) self.headers = helpers.create_zaqar_headers(self.cfg) diff --git a/zaqar/tests/queues/transport/wsgi/v1_1/__init__.py b/zaqar/tests/queues/transport/wsgi/v1_1/__init__.py index a17ae81de..c8f16d722 100644 --- a/zaqar/tests/queues/transport/wsgi/v1_1/__init__.py +++ b/zaqar/tests/queues/transport/wsgi/v1_1/__init__.py @@ -15,6 +15,7 @@ from zaqar.tests.queues.transport.wsgi.v1_1 import test_auth from zaqar.tests.queues.transport.wsgi.v1_1 import test_claims from zaqar.tests.queues.transport.wsgi.v1_1 import test_default_limits +from zaqar.tests.queues.transport.wsgi.v1_1 import test_health from zaqar.tests.queues.transport.wsgi.v1_1 import test_home from zaqar.tests.queues.transport.wsgi.v1_1 import test_media_type from zaqar.tests.queues.transport.wsgi.v1_1 import test_messages @@ -27,6 +28,8 @@ TestClaimsFaultyDriver = test_claims.TestClaimsFaultyDriver TestClaimsMongoDB = test_claims.TestClaimsMongoDB TestClaimsSqlalchemy = test_claims.TestClaimsSqlalchemy TestDefaultLimits = test_default_limits.TestDefaultLimits +TestHealthMongoDB = test_health.TestHealthMongoDB +TestHealthFaultyDriver = test_health.TestHealthFaultyDriver TestHomeDocument = test_home.TestHomeDocument TestMediaType = test_media_type.TestMediaType TestMessagesFaultyDriver = test_messages.TestMessagesFaultyDriver diff --git a/zaqar/tests/queues/transport/wsgi/v1_1/test_health.py b/zaqar/tests/queues/transport/wsgi/v1_1/test_health.py new file mode 100644 index 000000000..f405b70f9 --- /dev/null +++ b/zaqar/tests/queues/transport/wsgi/v1_1/test_health.py @@ -0,0 +1,98 @@ +# Copyright 2014 Catalyst IT Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import ddt +import falcon +import mock + +from zaqar.openstack.common import jsonutils +from zaqar.queues.storage import errors +import zaqar.queues.storage.mongodb as mongo +from zaqar import tests as testing +from zaqar.tests.queues.transport.wsgi import base + + +@ddt.ddt +class TestHealth(base.TestBase): + + def setUp(self): + super(TestHealth, self).setUp() + + def test_basic(self): + path = self.url_prefix + '/health' + body = self.simulate_get(path) + health = jsonutils.loads(body[0]) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + self.assertTrue(health['storage_reachable']) + self.assertIsNotNone(health['message_volume']) + for op in health['operation_status']: + self.assertTrue(health['operation_status'][op]['succeeded']) + + @mock.patch.object(mongo.driver.DataDriver, '_health') + def test_message_volume(self, mock_driver_get): + def _health(): + KPI = {} + KPI['message_volume'] = {'free': 1, 'claimed': 2, 'total': 3} + return KPI + + mock_driver_get.side_effect = _health + + path = self.url_prefix + '/health' + body = self.simulate_get(path) + health = jsonutils.loads(body[0]) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + message_volume = health['message_volume'] + self.assertEqual(message_volume['free'], 1) + self.assertEqual(message_volume['claimed'], 2) + self.assertEqual(message_volume['total'], 3) + + @mock.patch.object(mongo.messages.MessageController, 'delete') + def test_operation_status(self, mock_messages_delete): + mock_messages_delete.side_effect = errors.NotPermitted() + + path = self.url_prefix + 
'/health' + body = self.simulate_get(path) + health = jsonutils.loads(body[0]) + self.assertEqual(self.srmock.status, falcon.HTTP_200) + op_status = health['operation_status'] + for op in op_status.keys(): + if op == 'delete_messages': + self.assertFalse(op_status[op]['succeeded']) + self.assertIsNotNone(op_status[op]['ref']) + else: + self.assertTrue(op_status[op]['succeeded']) + + +class TestHealthMongoDB(TestHealth): + + config_file = 'wsgi_mongodb.conf' + + @testing.requires_mongodb + def setUp(self): + super(TestHealthMongoDB, self).setUp() + + def tearDown(self): + super(TestHealthMongoDB, self).tearDown() + + +class TestHealthFaultyDriver(base.TestBaseFaulty): + + config_file = 'wsgi_faulty.conf' + + def test_simple(self): + path = self.url_prefix + '/health' + self.simulate_get(path) + self.assertEqual(self.srmock.status, falcon.HTTP_503)