feat(v1.1): Implement new /health endpoint for v1.1

The new /health endpoint of v1.1 is different from the /health
of v1.0. In v1.1 /health endpoint we will return a set of KPI
(Key Performance Indicator) of the target Marconi server. The
API response will be like below:

{
    "mongo_pool_1": {
        "message_volume": {
            "claimed": 0,
            "total": 0,
            "free": 0
        },
        "storage_reachable": true,
        "operation_status": {
            "create_queue": {
                "seconds": 0.0021300315856933594,
                "ref": null,
                "succeeded": true
            },
            "post_messages": {
                "seconds": 0.033502817153930664,
                "ref": null,
                "succeeded": true
            },
            "list_messages": {
                "seconds": 0.000013113021850585938,
                "ref": null,
                "succeeded": true
            },
            "claim_messages": {
                "seconds": 0.0013759136199951172,
                "ref": "3f515f37-58a0-4c81-8214-3e92979b82e7",
                "succeeded": false
            },
            "delete_queue": {
                "seconds": 0.0030739307403564453,
                "ref": null,
                "succeeded": true
            }
        }
    },
    "mongo_pool_2": {
        "message_volume": {
            "claimed": 0,
            "total": 0,
            "free": 0
        },
        "storage_reachable": true,
        "operation_status": {
            "create_queue": {
                "seconds": 0.0011799335479736328,
                "ref": null,
                "succeeded": true
            },
            "post_messages": {
                "seconds": 0.024316072463989258,
                "ref": null,
                "succeeded": true
            },
            "list_messages": {
                "seconds": 0.000008106231689453125,
                "ref": null,
                "succeeded": true
            },
            "claim_messages": {
                "seconds": 0.000576019287109375,
                "ref": "68629fda-b4ce-4cf9-978a-df0df8df36a7",
                "succeeded": false
            },
            "delete_queue": {
                "seconds": 0.003300905227661133,
                "ref": null,
                "succeeded": true
            }
        }
    },
    "catalog_reachable": true
}

docImpact
Implements: blueprint detailed-health

Change-Id: I6b281132e2fef8ce65ce54b9d6be297571f8b170
This commit is contained in:
Fei Long Wang 2014-02-18 09:28:51 +08:00 committed by Fei Long Wang
parent 5254d264d8
commit 13df51be82
14 changed files with 379 additions and 31 deletions

View File

@ -0,0 +1,89 @@
# Copyright (c) 2014 Catalyst IT Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ddt
from zaqar.tests.functional import base
from zaqar.tests.functional import helpers
@ddt.ddt
class TestHealth(base.V1_1FunctionalTestBase):
server_class = base.ZaqarAdminServer
def setUp(self):
super(TestHealth, self).setUp()
self.base_url = ("{url}/{version}".format(
url=self.cfg.zaqar.url,
version="v1.1"
))
self.cfg.zaqar.version = "v1.1"
self.client.set_base_url(self.base_url)
@ddt.data(
{
'name': "pool_1",
'weight': 10,
'uri': "sqlite:///:memory:"
}
)
def test_health_with_pool(self, params):
# FIXME(flwang): Please use mongodb after the sqlalchemy is disabled
# as pool node and the mongodb is working on gate successfully.
doc = helpers.create_pool_body(
weight=params.get('weight', 10),
uri=params.get('uri', "sqlite:///:memory:")
)
pool_name = params.get('name', "pool_1")
self.addCleanup(self.client.delete, url='/pools/' + pool_name)
result = self.client.put('/pools/' + pool_name, data=doc)
self.assertEqual(result.status_code, 201)
queue_name = 'fake_queue'
self.addCleanup(self.client.delete, url='/queues/' + queue_name)
result = self.client.put('/queues/' + queue_name)
self.assertEqual(result.status_code, 201)
sample_messages = {'messages': [
{'body': 239, 'ttl': 999},
{'body': {'key': 'value'}, 'ttl': 888}
]}
result = self.client.post('/queues/%s/messages' % queue_name,
data=sample_messages)
self.assertEqual(result.status_code, 201)
claim_metadata = {'ttl': 100, 'grace': 300}
result = self.client.post('/queues/%s/claims' % queue_name,
data=claim_metadata)
self.assertEqual(result.status_code, 201)
response = self.client.get('/health')
self.assertEqual(response.status_code, 200)
health = response.json()
self.assertEqual(health['catalog_reachable'], True)
self.assertEqual(health[pool_name]['storage_reachable'], True)
op_status = health[pool_name]['operation_status']
for op in op_status.keys():
self.assertTrue(op_status[op]['succeeded'])
message_volume = health[pool_name]['message_volume']
self.assertEqual(message_volume['claimed'], 2)
self.assertEqual(message_volume['free'], 0)
self.assertEqual(message_volume['total'], 2)

View File

@ -235,15 +235,6 @@ class TestQueueMisc(base.V1_1FunctionalTestBase):
test_list_queue_invalid_limit.tags = ['negative']
def test_check_health(self):
"""Test health endpoint."""
result = self.client.get('/{0}/health'
.format("v1.1"))
self.assertEqual(result.status_code, 204)
test_check_health.tags = ['positive']
def test_check_queue_exists(self):
"""Checks if queue exists."""

View File

@ -121,19 +121,12 @@ class TestPing(base.V1_1Base):
self.assertEqual(response, [])
class TestHealth(base.V1_1Base):
class TestHealthMongoDB(v1_1.TestHealthMongoDB):
url_prefix = URL_PREFIX
config_file = 'wsgi_sqlalchemy.conf'
def test_get(self):
response = self.simulate_get('/v1.1/health')
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.assertEqual(response, [])
def test_head(self):
response = self.simulate_head('/v1.1/health')
self.assertEqual(self.srmock.status, falcon.HTTP_204)
self.assertEqual(response, [])
class TestHealthFaultyDriver(v1_1.TestHealthFaultyDriver):
url_prefix = URL_PREFIX
@ddt.ddt

View File

@ -1,4 +1,5 @@
# Copyright (c) 2013 Red Hat, Inc.
# Copyright 2014 Catalyst IT Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -16,15 +17,22 @@
"""Implements the DriverBase abstract class for Zaqar storage drivers."""
import abc
import functools
import time
import uuid
import six
import zaqar.openstack.common.log as logging
DEFAULT_QUEUES_PER_PAGE = 10
DEFAULT_MESSAGES_PER_PAGE = 10
DEFAULT_POOLS_PER_PAGE = 10
DEFAULT_MESSAGES_PER_CLAIM = 10
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class DriverBase(object):
@ -66,6 +74,98 @@ class DataDriverBase(DriverBase):
"""Check whether the storage is ready."""
raise NotImplementedError
def health(self):
"""Return the health status of service."""
overall_health = {}
# NOTE(flwang): KPI extracted from different storage backends,
# _health() will be implemented by different storage drivers.
backend_health = self._health()
if backend_health:
overall_health.update(backend_health)
return overall_health
@abc.abstractmethod
def _health(self):
"""Return the health status based on different backends."""
raise NotImplementedError
def _get_operation_status(self):
op_status = {}
status_template = lambda s, t, r: {'succeeded': s,
'seconds': t,
'ref': r}
project = str(uuid.uuid4())
queue = str(uuid.uuid4())
client = str(uuid.uuid4())
msg_template = lambda s: {'ttl': 600, 'body': {'event': 'p_%s' % s}}
messages = [msg_template(i) for i in range(100)]
claim_metadata = {'ttl': 60, 'grace': 300}
# NOTE (flwang): Using time.time() instead of timeit since timeit will
# make the method calling be complicated.
def _handle_status(operation_type, callable_operation):
succeeded = True
ref = None
result = None
try:
start = time.time()
result = callable_operation()
except Exception as e:
ref = str(uuid.uuid4())
LOG.exception(e, extra={'instance_uuid': ref})
succeeded = False
status = status_template(succeeded, time.time() - start, ref)
op_status[operation_type] = status
return succeeded, result
# create queue
func = functools.partial(self.queue_controller.create,
queue, project=project)
succeeded, _ = _handle_status('create_queue', func)
# post messages
if succeeded:
func = functools.partial(self.message_controller.post,
queue, messages, client, project=project)
_, msg_ids = _handle_status('post_messages', func)
# claim messages
if msg_ids:
func = functools.partial(self.claim_controller.create,
queue, claim_metadata,
project=project)
_, (claim_id, claim_msgs) = _handle_status('claim_messages',
func)
# list messages
func = functools.partial(self.message_controller.list,
queue, project, echo=True,
client_uuid=client,
include_claimed=True)
_handle_status('list_messages', func)
# delete messages
if claim_id and claim_msgs:
for message in claim_msgs:
func = functools.partial(self.
message_controller.delete,
queue, message['id'],
project, claim=claim_id)
succeeded, _ = _handle_status('delete_messages', func)
if not succeeded:
break
# delete claim
func = functools.partial(self.claim_controller.delete,
queue, claim_id, project)
_handle_status('delete_claim', func)
# delete queue
func = functools.partial(self.queue_controller.delete,
queue, project=project)
_handle_status('delete_queue', func)
return op_status
@abc.abstractproperty
def queue_controller(self):
"""Returns the driver's queue controller."""

View File

@ -95,6 +95,24 @@ class DataDriver(storage.DataDriverBase):
except pymongo.errors.PyMongoError:
return False
def _health(self):
KPI = {}
KPI['storage_reachable'] = self.is_alive()
KPI['operation_status'] = self._get_operation_status()
message_volume = {'free': 0, 'claimed': 0, 'total': 0}
for msg_col in [db.messages for db in self.message_databases]:
msg_count_claimed = msg_col.find({'c.id': {'$ne': None}}).count()
message_volume['claimed'] += msg_count_claimed
msg_count_total = msg_col.find().count()
message_volume['total'] += msg_count_total
message_volume['free'] = (message_volume['total'] -
message_volume['claimed'])
KPI['message_volume'] = message_volume
return KPI
@decorators.lazy_property(write=False)
def queues_database(self):
"""Database dedicated to the "queues" collection.

View File

@ -99,6 +99,9 @@ class DataDriver(base.DataDriverBase):
def is_alive(self):
return self._storage.is_alive()
def _health(self):
return self._storage._health()
@decorators.lazy_property(write=False)
def queue_controller(self):
stages = _get_storage_pipeline('queue', self.conf)

View File

@ -1,4 +1,5 @@
# Copyright (c) 2013 Rackspace, Inc.
# Copyright 2014 Catalyst IT Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
@ -75,6 +76,19 @@ class DataDriver(storage.DataDriverBase):
for pool in
self._pool_catalog._pools_ctrl.list(limit=0))
def _health(self):
KPI = {}
# Leverage the is_alive to indicate if the backend storage is
# reachable or not
KPI['catalog_reachable'] = self.is_alive()
# Messages of each pool
for pool in self._pool_catalog._pools_ctrl.list():
driver = self._pool_catalog.get_driver(pool['name'])
KPI[pool['name']] = driver._health()
return KPI
@decorators.lazy_property(write=False)
def queue_controller(self):
return QueueController(self._pool_catalog)

View File

@ -1,4 +1,5 @@
# Copyright (c) 2013 Red Hat, Inc.
# Copyright 2014 Catalyst IT Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
@ -122,6 +123,28 @@ class DataDriver(storage.DataDriverBase):
def is_alive(self):
return True
def _health(self):
KPI = {}
# Leverage the is_alive to indicate if the backend storage is
# reachable or not
KPI['storage_reachable'] = self.is_alive()
KPI['operation_status'] = self._get_operation_status()
message_volume = {'free': 0, 'claimed': 0, 'total': 0}
# NOTE(flwang): Using SQL directly to get better performance than
# sqlalchemy.
msg_count_claimed = self.get('SELECT COUNT(*) FROM MESSAGES'
' WHERE CID IS NOT NULL')
message_volume['claimed'] = int(msg_count_claimed[0])
msg_count_total = self.get('SELECT COUNT(*) FROM MESSAGES')
message_volume['total'] = int(msg_count_total[0])
message_volume['free'] = (message_volume['total'] -
message_volume['claimed'])
KPI['message_volume'] = message_volume
return KPI
class ControlDriver(storage.ControlDriverBase):

View File

@ -67,10 +67,6 @@ def public_endpoints(driver):
driver._validate,
claim_controller)),
# Health
('/health',
health.Resource(driver._storage)),
# Ping
('/ping',
ping.Resource(driver._storage))
@ -85,4 +81,7 @@ def private_endpoints(driver):
pools.Listing(pools_controller)),
('/pools/{pool}',
pools.Resource(pools_controller)),
# Health
('/health',
health.Resource(driver._storage)),
]

View File

@ -1,4 +1,5 @@
# Copyright (c) 2013 Rackspace, Inc.
# Copyright (c) 2014 Rackspace, Inc.
# Copyright 2014 Catalyst IT Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
@ -12,7 +13,12 @@
# License for the specific language governing permissions and limitations under
# the License.
import falcon
from zaqar.i18n import _
from zaqar.openstack.common import log as logging
from zaqar.queues.transport import utils
from zaqar.queues.transport.wsgi import errors as wsgi_errors
LOG = logging.getLogger(__name__)
class Resource(object):
@ -23,8 +29,12 @@ class Resource(object):
self.driver = driver
def on_get(self, req, resp, **kwargs):
resp.status = (falcon.HTTP_204 if self.driver.is_alive()
else falcon.HTTP_503)
try:
resp_dict = self.driver.health()
def on_head(self, req, resp, **kwargs):
resp.status = falcon.HTTP_204
resp.content_location = req.path
resp.body = utils.to_json(resp_dict)
except Exception as ex:
LOG.exception(ex)
description = _(u'Health status could not be read.')
raise wsgi_errors.HTTPServiceUnavailable(description)

View File

@ -27,6 +27,9 @@ class DataDriver(storage.DataDriverBase):
def is_alive(self):
raise NotImplementedError()
def _health(self):
raise NotImplementedError()
@property
def queue_controller(self):
return QueueController(self)

View File

@ -81,8 +81,12 @@ class FunctionalTestBase(testing.TestBase):
self.client = http.Client()
else:
if self.server_class == ZaqarAdminServer:
self.mconf.pooling = True
self.mconf.admin_mode = True
self.client = http.WSGIClient(
bootstrap.Bootstrap(config.cfg.CONF).transport.app)
bootstrap.Bootstrap(self.mconf).transport.app)
self.headers = helpers.create_zaqar_headers(self.cfg)

View File

@ -15,6 +15,7 @@
from zaqar.tests.queues.transport.wsgi.v1_1 import test_auth
from zaqar.tests.queues.transport.wsgi.v1_1 import test_claims
from zaqar.tests.queues.transport.wsgi.v1_1 import test_default_limits
from zaqar.tests.queues.transport.wsgi.v1_1 import test_health
from zaqar.tests.queues.transport.wsgi.v1_1 import test_home
from zaqar.tests.queues.transport.wsgi.v1_1 import test_media_type
from zaqar.tests.queues.transport.wsgi.v1_1 import test_messages
@ -27,6 +28,8 @@ TestClaimsFaultyDriver = test_claims.TestClaimsFaultyDriver
TestClaimsMongoDB = test_claims.TestClaimsMongoDB
TestClaimsSqlalchemy = test_claims.TestClaimsSqlalchemy
TestDefaultLimits = test_default_limits.TestDefaultLimits
TestHealthMongoDB = test_health.TestHealthMongoDB
TestHealthFaultyDriver = test_health.TestHealthFaultyDriver
TestHomeDocument = test_home.TestHomeDocument
TestMediaType = test_media_type.TestMediaType
TestMessagesFaultyDriver = test_messages.TestMessagesFaultyDriver

View File

@ -0,0 +1,98 @@
# Copyright 2014 Catalyst IT Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ddt
import falcon
import mock
from zaqar.openstack.common import jsonutils
from zaqar.queues.storage import errors
import zaqar.queues.storage.mongodb as mongo
from zaqar import tests as testing
from zaqar.tests.queues.transport.wsgi import base
@ddt.ddt
class TestHealth(base.TestBase):
def setUp(self):
super(TestHealth, self).setUp()
def test_basic(self):
path = self.url_prefix + '/health'
body = self.simulate_get(path)
health = jsonutils.loads(body[0])
self.assertEqual(self.srmock.status, falcon.HTTP_200)
self.assertTrue(health['storage_reachable'])
self.assertIsNotNone(health['message_volume'])
for op in health['operation_status']:
self.assertTrue(health['operation_status'][op]['succeeded'])
@mock.patch.object(mongo.driver.DataDriver, '_health')
def test_message_volume(self, mock_driver_get):
def _health():
KPI = {}
KPI['message_volume'] = {'free': 1, 'claimed': 2, 'total': 3}
return KPI
mock_driver_get.side_effect = _health
path = self.url_prefix + '/health'
body = self.simulate_get(path)
health = jsonutils.loads(body[0])
self.assertEqual(self.srmock.status, falcon.HTTP_200)
message_volume = health['message_volume']
self.assertEqual(message_volume['free'], 1)
self.assertEqual(message_volume['claimed'], 2)
self.assertEqual(message_volume['total'], 3)
@mock.patch.object(mongo.messages.MessageController, 'delete')
def test_operation_status(self, mock_messages_delete):
mock_messages_delete.side_effect = errors.NotPermitted()
path = self.url_prefix + '/health'
body = self.simulate_get(path)
health = jsonutils.loads(body[0])
self.assertEqual(self.srmock.status, falcon.HTTP_200)
op_status = health['operation_status']
for op in op_status.keys():
if op == 'delete_messages':
self.assertFalse(op_status[op]['succeeded'])
self.assertIsNotNone(op_status[op]['ref'])
else:
self.assertTrue(op_status[op]['succeeded'])
class TestHealthMongoDB(TestHealth):
config_file = 'wsgi_mongodb.conf'
@testing.requires_mongodb
def setUp(self):
super(TestHealthMongoDB, self).setUp()
def tearDown(self):
super(TestHealthMongoDB, self).tearDown()
class TestHealthFaultyDriver(base.TestBaseFaulty):
config_file = 'wsgi_faulty.conf'
def test_simple(self):
path = self.url_prefix + '/health'
self.simulate_get(path)
self.assertEqual(self.srmock.status, falcon.HTTP_503)