Refactor threshold evaluator

This change creates a base class for evaluators.

The alarm service loads all evaluators.
An evaluator needs to implement the abstract methods of this base class.

The alarm service now checks alarm.type and uses the right extension to
evaluate the alarm.

The previous threshold evaluator code has been moved into the threshold
extension.

Related to blueprint alarming-logical-combination

Change-Id: If6057b7db1e894333e6e9f1edb41ab75bc2c4444
This commit is contained in:
Mehdi Abaakouk 2013-09-13 08:58:25 +02:00
parent 48c85f740a
commit 42f02ab3c4
9 changed files with 266 additions and 215 deletions

View File

@ -0,0 +1,78 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 eNovance <licensing@enovance.com>
#
# Authors: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo.config import cfg
from ceilometerclient import client as ceiloclient
from ceilometer.openstack.common import log
from ceilometer.openstack.common.gettextutils import _
LOG = log.getLogger(__name__)
class Evaluator(object):
    """Base class for alarm rule evaluator plugins.

    Concrete evaluators implement evaluate() and may call _refresh()
    to record a state transition and notify interested parties.
    """
    __metaclass__ = abc.ABCMeta

    def __init__(self, notifier):
        # Notifier used to broadcast state transitions; may be None.
        self.notifier = notifier
        # Lazily-constructed ceilometer API client (see _client).
        self.api_client = None

    @property
    def _client(self):
        """Construct or reuse an authenticated API client."""
        if self.api_client:
            return self.api_client
        conf = cfg.CONF.service_credentials
        # Build a v2 client from the configured service credentials.
        self.api_client = ceiloclient.get_client(
            2,
            os_auth_url=conf.os_auth_url,
            os_tenant_name=conf.os_tenant_name,
            os_password=conf.os_password,
            os_username=conf.os_username,
            cacert=conf.os_cacert,
            endpoint_type=conf.os_endpoint_type,
        )
        return self.api_client

    def _refresh(self, alarm, state, reason):
        """Refresh alarm state."""
        try:
            previous = alarm.state
            if previous == state:
                # No transition: nothing to persist or notify.
                return
            LOG.info(_('alarm %(id)s transitioning to %(state)s because '
                       '%(reason)s') % {'id': alarm.alarm_id,
                                        'state': state,
                                        'reason': reason})
            self._client.alarms.update(alarm.alarm_id, state=state)
            alarm.state = state
            if self.notifier:
                self.notifier.notify(alarm, previous, reason)
        except Exception:
            # retry will occur naturally on the next evaluation
            # cycle (unless alarm state reverts in the meantime)
            LOG.exception(_('alarm state update failed'))

    @abc.abstractmethod
    def evaluate(self, alarm):
        """Evaluate a single alarm; implemented by each plugin."""

View File

@ -3,6 +3,7 @@
# Copyright © 2013 Red Hat, Inc
#
# Author: Eoghan Glynn <eglynn@redhat.com>
# Author: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -19,11 +20,9 @@
import datetime
import operator
from oslo.config import cfg
from ceilometer.alarm import evaluator
from ceilometer.openstack.common import log
from ceilometer.openstack.common import timeutils
from ceilometerclient import client as ceiloclient
from ceilometer.openstack.common.gettextutils import _
LOG = log.getLogger(__name__)
@ -42,10 +41,7 @@ OK = 'ok'
ALARM = 'alarm'
class Evaluator(object):
"""This class implements the basic alarm threshold evaluation
logic.
"""
class ThresholdEvaluator(evaluator.Evaluator):
# the sliding evaluation window is extended to allow
# for reporting/ingestion lag
@ -55,31 +51,6 @@ class Evaluator(object):
# avoid unknown state
quorum = 1
def __init__(self, notifier=None):
self.alarms = []
self.notifier = notifier
self.api_client = None
def assign_alarms(self, alarms):
"""Assign alarms to be evaluated."""
self.alarms = alarms
@property
def _client(self):
"""Construct or reuse an authenticated API client."""
if not self.api_client:
auth_config = cfg.CONF.service_credentials
creds = dict(
os_auth_url=auth_config.os_auth_url,
os_tenant_name=auth_config.os_tenant_name,
os_password=auth_config.os_password,
os_username=auth_config.os_username,
cacert=auth_config.os_cacert,
endpoint_type=auth_config.os_endpoint_type,
)
self.api_client = ceiloclient.get_client(2, **creds)
return self.api_client
@classmethod
def _bound_duration(cls, alarm, constraints):
"""Bound the duration of the statistics query."""
@ -118,25 +89,6 @@ class Evaluator(object):
LOG.exception(_('alarm stats retrieval failed'))
return []
def _refresh(self, alarm, state, reason):
"""Refresh alarm state."""
try:
previous = alarm.state
if previous != state:
LOG.info(_('alarm %(id)s transitioning to %(state)s because '
'%(reason)s') % {'id': alarm.alarm_id,
'state': state,
'reason': reason})
self._client.alarms.update(alarm.alarm_id, **dict(state=state))
alarm.state = state
if self.notifier:
self.notifier.notify(alarm, previous, reason)
except Exception:
# retry will occur naturally on the next evaluation
# cycle (unless alarm state reverts in the meantime)
LOG.exception(_('alarm state update failed'))
def _sufficient(self, alarm, statistics):
"""Ensure there is sufficient data for evaluation,
transitioning to unknown otherwise.
@ -194,40 +146,27 @@ class Evaluator(object):
reason = self._reason(alarm, statistics, distilled, state)
self._refresh(alarm, state, reason)
def evaluate(self):
"""Evaluate the alarms assigned to this evaluator."""
def evaluate(self, alarm):
query = self._bound_duration(
alarm,
alarm.rule['query']
)
LOG.info(_('initiating evaluation cycle on %d alarms') %
len(self.alarms))
statistics = self._sanitize(
alarm,
self._statistics(alarm, query)
)
for alarm in self.alarms:
if self._sufficient(alarm, statistics):
def _compare(stat):
op = COMPARATORS[alarm.rule['comparison_operator']]
value = getattr(stat, alarm.rule['statistic'])
limit = alarm.rule['threshold']
LOG.debug(_('comparing value %(value)s against threshold'
' %(limit)s') %
{'value': value, 'limit': limit})
return op(value, limit)
if not alarm.enabled:
LOG.debug(_('skipping alarm %s') % alarm.alarm_id)
continue
LOG.debug(_('evaluating alarm %s') % alarm.alarm_id)
query = self._bound_duration(
alarm,
alarm.rule['query']
)
statistics = self._sanitize(
alarm,
self._statistics(alarm, query)
)
if self._sufficient(alarm, statistics):
def _compare(stat):
op = COMPARATORS[alarm.rule['comparison_operator']]
value = getattr(stat, alarm.rule['statistic'])
limit = alarm.rule['threshold']
LOG.debug(_('comparing value %(value)s against threshold'
' %(limit)s') %
{'value': value, 'limit': limit})
return op(value, limit)
self._transition(alarm,
statistics,
list(map(_compare, statistics)))
self._transition(alarm,
statistics,
map(_compare, statistics))

View File

@ -33,11 +33,13 @@ from ceilometerclient import client as ceiloclient
OPTS = [
cfg.IntOpt('threshold_evaluation_interval',
cfg.IntOpt('evaluation_interval',
default=60,
help='Period of threshold evaluation cycle, should'
help='Period of evaluation cycle, should'
' be >= than configured pipeline interval for'
' collection of underlying metrics.'),
' collection of underlying metrics.',
deprecated_opts=[cfg.DeprecatedOpt(
'threshold_evaluation_interval', group='alarm')])
]
cfg.CONF.register_opts(OPTS, group='alarm')
@ -49,54 +51,68 @@ LOG = log.getLogger(__name__)
class SingletonAlarmService(os_service.Service):
ALARM_NAMESPACE = 'ceilometer.alarm'
EXTENSIONS_NAMESPACE = "ceilometer.alarm.evaluator"
def __init__(self):
super(SingletonAlarmService, self).__init__()
self.extension_manager = extension.ExtensionManager(
namespace=self.ALARM_NAMESPACE,
self.api_client = None
self.evaluators = extension.ExtensionManager(
self.EXTENSIONS_NAMESPACE,
invoke_on_load=True,
invoke_args=(rpc_alarm.RPCAlarmNotifier(),)
)
invoke_args=(rpc_alarm.RPCAlarmNotifier(),))
self.supported_evaluators = [ext.name for ext in
self.evaluators.extensions]
def start(self):
super(SingletonAlarmService, self).start()
for ext in self.extension_manager.extensions:
if ext.name == 'threshold_eval':
self.threshold_eval = ext.obj
interval = cfg.CONF.alarm.threshold_evaluation_interval
args = [ext.obj, self._client()]
self.tg.add_timer(
interval,
self._evaluate_all_alarms,
0,
*args)
break
interval = cfg.CONF.alarm.evaluation_interval
self.tg.add_timer(
interval,
self._evaluate_all_alarms,
0)
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
@staticmethod
def _client():
auth_config = cfg.CONF.service_credentials
creds = dict(
os_auth_url=auth_config.os_auth_url,
os_tenant_name=auth_config.os_tenant_name,
os_password=auth_config.os_password,
os_username=auth_config.os_username,
cacert=auth_config.os_cacert,
endpoint_type=auth_config.os_endpoint_type,
)
return ceiloclient.get_client(2, **creds)
@property
def _client(self):
"""Construct or reuse an authenticated API client."""
if not self.api_client:
auth_config = cfg.CONF.service_credentials
creds = dict(
os_auth_url=auth_config.os_auth_url,
os_tenant_name=auth_config.os_tenant_name,
os_password=auth_config.os_password,
os_username=auth_config.os_username,
cacert=auth_config.os_cacert,
endpoint_type=auth_config.os_endpoint_type,
)
self.api_client = ceiloclient.get_client(2, **creds)
return self.api_client
@staticmethod
def _evaluate_all_alarms(threshold_eval, api_client):
def _evaluate_all_alarms(self):
try:
alarms = api_client.alarms.list()
threshold_eval.assign_alarms(alarms)
threshold_eval.evaluate()
alarms = self._client.alarms.list()
LOG.info(_('initiating evaluation cycle on %d alarms') %
len(alarms))
for alarm in alarms:
self._evaluate_alarm(alarm)
except Exception:
LOG.exception(_('threshold evaluation cycle failed'))
def _evaluate_alarm(self, alarm):
"""Evaluate the alarms assigned to this evaluator."""
if not alarm.enabled:
LOG.debug(_('skipping alarm %s: alarm disabled') %
alarm.alarm_id)
return
if alarm.type not in self.supported_evaluators:
LOG.debug(_('skipping alarm %s: type unsupported') %
alarm.alarm_id)
return
LOG.debug(_('evaluating alarm %s') % alarm.alarm_id)
self.evaluators[alarm.type].obj.evaluate(alarm)
def singleton_alarm():
prepare_service()

View File

@ -610,10 +610,10 @@
# Options defined in ceilometer.alarm.service
#
# Period of threshold evaluation cycle, should be >= than
# configured pipeline interval for collection of underlying
# metrics. (integer value)
#threshold_evaluation_interval=60
# Period of evaluation cycle, should be >= than configured
# pipeline interval for collection of underlying metrics.
# (integer value)
#evaluation_interval=60
#

View File

@ -102,8 +102,8 @@ ceilometer.publisher =
udp = ceilometer.publisher.udp:UDPPublisher
file = ceilometer.publisher.file:FilePublisher
ceilometer.alarm =
threshold_eval = ceilometer.alarm.threshold_evaluation:Evaluator
ceilometer.alarm.evaluator =
threshold = ceilometer.alarm.evaluator.threshold:ThresholdEvaluator
ceilometer.alarm.notifier =
log = ceilometer.alarm.notifier.log:LogAlarmNotifier
@ -129,6 +129,7 @@ ceilometer.dispatcher =
database = ceilometer.collector.dispatcher.database:DatabaseDispatcher
file = ceilometer.collector.dispatcher.file:FileDispatcher
[build_sphinx]
all_files = 1
build-dir = doc/build

View File

View File

@ -0,0 +1,53 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 eNovance <licensing@enovance.com>
#
# Author: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
# Eoghan Glynn <eglynn@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base class for tests in ceilometer/alarm/evaluator/
"""
import mock
from ceilometer.openstack.common import timeutils
from ceilometer.tests import base
class TestEvaluatorBase(base.TestCase):
    """Common scaffolding for evaluator test cases.

    Subclasses set EVALUATOR to the evaluator class under test and
    override prepare_alarms() to populate self.alarms.
    """

    def setUp(self):
        super(TestEvaluatorBase, self).setUp()
        self.api_client = mock.Mock()
        self.notifier = mock.MagicMock()
        self.evaluator = self.EVALUATOR(self.notifier)
        self.prepare_alarms()

    def tearDown(self):
        super(TestEvaluatorBase, self).tearDown()
        # Clear any time override a test installed via timeutils.
        timeutils.utcnow.override_time = None

    def prepare_alarms(self):
        # BUG FIX: this was decorated @staticmethod while taking `self`;
        # self.prepare_alarms() in setUp would then call the underlying
        # function with zero arguments and raise TypeError. It must be a
        # regular (overridable) instance method.
        self.alarms = []

    def _evaluate_all_alarms(self):
        # Drive the evaluator once over every prepared alarm.
        for alarm in self.alarms:
            self.evaluator.evaluate(alarm)

    def _set_all_alarms(self, state):
        # Force every alarm into the given state.
        for alarm in self.alarms:
            alarm.state = state

    def _assert_all_alarms(self, state):
        # Verify every alarm ended up in the given state.
        for alarm in self.alarms:
            self.assertEqual(alarm.state, state)

View File

@ -15,25 +15,24 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for ceilometer/alarm/threshold_evaluation.py
"""Tests for ceilometer/alarm/evaluator/threshold.py
"""
import datetime
import mock
import uuid
from ceilometer.alarm import threshold_evaluation
from ceilometer.alarm.evaluator import threshold
from ceilometer.openstack.common import timeutils
from ceilometer.storage import models
from ceilometer.tests import base
from ceilometerclient import exc
from ceilometerclient.v2 import statistics
from tests.alarm.evaluator import base
class TestEvaluate(base.TestCase):
def setUp(self):
super(TestEvaluate, self).setUp()
self.api_client = mock.Mock()
self.notifier = mock.MagicMock()
class TestEvaluate(base.TestEvaluatorBase):
EVALUATOR = threshold.ThresholdEvaluator
def prepare_alarms(self):
self.alarms = [
models.Alarm(name='instance_running_hot',
description='instance_running_hot',
@ -92,25 +91,11 @@ class TestEvaluate(base.TestCase):
'value': 'my_group'}])
),
]
self.evaluator = threshold_evaluation.Evaluator(self.notifier)
self.evaluator.assign_alarms(self.alarms)
def tearDown(self):
super(TestEvaluate, self).tearDown()
timeutils.utcnow.override_time = None
@staticmethod
def _get_stat(attr, value):
return statistics.Statistics(None, {attr: value})
def _set_all_alarms(self, state):
for alarm in self.alarms:
alarm.state = state
def _assert_all_alarms(self, state):
for alarm in self.alarms:
self.assertEqual(alarm.state, state)
def test_retry_transient_api_failure(self):
with mock.patch('ceilometerclient.client.get_client',
return_value=self.api_client):
@ -123,9 +108,9 @@ class TestEvaluate(base.TestCase):
broken,
avgs,
maxs]
self.evaluator.evaluate()
self._evaluate_all_alarms()
self._assert_all_alarms('insufficient data')
self.evaluator.evaluate()
self._evaluate_all_alarms()
self._assert_all_alarms('ok')
def test_simple_insufficient(self):
@ -133,7 +118,7 @@ class TestEvaluate(base.TestCase):
with mock.patch('ceilometerclient.client.get_client',
return_value=self.api_client):
self.api_client.statistics.list.return_value = []
self.evaluator.evaluate()
self._evaluate_all_alarms()
self._assert_all_alarms('insufficient data')
expected = [mock.call(alarm.alarm_id, state='insufficient data')
for alarm in self.alarms]
@ -146,25 +131,6 @@ class TestEvaluate(base.TestCase):
for alarm in self.alarms]
self.assertEqual(self.notifier.notify.call_args_list, expected)
def test_disabled_is_skipped(self):
self._set_all_alarms('ok')
self.alarms[1].enabled = False
with mock.patch('ceilometerclient.client.get_client',
return_value=self.api_client):
self.api_client.statistics.list.return_value = []
self.evaluator.evaluate()
self.assertEqual(self.alarms[0].state, 'insufficient data')
self.assertEqual(self.alarms[1].state, 'ok')
self.api_client.alarms.update.assert_called_once_with(
self.alarms[0].alarm_id,
state='insufficient data'
)
self.notifier.notify.assert_called_once_with(
self.alarms[0],
'ok',
mock.ANY
)
def test_simple_alarm_trip(self):
self._set_all_alarms('ok')
with mock.patch('ceilometerclient.client.get_client',
@ -174,7 +140,7 @@ class TestEvaluate(base.TestCase):
maxs = [self._get_stat('max', self.alarms[1].rule['threshold'] - v)
for v in xrange(4)]
self.api_client.statistics.list.side_effect = [avgs, maxs]
self.evaluator.evaluate()
self._evaluate_all_alarms()
self._assert_all_alarms('alarm')
expected = [mock.call(alarm.alarm_id, state='alarm')
for alarm in self.alarms]
@ -197,7 +163,7 @@ class TestEvaluate(base.TestCase):
maxs = [self._get_stat('max', self.alarms[1].rule['threshold'] + v)
for v in xrange(1, 5)]
self.api_client.statistics.list.side_effect = [avgs, maxs]
self.evaluator.evaluate()
self._evaluate_all_alarms()
self._assert_all_alarms('ok')
expected = [mock.call(alarm.alarm_id, state='ok')
for alarm in self.alarms]
@ -220,7 +186,7 @@ class TestEvaluate(base.TestCase):
maxs = [self._get_stat('max', self.alarms[1].rule['threshold'] - v)
for v in xrange(-1, 3)]
self.api_client.statistics.list.side_effect = [avgs, maxs]
self.evaluator.evaluate()
self._evaluate_all_alarms()
self._assert_all_alarms('ok')
self.assertEqual(self.api_client.alarms.update.call_args_list,
[])
@ -236,7 +202,7 @@ class TestEvaluate(base.TestCase):
maxs = [self._get_stat('max', self.alarms[1].rule['threshold'] - v)
for v in xrange(-1, 3)]
self.api_client.statistics.list.side_effect = [avgs, maxs]
self.evaluator.evaluate()
self._evaluate_all_alarms()
self._assert_all_alarms('ok')
self.assertEqual(self.api_client.alarms.update.call_args_list,
[])
@ -255,7 +221,7 @@ class TestEvaluate(base.TestCase):
maxs = [self._get_stat('max', self.alarms[1].rule['threshold'] - v)
for v in xrange(4)]
self.api_client.statistics.list.side_effect = [avgs, maxs]
self.evaluator.evaluate()
self._evaluate_all_alarms()
self._assert_all_alarms('alarm')
self.assertEqual(self.api_client.alarms.update.call_args_list,
[])
@ -275,7 +241,7 @@ class TestEvaluate(base.TestCase):
maxs = [self._get_stat('max', self.alarms[1].rule['threshold'] - v)
for v in xrange(4)]
self.api_client.statistics.list.side_effect = [avgs, maxs]
self.evaluator.evaluate()
self._evaluate_all_alarms()
self._assert_all_alarms('alarm')
expected = [mock.call(alarm.alarm_id, state='alarm')
for alarm in self.alarms]
@ -298,7 +264,7 @@ class TestEvaluate(base.TestCase):
maxs = [self._get_stat('max', self.alarms[1].rule['threshold'] - v)
for v in xrange(4)]
self.api_client.statistics.list.side_effect = [avgs, maxs]
self.evaluator.evaluate()
self._evaluate_all_alarms()
self._assert_all_alarms('alarm')
expected = [mock.call(alarm.alarm_id, state='alarm')
for alarm in self.alarms]

View File

@ -18,13 +18,11 @@
"""Tests for ceilometer/alarm/service.py
"""
import mock
import uuid
from stevedore import extension
from stevedore.tests import manager as extension_tests
from ceilometer.alarm import service
from ceilometer.storage import models
from ceilometer.tests import base
@ -32,18 +30,19 @@ class TestSingletonAlarmService(base.TestCase):
def setUp(self):
super(TestSingletonAlarmService, self).setUp()
self.threshold_eval = mock.Mock()
self.extension_mgr = extension_tests.TestExtensionManager(
self.evaluators = extension_tests.TestExtensionManager(
[
extension.Extension(
'threshold_eval',
'threshold',
None,
None,
self.threshold_eval, ),
self.threshold_eval),
])
self.api_client = mock.MagicMock()
self.singleton = service.SingletonAlarmService()
self.singleton.tg = mock.Mock()
self.singleton.extension_manager = self.extension_mgr
self.singleton.evaluators = self.evaluators
self.singleton.supported_evaluators = ['threshold']
def test_start(self):
with mock.patch('ceilometerclient.client.get_client',
@ -52,46 +51,45 @@ class TestSingletonAlarmService(base.TestCase):
expected = [
mock.call(60,
self.singleton._evaluate_all_alarms,
0,
self.threshold_eval,
self.api_client),
0),
mock.call(604800, mock.ANY),
]
actual = self.singleton.tg.add_timer.call_args_list
self.assertEqual(actual, expected)
def test_evaluation_cycle(self):
alarm = mock.Mock(enabled=True,
type='threshold')
self.api_client.alarms.list.return_value = [alarm]
with mock.patch('ceilometerclient.client.get_client',
return_value=self.api_client):
self.singleton._evaluate_all_alarms()
self.threshold_eval.evaluate.assert_called_once_with(alarm)
def test_disabled_is_skipped(self):
alarms = [
models.Alarm(name='instance_running_hot',
type='threshold',
user_id='foobar',
project_id='snafu',
enabled=True,
description='',
repeat_actions=False,
state='insufficient data',
state_timestamp=None,
timestamp=None,
ok_actions=[],
alarm_actions=[],
insufficient_data_actions=[],
alarm_id=str(uuid.uuid4()),
rule=dict(
statistic='avg',
comparison_operator='gt',
threshold=80.0,
evaluation_periods=5,
period=60,
query=[],
)),
mock.Mock(enabled=False,
type='threshold'),
mock.Mock(enabled=True,
type='threshold'),
]
self.api_client.alarms.list.return_value = alarms
with mock.patch('ceilometerclient.client.get_client',
return_value=self.api_client):
self.singleton.start()
self.singleton._evaluate_all_alarms(
self.threshold_eval,
self.api_client
)
self.threshold_eval.assign_alarms.assert_called_once_with(alarms)
self.threshold_eval.evaluate.assert_called_once_with()
self.singleton._evaluate_all_alarms()
self.threshold_eval.evaluate.assert_called_once_with(alarms[1])
def test_unknown_extention_skipped(self):
alarms = [
mock.Mock(type='not_existing_type'),
mock.Mock(type='threshold')
]
self.api_client.alarms.list.return_value = alarms
with mock.patch('ceilometerclient.client.get_client',
return_value=self.api_client):
self.singleton.start()
self.singleton._evaluate_all_alarms()
self.threshold_eval.evaluate.assert_called_once_with(alarms[1])