Enabling self-disabled pollster

Avoid loading a pollster when its required environment is missing, and stop
polling when its resources become unavailable.

Change-Id: Ib3cba8ab5236e28b05a091619ebcf149767e35d8
Implements: blueprint self-disabled-pollster
This commit is contained in:
Edwin Zhai 2014-12-12 10:54:46 +08:00
parent ce34a4eae0
commit 9e5b690502
7 changed files with 262 additions and 12 deletions

View File

@ -29,6 +29,7 @@ import six
from six.moves.urllib import parse as urlparse from six.moves.urllib import parse as urlparse
from stevedore import extension from stevedore import extension
from ceilometer.agent import plugin_base
from ceilometer import coordination from ceilometer import coordination
from ceilometer.i18n import _ from ceilometer.i18n import _
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
@ -63,6 +64,7 @@ class Resources(object):
self.agent_manager = agent_manager self.agent_manager = agent_manager
self._resources = [] self._resources = []
self._discovery = [] self._discovery = []
self.blacklist = []
def setup(self, pipeline): def setup(self, pipeline):
self._resources = pipeline.resources self._resources = pipeline.resources
@ -127,15 +129,22 @@ class PollingTask(object):
LOG.info(_("Polling pollster %(poll)s in the context of " LOG.info(_("Polling pollster %(poll)s in the context of "
"%(src)s"), "%(src)s"),
dict(poll=pollster.name, src=source_name)) dict(poll=pollster.name, src=source_name))
pollster_resources = None pollster_resources = []
if pollster.obj.default_discovery: if pollster.obj.default_discovery:
pollster_resources = self.manager.discover( pollster_resources = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache) [pollster.obj.default_discovery], discovery_cache)
key = Resources.key(source_name, pollster) key = Resources.key(source_name, pollster)
source_resources = list( source_resources = list(
self.resources[key].get(discovery_cache)) self.resources[key].get(discovery_cache))
polling_resources = (source_resources or candidate_res = (source_resources or
pollster_resources) pollster_resources)
# Exclude the failed resource from polling
black_res = self.resources[key].blacklist
polling_resources = [
x for x in candidate_res if x not in black_res]
# If no resources, skip for this pollster
if not polling_resources: if not polling_resources:
LOG.info(_("Skip polling pollster %s, no resources" LOG.info(_("Skip polling pollster %s, no resources"
" found"), pollster.name) " found"), pollster.name)
@ -148,6 +157,12 @@ class PollingTask(object):
resources=polling_resources resources=polling_resources
)) ))
publisher(samples) publisher(samples)
except plugin_base.PollsterPermanentError as err:
LOG.error(_(
'Prevent pollster %(name)s for '
'polling source %(source)s anymore!')
% ({'name': pollster.name, 'source': source_name}))
self.resources[key].blacklist.append(err.fail_res)
except Exception as err: except Exception as err:
LOG.warning(_( LOG.warning(_(
'Continue after error from %(name)s: %(error)s') 'Continue after error from %(name)s: %(error)s')
@ -198,9 +213,18 @@ class AgentManager(os_service.Service):
def _extensions(category, agent_ns=None): def _extensions(category, agent_ns=None):
namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns
else 'ceilometer.%s' % category) else 'ceilometer.%s' % category)
def _catch_extension_load_error(mgr, ep, exc):
    """Tolerate ExtensionLoadError while extensions are being loaded.

    Passed to the extension manager as its ``on_load_failure_callback``.

    :param mgr: the extension manager reporting the failure (unused)
    :param ep: the entry point that failed to load; only its name is used
    :param exc: the exception raised while loading the entry point
    :raises: ``exc`` itself, unless it is an ExtensionLoadError
    """
    # Anything that is not an ExtensionLoadError is a real failure.
    if not isinstance(exc, plugin_base.ExtensionLoadError):
        raise exc
    # Extension raising ExtensionLoadError can be ignored
    LOG.error(_("Skip loading extension for %s") % ep.name)
return extension.ExtensionManager( return extension.ExtensionManager(
namespace=namespace, namespace=namespace,
invoke_on_load=True, invoke_on_load=True,
on_load_failure_callback=_catch_extension_load_error,
) )
def join_partitioning_groups(self): def join_partitioning_groups(self):

View File

@ -187,10 +187,48 @@ class NotificationBase(PluginBase):
p(list(self.process_notification(notification))) p(list(self.process_notification(notification)))
class ExtensionLoadError(Exception):
    """Raised when a pollster plugin cannot be loaded.

    PollsterBase calls its setup_environment hook while a pollster is being
    constructed, so that required HW/SW dependencies can be prepared. Any
    exception coming out of that hook is raised as ExtensionLoadError, and
    loading of that pollster is then skipped.
    """
class PollsterPermanentError(Exception):
    """Permanent error when polling.

    A pollster raises this exception when polling hits an unrecoverable
    error, so that it is not polled for the failed resource any more.

    :param resource: the entry of the ``resources`` argument of get_samples
        that caused the polling error; exposed as ``fail_res``
    """

    def __init__(self, resource):
        # Keep the standard exception args in step with the stored resource.
        super(PollsterPermanentError, self).__init__(resource)
        self.fail_res = resource
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class PollsterBase(PluginBase): class PollsterBase(PluginBase):
"""Base class for plugins that support the polling API.""" """Base class for plugins that support the polling API."""
def setup_environment(self):
    """Prepare whatever environment the pollster needs.

    The base implementation does nothing; subclasses override it for
    their specific needs. It is called from __init__, and any exception
    raised here prevents the pollster from being loaded.
    """
def __init__(self):
    """Initialize the pollster and run its setup_environment hook.

    :raises ExtensionLoadError: if setup_environment fails. An
        ExtensionLoadError raised by the hook is passed through as is;
        any other exception is wrapped, with the original exception as
        the error argument.
    """
    super(PollsterBase, self).__init__()
    try:
        self.setup_environment()
    except ExtensionLoadError:
        # The hook may raise ExtensionLoadError itself; re-raise it
        # unchanged instead of nesting it inside a second one.
        raise
    except Exception as err:
        raise ExtensionLoadError(err)
@abc.abstractproperty @abc.abstractproperty
def default_discovery(self): def default_discovery(self):
"""Default discovery to use for this pollster. """Default discovery to use for this pollster.

View File

@ -0,0 +1,29 @@
# Copyright 2014 Intel Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Pollsters for IPMI and Intel Node Manager
"""
from oslo_config import cfg
# Configuration options defined by this package; registered below under
# the 'ipmi' group.
OPTS = [
# Number of polling failures tolerated before a pollster is disabled
# (default 3); per the help text, a negative value retries forever.
cfg.IntOpt('polling_retry',
default=3,
help='Tolerance of IPMI/NM polling failures '
'before disable this pollster. '
'Negative indicates retrying forever.')
]
# Registration happens at import time of this module.
cfg.CONF.register_opts(OPTS, group='ipmi')

View File

@ -19,17 +19,31 @@ from oslo_utils import timeutils
import six import six
from ceilometer.agent import plugin_base from ceilometer.agent import plugin_base
from ceilometer.i18n import _
from ceilometer.ipmi.platform import exception as nmexcept
from ceilometer.ipmi.platform import intel_node_manager as node_manager from ceilometer.ipmi.platform import intel_node_manager as node_manager
from ceilometer.openstack.common import log
from ceilometer import sample from ceilometer import sample
CONF = cfg.CONF CONF = cfg.CONF
CONF.import_opt('host', 'ceilometer.service') CONF.import_opt('host', 'ceilometer.service')
CONF.import_opt('polling_retry', 'ceilometer.ipmi.pollsters',
group='ipmi')
LOG = log.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class _Base(plugin_base.PollsterBase): class _Base(plugin_base.PollsterBase):
def __init__(self):
def setup_environment(self):
super(_Base, self).setup_environment()
self.nodemanager = node_manager.NodeManager() self.nodemanager = node_manager.NodeManager()
self.polling_failures = 0
# Do not load this extension if no NM support
if not self.nodemanager.nm_support:
raise plugin_base.ExtensionLoadError()
@property @property
def default_discovery(self): def default_discovery(self):
@ -40,7 +54,22 @@ class _Base(plugin_base.PollsterBase):
"""Return data sample for IPMI.""" """Return data sample for IPMI."""
def get_samples(self, manager, cache, resources): def get_samples(self, manager, cache, resources):
stats = self.read_data() # Only one resource for Node Manager pollster
try:
stats = self.read_data()
except nmexcept.IPMIException:
self.polling_failures += 1
LOG.warning(_('Polling %(name)s faild for %(cnt)s times!')
% ({'name': self.NAME,
'cnt': self.polling_failures}))
if (CONF.ipmi.polling_retry >= 0 and
self.polling_failures > CONF.ipmi.polling_retry):
LOG.warning(_('Pollster for %s is disabled!') % self.NAME)
raise plugin_base.PollsterPermanentError(resources[0])
else:
return
self.polling_failures = 0
metadata = { metadata = {
'node': CONF.host 'node': CONF.host

View File

@ -16,12 +16,19 @@ from oslo_config import cfg
from oslo_utils import timeutils from oslo_utils import timeutils
from ceilometer.agent import plugin_base from ceilometer.agent import plugin_base
from ceilometer.i18n import _
from ceilometer.ipmi.notifications import ironic as parser from ceilometer.ipmi.notifications import ironic as parser
from ceilometer.ipmi.platform import exception as ipmiexcept
from ceilometer.ipmi.platform import ipmi_sensor from ceilometer.ipmi.platform import ipmi_sensor
from ceilometer.openstack.common import log
from ceilometer import sample from ceilometer import sample
CONF = cfg.CONF CONF = cfg.CONF
CONF.import_opt('host', 'ceilometer.service') CONF.import_opt('host', 'ceilometer.service')
CONF.import_opt('polling_retry', 'ceilometer.ipmi.pollsters',
group='ipmi')
LOG = log.getLogger(__name__)
class InvalidSensorData(ValueError): class InvalidSensorData(ValueError):
@ -31,8 +38,14 @@ class InvalidSensorData(ValueError):
class SensorPollster(plugin_base.PollsterBase): class SensorPollster(plugin_base.PollsterBase):
METRIC = None METRIC = None
def __init__(self): def setup_environment(self):
super(SensorPollster, self).setup_environment()
self.ipmi = ipmi_sensor.IPMISensor() self.ipmi = ipmi_sensor.IPMISensor()
self.polling_failures = 0
# Do not load this extension if no IPMI support
if not self.ipmi.ipmi_support:
raise plugin_base.ExtensionLoadError()
@property @property
def default_discovery(self): def default_discovery(self):
@ -47,7 +60,23 @@ class SensorPollster(plugin_base.PollsterBase):
return [] return []
def get_samples(self, manager, cache, resources): def get_samples(self, manager, cache, resources):
stats = self.ipmi.read_sensor_any(self.METRIC) # Only one resource for IPMI pollster
try:
stats = self.ipmi.read_sensor_any(self.METRIC)
except ipmiexcept.IPMIException:
self.polling_failures += 1
LOG.warning(_(
'Polling %(mtr)s sensor failed for %(cnt)s times!')
% ({'mtr': self.METRIC,
'cnt': self.polling_failures}))
if (CONF.ipmi.polling_retry >= 0 and
self.polling_failures > CONF.ipmi.polling_retry):
LOG.warning(_('Pollster for %s is disabled!') % self.METRIC)
raise plugin_base.PollsterPermanentError(resources[0])
else:
return
self.polling_failures = 0
sensor_type_data = self._get_sensor_types(stats, self.METRIC) sensor_type_data = self._get_sensor_types(stats, self.METRIC)

View File

@ -42,6 +42,7 @@ import ceilometer.image.glance
import ceilometer.image.notifications import ceilometer.image.notifications
import ceilometer.ipmi.notifications.ironic import ceilometer.ipmi.notifications.ironic
import ceilometer.ipmi.platform.intel_node_manager import ceilometer.ipmi.platform.intel_node_manager
import ceilometer.ipmi.pollsters
import ceilometer.middleware import ceilometer.middleware
import ceilometer.network.notifications import ceilometer.network.notifications
import ceilometer.neutron_client import ceilometer.neutron_client
@ -116,7 +117,9 @@ def list_opts():
('dispatcher_file', ceilometer.dispatcher.file.OPTS), ('dispatcher_file', ceilometer.dispatcher.file.OPTS),
('event', ceilometer.event.converter.OPTS), ('event', ceilometer.event.converter.OPTS),
('hardware', ceilometer.hardware.discovery.OPTS), ('hardware', ceilometer.hardware.discovery.OPTS),
('ipmi', ceilometer.ipmi.platform.intel_node_manager.OPTS), ('ipmi',
itertools.chain(ceilometer.ipmi.platform.intel_node_manager.OPTS,
ceilometer.ipmi.pollsters.OPTS)),
('notification', ceilometer.notification.OPTS), ('notification', ceilometer.notification.OPTS),
('polling', ceilometer.agent.manager.OPTS), ('polling', ceilometer.agent.manager.OPTS),
('publisher', ceilometer.publisher.utils.OPTS), ('publisher', ceilometer.publisher.utils.OPTS),

View File

@ -26,6 +26,10 @@ from ceilometer import pipeline
from ceilometer.tests.agent import agentbase from ceilometer.tests.agent import agentbase
class PollingException(Exception):
    """Arbitrary exception type used by the tests in this module."""
class TestManager(base.BaseTestCase): class TestManager(base.BaseTestCase):
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
@ -45,6 +49,41 @@ class TestManager(base.BaseTestCase):
pollster_list=['storage.*']) pollster_list=['storage.*'])
self.assertEqual(0, len(list(mgr.extensions))) self.assertEqual(0, len(list(mgr.extensions)))
# Test plugin load behavior based on Node Manager pollsters.
# pollster_list is just a filter, so sensor pollsters under the 'ipmi'
# namespace are also instantiated; their __init__ still needs to be mocked.
@mock.patch('ceilometer.ipmi.pollsters.node._Base.__init__',
mock.Mock(return_value=None))
@mock.patch('ceilometer.ipmi.pollsters.sensor.SensorPollster.__init__',
mock.Mock(return_value=None))
def test_load_normal_plugins(self):
"""With every pollster __init__ mocked to succeed, two are loaded."""
mgr = manager.AgentManager(namespaces=['ipmi'],
pollster_list=['hardware.ipmi.node.*'])
# 2 pollsters for Node Manager
self.assertEqual(2, len(mgr.extensions))
# Skip loading pollster upon ExtensionLoadError
@mock.patch('ceilometer.ipmi.pollsters.node._Base.__init__',
mock.Mock(side_effect=plugin_base.ExtensionLoadError))
@mock.patch('ceilometer.ipmi.pollsters.sensor.SensorPollster.__init__',
mock.Mock(return_value=None))
def test_load_failed_plugins(self):
"""No extension is left when _Base.__init__ raises ExtensionLoadError."""
mgr = manager.AgentManager(namespaces=['ipmi'],
pollster_list=['hardware.ipmi.node.*'])
# 0 pollsters
self.assertEqual(0, len(mgr.extensions))
# Exceptions other than ExtensionLoadError are propagated
@mock.patch('ceilometer.ipmi.pollsters.node._Base.__init__',
mock.Mock(side_effect=PollingException))
@mock.patch('ceilometer.ipmi.pollsters.sensor.SensorPollster.__init__',
mock.Mock(return_value=None))
def test_load_exceptional_plugins(self):
"""PollingException from _Base.__init__ escapes AgentManager creation."""
# The two lists are handed to manager.AgentManager positionally.
self.assertRaises(PollingException,
manager.AgentManager,
['ipmi'],
['hardware.ipmi.node.*'])
class TestPollsterKeystone(agentbase.TestPollster): class TestPollsterKeystone(agentbase.TestPollster):
@plugin_base.check_keystone @plugin_base.check_keystone
@ -55,6 +94,23 @@ class TestPollsterKeystone(agentbase.TestPollster):
resources=resources) resources=resources)
class TestPollsterPollingException(agentbase.TestPollster):
    """Test pollster that fails permanently after two successful polls."""

    # Number of get_samples calls made so far.
    polling_failures = 0

    def get_samples(self, manager, cache, resources):
        """Return the parent's samples twice, then fail permanently.

        :raises plugin_base.PollsterPermanentError: from the third call
            on, reporting the first polled resource as the failed one.
        """
        sample = super(TestPollsterPollingException, self).get_samples(
            manager=manager,
            cache=cache,
            resources=resources)

        # Raise polling exception after 2 times
        self.polling_failures += 1
        if self.polling_failures > 2:
            # PollsterPermanentError requires the failed resource; calling
            # it without one raises TypeError instead of the intended
            # exception.
            raise plugin_base.PollsterPermanentError(resources[0])

        return sample
class TestRunTasks(agentbase.BaseAgentManagerTestCase): class TestRunTasks(agentbase.BaseAgentManagerTestCase):
class PollsterKeystone(TestPollsterKeystone): class PollsterKeystone(TestPollsterKeystone):
@ -71,6 +127,20 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
timestamp=agentbase.default_test_data.timestamp, timestamp=agentbase.default_test_data.timestamp,
resource_metadata=agentbase.default_test_data.resource_metadata) resource_metadata=agentbase.default_test_data.resource_metadata)
class PollsterPollingException(TestPollsterPollingException):
# Pollster used by this test case under the 'testpollingexception' name.
# Class-level lists; tearDown resets both to empty after each test.
samples = []
resources = []
# Same as the default test data except for the sample name.
test_data = agentbase.TestSample(
name='testpollingexception',
type=agentbase.default_test_data.type,
unit=agentbase.default_test_data.unit,
volume=agentbase.default_test_data.volume,
user_id=agentbase.default_test_data.user_id,
project_id=agentbase.default_test_data.project_id,
resource_id=agentbase.default_test_data.resource_id,
timestamp=agentbase.default_test_data.timestamp,
resource_metadata=agentbase.default_test_data.resource_metadata)
@staticmethod @staticmethod
def create_manager(): def create_manager():
return manager.AgentManager() return manager.AgentManager()
@ -85,14 +155,20 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
def tearDown(self): def tearDown(self):
self.PollsterKeystone.samples = [] self.PollsterKeystone.samples = []
self.PollsterKeystone.resources = [] self.PollsterKeystone.resources = []
self.PollsterPollingException.samples = []
self.PollsterPollingException.resources = []
super(TestRunTasks, self).tearDown() super(TestRunTasks, self).tearDown()
def create_extension_list(self): def create_extension_list(self):
exts = super(TestRunTasks, self).create_extension_list() exts = super(TestRunTasks, self).create_extension_list()
exts.append(extension.Extension('testkeystone', exts.extend([extension.Extension('testkeystone',
None, None,
None, None,
self.PollsterKeystone(),)) self.PollsterKeystone(), ),
extension.Extension('testpollingexception',
None,
None,
self.PollsterPollingException(), )])
return exts return exts
def test_get_sample_resources(self): def test_get_sample_resources(self):
@ -126,3 +202,25 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
super(TestRunTasks, self).test_interval_exception_isolation() super(TestRunTasks, self).test_interval_exception_isolation()
self.assertEqual(1, len(self.PollsterException.samples)) self.assertEqual(1, len(self.PollsterException.samples))
self.assertEqual(1, len(self.PollsterExceptionAnother.samples)) self.assertEqual(1, len(self.PollsterExceptionAnother.samples))
def test_polling_exception(self):
    """Only two samples are published over four polling rounds."""
    self.pipeline_cfg = [
        {
            'name': "test_pollingexception",
            'interval': 10,
            'counters': ['testpollingexception'],
            'resources': ['test://'] if self.source_resources else [],
            'transformers': [],
            'publishers': ["test"],
        },
    ]
    self.mgr.pipeline_manager = pipeline.PipelineManager(
        self.pipeline_cfg,
        self.transformer_manager)
    polling_tasks = self.mgr.setup_polling_tasks()
    # dict.values() is a non-indexable view on Python 3, so turn it into
    # a list before picking the first task.
    task = list(polling_tasks.values())[0]

    # 2 samples after 4 pollings, as pollster got disabled upon exception
    for _round in range(4):
        self.mgr.interval_task(task)
    pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
    self.assertEqual(2, len(pub.samples))