Basic alarm threshold evaluation logic.
Partially addresses BP alarm-distributed-threshold-evaluation. Threshold evaluation logic encapsulating basic alarm statistics querying, threshold comparison, and state transition rules. Change-Id: I0f3a50809985d25ab0eceb990b142da8701a9616
This commit is contained in:
parent
1ba1bc7eda
commit
135612f0f9
0
ceilometer/alarm/__init__.py
Normal file
0
ceilometer/alarm/__init__.py
Normal file
219
ceilometer/alarm/threshold_evaluation.py
Normal file
219
ceilometer/alarm/threshold_evaluation.py
Normal file
@ -0,0 +1,219 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Copyright © 2013 Red Hat, Inc
|
||||
#
|
||||
# Author: Eoghan Glynn <eglynn@redhat.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import operator
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometerclient import client as ceiloclient
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
COMPARATORS = {
|
||||
'gt': operator.gt,
|
||||
'lt': operator.lt,
|
||||
'ge': operator.ge,
|
||||
'le': operator.le,
|
||||
'eq': operator.eq,
|
||||
'ne': operator.ne,
|
||||
}
|
||||
|
||||
UNKNOWN = 'insufficient data'
|
||||
OK = 'ok'
|
||||
ALARM = 'alarm'
|
||||
|
||||
|
||||
class Evaluator(object):
|
||||
"""This class implements the basic alarm threshold evaluation
|
||||
logic.
|
||||
"""
|
||||
|
||||
# the sliding evaluation window is extended to allow
|
||||
# for reporting/ingestion lag
|
||||
look_back = 1
|
||||
|
||||
# minimum number of datapoints within sliding window to
|
||||
# avoid unknown state
|
||||
quorum = 1
|
||||
|
||||
def __init__(self, notifier):
|
||||
self.alarms = []
|
||||
self.notifier = notifier
|
||||
self.api_client = None
|
||||
|
||||
def assign_alarms(self, alarms):
|
||||
"""Assign alarms to be evaluated."""
|
||||
self.alarms = alarms
|
||||
|
||||
@property
|
||||
def _client(self):
|
||||
"""Construct or reuse an authenticated API client."""
|
||||
if not self.api_client:
|
||||
auth_config = cfg.CONF.service_credentials
|
||||
creds = dict(
|
||||
os_auth_url=auth_config.os_auth_url,
|
||||
os_tenant_name=auth_config.os_tenant_name,
|
||||
os_password=auth_config.os_password,
|
||||
os_username=auth_config.os_username
|
||||
)
|
||||
self.api_client = ceiloclient.get_client(2, **creds)
|
||||
return self.api_client
|
||||
|
||||
@staticmethod
|
||||
def _constraints(alarm):
|
||||
"""Assert the constraints on the statistics query."""
|
||||
constraints = []
|
||||
for (field, value) in alarm.matching_metadata.iteritems():
|
||||
constraints.append(dict(field=field, op='eq', value=value))
|
||||
return constraints
|
||||
|
||||
@classmethod
|
||||
def _bound_duration(cls, alarm, constraints):
|
||||
"""Bound the duration of the statistics query."""
|
||||
now = datetime.datetime.utcnow()
|
||||
window = (alarm.period *
|
||||
(alarm.evaluation_periods + cls.look_back))
|
||||
start = now - datetime.timedelta(seconds=window)
|
||||
LOG.debug(_('query stats from %(start)s to %(now)s') % locals())
|
||||
after = dict(field='timestamp', op='ge', value=start.isoformat())
|
||||
before = dict(field='timestamp', op='le', value=now.isoformat())
|
||||
constraints.extend([before, after])
|
||||
return constraints
|
||||
|
||||
@staticmethod
|
||||
def _sanitize(alarm, statistics):
|
||||
"""Sanitize statistics.
|
||||
Ultimately this will be the hook for the exclusion of chaotic
|
||||
datapoints for example.
|
||||
"""
|
||||
LOG.debug(_('sanitize stats %s') % statistics)
|
||||
# in practice statistics are always sorted by period start, not
|
||||
# strictly required by the API though
|
||||
statistics = statistics[:alarm.evaluation_periods]
|
||||
LOG.debug(_('pruned statistics to %d') % len(statistics))
|
||||
return statistics
|
||||
|
||||
def _statistics(self, alarm, query):
|
||||
"""Retrieve statistics over the current window."""
|
||||
LOG.debug(_('stats query %s') % query)
|
||||
try:
|
||||
return self._client.statistics.list(alarm.counter_name,
|
||||
q=query,
|
||||
period=alarm.period)
|
||||
except Exception:
|
||||
LOG.exception(_('alarm stats retrieval failed'))
|
||||
return []
|
||||
|
||||
def _update(self, alarm, state, reason):
|
||||
"""Refresh alarm state."""
|
||||
id = alarm.alarm_id
|
||||
LOG.info(_('alarm %(id)s transitioning to %(state)s'
|
||||
' because %(reason)s') % locals())
|
||||
try:
|
||||
self._client.alarms.update(id, **dict(state=state))
|
||||
alarm.state = state
|
||||
if self.notifier:
|
||||
self.notifier.notify(alarm, state, reason)
|
||||
except Exception:
|
||||
# retry will occur naturally on the next evaluation
|
||||
# cycle (unless alarm state reverts in the meantime)
|
||||
LOG.exception(_('alarm state update failed'))
|
||||
|
||||
def _sufficient(self, alarm, statistics):
|
||||
"""Ensure there is sufficient data for evaluation,
|
||||
transitioning to unknown otherwise.
|
||||
"""
|
||||
sufficient = len(statistics) >= self.quorum
|
||||
if not sufficient and alarm.state != UNKNOWN:
|
||||
reason = _('%d datapoints are unknown') % alarm.evaluation_periods
|
||||
self._update(alarm, UNKNOWN, reason)
|
||||
return sufficient
|
||||
|
||||
@staticmethod
|
||||
def _reason(alarm, statistics, distilled, state):
|
||||
"""Fabricate reason string."""
|
||||
count = len(statistics)
|
||||
disposition = 'inside' if state == OK else 'outside'
|
||||
last = getattr(statistics[-1], alarm.statistic)
|
||||
return (_('Transition to %(state)s due to %(count)d samples'
|
||||
' %(disposition)s threshold, most recent: %(last)s') %
|
||||
locals())
|
||||
|
||||
def _transition(self, alarm, statistics, compared):
|
||||
"""Transition alarm state if necessary.
|
||||
|
||||
The transition rules are currently hardcoded as:
|
||||
|
||||
- transitioning from a known state requires an unequivocal
|
||||
set of datapoints
|
||||
|
||||
- transitioning from unknown is on the basis of the most
|
||||
recent datapoint if equivocal
|
||||
|
||||
Ultimately this will be policy-driven.
|
||||
"""
|
||||
distilled = all(compared)
|
||||
unequivocal = distilled or not any(compared)
|
||||
if unequivocal:
|
||||
state = ALARM if distilled else OK
|
||||
if alarm.state != state:
|
||||
reason = self._reason(alarm, statistics, distilled, state)
|
||||
self._update(alarm, state, reason)
|
||||
elif alarm.state == UNKNOWN:
|
||||
state = ALARM if compared[-1] else OK
|
||||
reason = self._reason(alarm, statistics, distilled, state)
|
||||
self._update(alarm, state, reason)
|
||||
|
||||
def evaluate(self):
|
||||
"""Evaluate the alarms assigned to this evaluator."""
|
||||
|
||||
LOG.info(_('initiating evaluation cycle on %d alarms') %
|
||||
len(self.alarms))
|
||||
|
||||
for alarm in self.alarms:
|
||||
|
||||
if not alarm.enabled:
|
||||
LOG.debug(_('skipping alarm %s') % alarm.alarm_id)
|
||||
continue
|
||||
LOG.debug(_('evaluating alarm %s') % alarm.alarm_id)
|
||||
|
||||
query = self._bound_duration(
|
||||
alarm,
|
||||
self._constraints(alarm)
|
||||
)
|
||||
|
||||
statistics = self._sanitize(
|
||||
alarm,
|
||||
self._statistics(alarm, query)
|
||||
)
|
||||
|
||||
if self._sufficient(alarm, statistics):
|
||||
|
||||
def _compare(stat):
|
||||
op = COMPARATORS[alarm.comparison_operator]
|
||||
value = getattr(stat, alarm.statistic)
|
||||
limit = alarm.threshold
|
||||
LOG.debug(_('comparing value %(value)s against threshold'
|
||||
' %(limit)s') % locals())
|
||||
return op(value, limit)
|
||||
|
||||
self._transition(alarm,
|
||||
statistics,
|
||||
list(map(_compare, statistics)))
|
@ -16,6 +16,7 @@ msgpack-python
|
||||
python-glanceclient
|
||||
python-novaclient>=2.6.10
|
||||
python-keystoneclient>=0.2,<0.3
|
||||
python-ceilometerclient>=1.0.1
|
||||
python-swiftclient
|
||||
lxml
|
||||
requests>=1.1,<1.2.1
|
||||
|
0
tests/alarm/__init__.py
Normal file
0
tests/alarm/__init__.py
Normal file
212
tests/alarm/test_threshold_evaluation.py
Normal file
212
tests/alarm/test_threshold_evaluation.py
Normal file
@ -0,0 +1,212 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Copyright © 2013 Red Hat, Inc
|
||||
#
|
||||
# Author: Eoghan Glynn <eglynn@redhat.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Tests for ceilometer/alarm/threshold_evaluation.py
|
||||
"""
|
||||
import mock
|
||||
import uuid
|
||||
|
||||
from ceilometer.alarm import threshold_evaluation
|
||||
from ceilometer.storage import models
|
||||
from ceilometer.tests import base
|
||||
from ceilometerclient import exc
|
||||
from ceilometerclient.v2 import statistics
|
||||
|
||||
|
||||
class TestEvaluate(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestEvaluate, self).setUp()
|
||||
self.api_client = mock.Mock()
|
||||
self.notifier = mock.MagicMock()
|
||||
self.alarms = [
|
||||
models.Alarm(name='instance_running_hot',
|
||||
counter_name='cpu_util',
|
||||
comparison_operator='gt',
|
||||
threshold=80.0,
|
||||
evaluation_periods=5,
|
||||
statistic='avg',
|
||||
user_id='foobar',
|
||||
project_id='snafu',
|
||||
period=60,
|
||||
alarm_id=str(uuid.uuid4()),
|
||||
matching_metadata={'resource_id':
|
||||
'my_instance'}),
|
||||
models.Alarm(name='group_running_idle',
|
||||
counter_name='cpu_util',
|
||||
comparison_operator='le',
|
||||
threshold=10.0,
|
||||
statistic='max',
|
||||
evaluation_periods=4,
|
||||
user_id='foobar',
|
||||
project_id='snafu',
|
||||
period=300,
|
||||
alarm_id=str(uuid.uuid4()),
|
||||
matching_metadata={'metadata.user_metadata.AS':
|
||||
'my_group'}),
|
||||
]
|
||||
self.evaluator = threshold_evaluation.Evaluator(self.notifier)
|
||||
self.evaluator.assign_alarms(self.alarms)
|
||||
|
||||
@staticmethod
|
||||
def _get_stat(attr, value):
|
||||
return statistics.Statistics(None, {attr: value})
|
||||
|
||||
def _set_all_alarms(self, state):
|
||||
for alarm in self.alarms:
|
||||
alarm.state = state
|
||||
|
||||
def _assert_all_alarms(self, state):
|
||||
for alarm in self.alarms:
|
||||
self.assertEqual(alarm.state, state)
|
||||
|
||||
def test_retry_transient_api_failure(self):
|
||||
with mock.patch('ceilometerclient.client.get_client',
|
||||
return_value=self.api_client):
|
||||
broken = exc.CommunicationError(message='broken')
|
||||
avgs = [self._get_stat('avg', self.alarms[0].threshold - v)
|
||||
for v in xrange(5)]
|
||||
maxs = [self._get_stat('max', self.alarms[1].threshold + v)
|
||||
for v in xrange(1, 4)]
|
||||
self.api_client.statistics.list.side_effect = [broken,
|
||||
broken,
|
||||
avgs,
|
||||
maxs]
|
||||
self.evaluator.evaluate()
|
||||
self._assert_all_alarms('insufficient data')
|
||||
self.evaluator.evaluate()
|
||||
self._assert_all_alarms('ok')
|
||||
|
||||
def test_simple_insufficient(self):
|
||||
self._set_all_alarms('ok')
|
||||
with mock.patch('ceilometerclient.client.get_client',
|
||||
return_value=self.api_client):
|
||||
self.api_client.statistics.list.return_value = []
|
||||
self.evaluator.evaluate()
|
||||
self._assert_all_alarms('insufficient data')
|
||||
expected = [mock.call(alarm.alarm_id, state='insufficient data')
|
||||
for alarm in self.alarms]
|
||||
update_calls = self.api_client.alarms.update.call_args_list
|
||||
self.assertEqual(update_calls, expected)
|
||||
expected = [mock.call(alarm,
|
||||
'insufficient data',
|
||||
('%d datapoints are unknown' %
|
||||
alarm.evaluation_periods))
|
||||
for alarm in self.alarms]
|
||||
self.assertEqual(self.notifier.notify.call_args_list, expected)
|
||||
|
||||
def test_disabled_is_skipped(self):
|
||||
self._set_all_alarms('ok')
|
||||
self.alarms[1].enabled = False
|
||||
with mock.patch('ceilometerclient.client.get_client',
|
||||
return_value=self.api_client):
|
||||
self.api_client.statistics.list.return_value = []
|
||||
self.evaluator.evaluate()
|
||||
self.assertEqual(self.alarms[0].state, 'insufficient data')
|
||||
self.assertEqual(self.alarms[1].state, 'ok')
|
||||
self.api_client.alarms.update.assert_called_once_with(
|
||||
self.alarms[0].alarm_id,
|
||||
state='insufficient data'
|
||||
)
|
||||
self.notifier.notify.assert_called_once_with(
|
||||
self.alarms[0],
|
||||
'insufficient data',
|
||||
mock.ANY
|
||||
)
|
||||
|
||||
def test_simple_alarm_trip(self):
|
||||
self._set_all_alarms('ok')
|
||||
with mock.patch('ceilometerclient.client.get_client',
|
||||
return_value=self.api_client):
|
||||
avgs = [self._get_stat('avg', self.alarms[0].threshold + v)
|
||||
for v in xrange(1, 6)]
|
||||
maxs = [self._get_stat('max', self.alarms[1].threshold - v)
|
||||
for v in xrange(4)]
|
||||
self.api_client.statistics.list.side_effect = [avgs, maxs]
|
||||
self.evaluator.evaluate()
|
||||
self._assert_all_alarms('alarm')
|
||||
expected = [mock.call(alarm.alarm_id, state='alarm')
|
||||
for alarm in self.alarms]
|
||||
update_calls = self.api_client.alarms.update.call_args_list
|
||||
self.assertEqual(update_calls, expected)
|
||||
reasons = ['Transition to alarm due to 5 samples outside'
|
||||
' threshold, most recent: 85.0',
|
||||
'Transition to alarm due to 4 samples outside'
|
||||
' threshold, most recent: 7.0']
|
||||
expected = [mock.call(alarm, 'alarm', reason)
|
||||
for alarm, reason in zip(self.alarms, reasons)]
|
||||
self.assertEqual(self.notifier.notify.call_args_list, expected)
|
||||
|
||||
def test_simple_alarm_clear(self):
|
||||
self._set_all_alarms('alarm')
|
||||
with mock.patch('ceilometerclient.client.get_client',
|
||||
return_value=self.api_client):
|
||||
avgs = [self._get_stat('avg', self.alarms[0].threshold - v)
|
||||
for v in xrange(5)]
|
||||
maxs = [self._get_stat('max', self.alarms[1].threshold + v)
|
||||
for v in xrange(1, 5)]
|
||||
self.api_client.statistics.list.side_effect = [avgs, maxs]
|
||||
self.evaluator.evaluate()
|
||||
self._assert_all_alarms('ok')
|
||||
expected = [mock.call(alarm.alarm_id, state='ok')
|
||||
for alarm in self.alarms]
|
||||
update_calls = self.api_client.alarms.update.call_args_list
|
||||
self.assertEqual(update_calls, expected)
|
||||
reasons = ['Transition to ok due to 5 samples inside'
|
||||
' threshold, most recent: 76.0',
|
||||
'Transition to ok due to 4 samples inside'
|
||||
' threshold, most recent: 14.0']
|
||||
expected = [mock.call(alarm, 'ok', reason)
|
||||
for alarm, reason in zip(self.alarms, reasons)]
|
||||
self.assertEqual(self.notifier.notify.call_args_list, expected)
|
||||
|
||||
def test_equivocal_from_known_state(self):
|
||||
self._set_all_alarms('ok')
|
||||
with mock.patch('ceilometerclient.client.get_client',
|
||||
return_value=self.api_client):
|
||||
avgs = [self._get_stat('avg', self.alarms[0].threshold + v)
|
||||
for v in xrange(5)]
|
||||
maxs = [self._get_stat('max', self.alarms[1].threshold - v)
|
||||
for v in xrange(-1, 3)]
|
||||
self.api_client.statistics.list.side_effect = [avgs, maxs]
|
||||
self.evaluator.evaluate()
|
||||
self._assert_all_alarms('ok')
|
||||
self.assertEqual(self.api_client.alarms.update.call_args_list,
|
||||
[])
|
||||
self.assertEqual(self.notifier.notify.call_args_list, [])
|
||||
|
||||
def test_equivocal_from_unknown(self):
|
||||
self._set_all_alarms('insufficient data')
|
||||
with mock.patch('ceilometerclient.client.get_client',
|
||||
return_value=self.api_client):
|
||||
avgs = [self._get_stat('avg', self.alarms[0].threshold + v)
|
||||
for v in xrange(1, 6)]
|
||||
maxs = [self._get_stat('max', self.alarms[1].threshold - v)
|
||||
for v in xrange(4)]
|
||||
self.api_client.statistics.list.side_effect = [avgs, maxs]
|
||||
self.evaluator.evaluate()
|
||||
self._assert_all_alarms('alarm')
|
||||
expected = [mock.call(alarm.alarm_id, state='alarm')
|
||||
for alarm in self.alarms]
|
||||
update_calls = self.api_client.alarms.update.call_args_list
|
||||
self.assertEqual(update_calls, expected)
|
||||
reasons = ['Transition to alarm due to 5 samples outside'
|
||||
' threshold, most recent: 85.0',
|
||||
'Transition to alarm due to 4 samples outside'
|
||||
' threshold, most recent: 7.0']
|
||||
expected = [mock.call(alarm, 'alarm', reason)
|
||||
for alarm, reason in zip(self.alarms, reasons)]
|
||||
self.assertEqual(self.notifier.notify.call_args_list, expected)
|
Loading…
x
Reference in New Issue
Block a user