Merge "Make GnocchiThreshold evaluator derived from ThresholdEvaluator"
This commit is contained in:
commit
8aebe3bff8
@ -13,31 +13,17 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import operator
|
||||
|
||||
from aodh.alarm.evaluator import threshold
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import timeutils
|
||||
import requests
|
||||
import six.moves
|
||||
|
||||
from aodh.alarm import evaluator
|
||||
from aodh.i18n import _
|
||||
from aodh import keystone_client
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
COMPARATORS = {
|
||||
'gt': operator.gt,
|
||||
'lt': operator.lt,
|
||||
'ge': operator.ge,
|
||||
'le': operator.le,
|
||||
'eq': operator.eq,
|
||||
'ne': operator.ne,
|
||||
}
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('gnocchi_url',
|
||||
default="http://localhost:8041",
|
||||
@ -48,18 +34,10 @@ cfg.CONF.register_opts(OPTS, group="alarms")
|
||||
cfg.CONF.import_opt('http_timeout', 'aodh.service')
|
||||
|
||||
|
||||
class GnocchiThresholdEvaluator(evaluator.Evaluator):
|
||||
|
||||
# the sliding evaluation window is extended to allow
|
||||
# for reporting/ingestion lag
|
||||
look_back = 1
|
||||
|
||||
# minimum number of datapoints within sliding window to
|
||||
# avoid unknown state
|
||||
quorum = 1
|
||||
class GnocchiThresholdEvaluator(threshold.ThresholdEvaluator):
|
||||
|
||||
def __init__(self, notifier):
|
||||
super(GnocchiThresholdEvaluator, self).__init__(notifier)
|
||||
super(threshold.ThresholdEvaluator, self).__init__(notifier)
|
||||
self.gnocchi_url = cfg.CONF.alarms.gnocchi_url
|
||||
self._ks_client = None
|
||||
|
||||
@ -115,92 +93,8 @@ class GnocchiThresholdEvaluator(evaluator.Evaluator):
|
||||
else:
|
||||
return jsonutils.loads(r.text)
|
||||
|
||||
@classmethod
|
||||
def _bound_duration(cls, alarm):
|
||||
"""Bound the duration of the statistics query."""
|
||||
now = timeutils.utcnow()
|
||||
# when exclusion of weak datapoints is enabled, we extend
|
||||
# the look-back period so as to allow a clearer sample count
|
||||
# trend to be established
|
||||
window = (alarm.rule['granularity'] *
|
||||
(alarm.rule['evaluation_periods'] + cls.look_back))
|
||||
start = now - datetime.timedelta(seconds=window)
|
||||
LOG.debug(_('query stats from %(start)s to '
|
||||
'%(now)s') % {'start': start, 'now': now})
|
||||
return start.isoformat(), now.isoformat()
|
||||
|
||||
def _sufficient(self, alarm, statistics):
|
||||
"""Check for the sufficiency of the data for evaluation.
|
||||
|
||||
Ensure there is sufficient data for evaluation, transitioning to
|
||||
unknown otherwise.
|
||||
"""
|
||||
sufficient = len(statistics) >= self.quorum
|
||||
if not sufficient and alarm.state != evaluator.UNKNOWN:
|
||||
reason = _('%d datapoints are unknown') % alarm.rule[
|
||||
'evaluation_periods']
|
||||
reason_data = self._reason_data('unknown',
|
||||
alarm.rule['evaluation_periods'],
|
||||
None)
|
||||
self._refresh(alarm, evaluator.UNKNOWN, reason, reason_data)
|
||||
return sufficient
|
||||
|
||||
@staticmethod
|
||||
def _reason_data(disposition, count, most_recent):
|
||||
"""Create a reason data dictionary for this evaluator type."""
|
||||
return {'type': 'threshold', 'disposition': disposition,
|
||||
'count': count, 'most_recent': most_recent}
|
||||
|
||||
@classmethod
|
||||
def _reason(cls, alarm, statistics, distilled, state):
|
||||
"""Fabricate reason string."""
|
||||
count = len(statistics)
|
||||
disposition = 'inside' if state == evaluator.OK else 'outside'
|
||||
last = statistics[-1]
|
||||
transition = alarm.state != state
|
||||
reason_data = cls._reason_data(disposition, count, last)
|
||||
if transition:
|
||||
return (_('Transition to %(state)s due to %(count)d samples'
|
||||
' %(disposition)s threshold, most recent:'
|
||||
' %(most_recent)s')
|
||||
% dict(reason_data, state=state)), reason_data
|
||||
return (_('Remaining as %(state)s due to %(count)d samples'
|
||||
' %(disposition)s threshold, most recent: %(most_recent)s')
|
||||
% dict(reason_data, state=state)), reason_data
|
||||
|
||||
def _transition(self, alarm, statistics, compared):
|
||||
"""Transition alarm state if necessary.
|
||||
|
||||
The transition rules are currently hardcoded as:
|
||||
|
||||
- transitioning from a known state requires an unequivocal
|
||||
set of datapoints
|
||||
|
||||
- transitioning from unknown is on the basis of the most
|
||||
recent datapoint if equivocal
|
||||
|
||||
Ultimately this will be policy-driven.
|
||||
"""
|
||||
distilled = all(compared)
|
||||
unequivocal = distilled or not any(compared)
|
||||
unknown = alarm.state == evaluator.UNKNOWN
|
||||
continuous = alarm.repeat_actions
|
||||
|
||||
if unequivocal:
|
||||
state = evaluator.ALARM if distilled else evaluator.OK
|
||||
reason, reason_data = self._reason(alarm, statistics,
|
||||
distilled, state)
|
||||
if alarm.state != state or continuous:
|
||||
self._refresh(alarm, state, reason, reason_data)
|
||||
elif unknown or continuous:
|
||||
trending_state = evaluator.ALARM if compared[-1] else evaluator.OK
|
||||
state = trending_state if unknown else alarm.state
|
||||
reason, reason_data = self._reason(alarm, statistics,
|
||||
distilled, state)
|
||||
self._refresh(alarm, state, reason, reason_data)
|
||||
|
||||
@staticmethod
|
||||
def _select_best_granularity(alarm, statistics):
|
||||
def _sanitize(alarm, statistics):
|
||||
"""Return the datapoints that correspond to the alarm granularity"""
|
||||
# TODO(sileht): if there's no direct match, but there is an archive
|
||||
# policy with granularity that's an even divisor of the period,
|
||||
@ -208,26 +102,3 @@ class GnocchiThresholdEvaluator(evaluator.Evaluator):
|
||||
# but not a stddev-of-stddevs).
|
||||
return [stats[2] for stats in statistics
|
||||
if stats[1] == alarm.rule['granularity']]
|
||||
|
||||
def evaluate(self, alarm):
|
||||
if not self.within_time_constraint(alarm):
|
||||
LOG.debug(_('Attempted to evaluate alarm %s, but it is not '
|
||||
'within its time constraint.') % alarm.alarm_id)
|
||||
return
|
||||
|
||||
start, end = self._bound_duration(alarm)
|
||||
statistics = self._statistics(alarm, start, end)
|
||||
statistics = self._select_best_granularity(alarm, statistics)
|
||||
|
||||
if self._sufficient(alarm, statistics):
|
||||
def _compare(value):
|
||||
op = COMPARATORS[alarm.rule['comparison_operator']]
|
||||
limit = alarm.rule['threshold']
|
||||
LOG.debug(_('comparing value %(value)s against threshold'
|
||||
' %(limit)s') %
|
||||
{'value': value, 'limit': limit})
|
||||
return op(value, limit)
|
||||
|
||||
self._transition(alarm,
|
||||
statistics,
|
||||
list(six.moves.map(_compare, statistics)))
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import datetime
|
||||
import operator
|
||||
import six
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
@ -42,7 +43,7 @@ class ThresholdEvaluator(evaluator.Evaluator):
|
||||
look_back = 1
|
||||
|
||||
@classmethod
|
||||
def _bound_duration(cls, alarm, constraints):
|
||||
def _bound_duration(cls, alarm):
|
||||
"""Bound the duration of the statistics query."""
|
||||
now = timeutils.utcnow()
|
||||
# when exclusion of weak datapoints is enabled, we extend
|
||||
@ -50,15 +51,12 @@ class ThresholdEvaluator(evaluator.Evaluator):
|
||||
# trend to be established
|
||||
look_back = (cls.look_back if not alarm.rule.get('exclude_outliers')
|
||||
else alarm.rule['evaluation_periods'])
|
||||
window = (alarm.rule['period'] *
|
||||
(alarm.rule['evaluation_periods'] + look_back))
|
||||
window = ((alarm.rule.get('period', None) or alarm.rule['granularity'])
|
||||
* (alarm.rule['evaluation_periods'] + look_back))
|
||||
start = now - datetime.timedelta(seconds=window)
|
||||
LOG.debug(_('query stats from %(start)s to '
|
||||
'%(now)s') % {'start': start, 'now': now})
|
||||
after = dict(field='timestamp', op='ge', value=start.isoformat())
|
||||
before = dict(field='timestamp', op='le', value=now.isoformat())
|
||||
constraints.extend([before, after])
|
||||
return constraints
|
||||
return start.isoformat(), now.isoformat()
|
||||
|
||||
@staticmethod
|
||||
def _sanitize(alarm, statistics):
|
||||
@ -81,11 +79,17 @@ class ThresholdEvaluator(evaluator.Evaluator):
|
||||
# in practice statistics are always sorted by period start, not
|
||||
# strictly required by the API though
|
||||
statistics = statistics[-alarm.rule['evaluation_periods']:]
|
||||
result_statistics = [getattr(stat, alarm.rule['statistic'])
|
||||
for stat in statistics]
|
||||
LOG.debug(_('pruned statistics to %d') % len(statistics))
|
||||
return statistics
|
||||
return result_statistics
|
||||
|
||||
def _statistics(self, alarm, query):
|
||||
def _statistics(self, alarm, start, end):
|
||||
"""Retrieve statistics over the current window."""
|
||||
after = dict(field='timestamp', op='ge', value=start)
|
||||
before = dict(field='timestamp', op='le', value=end)
|
||||
query = alarm.rule['query']
|
||||
query.extend([before, after])
|
||||
LOG.debug(_('stats query %s') % query)
|
||||
try:
|
||||
return self._client.statistics.list(
|
||||
@ -111,8 +115,7 @@ class ThresholdEvaluator(evaluator.Evaluator):
|
||||
# consistent since thirdparty software may depend on old format.
|
||||
reason = _('%d datapoints are unknown') % alarm.rule[
|
||||
'evaluation_periods']
|
||||
last = None if not statistics else (
|
||||
getattr(statistics[-1], alarm.rule['statistic']))
|
||||
last = None if not statistics else statistics[-1]
|
||||
reason_data = self._reason_data('unknown',
|
||||
alarm.rule['evaluation_periods'],
|
||||
last)
|
||||
@ -130,7 +133,7 @@ class ThresholdEvaluator(evaluator.Evaluator):
|
||||
"""Fabricate reason string."""
|
||||
count = len(statistics)
|
||||
disposition = 'inside' if state == evaluator.OK else 'outside'
|
||||
last = getattr(statistics[-1], alarm.rule['statistic'])
|
||||
last = statistics[-1]
|
||||
transition = alarm.state != state
|
||||
reason_data = cls._reason_data(disposition, count, last)
|
||||
if transition:
|
||||
@ -179,20 +182,13 @@ class ThresholdEvaluator(evaluator.Evaluator):
|
||||
'within its time constraint.') % alarm.alarm_id)
|
||||
return
|
||||
|
||||
query = self._bound_duration(
|
||||
alarm,
|
||||
alarm.rule['query']
|
||||
)
|
||||
|
||||
statistics = self._sanitize(
|
||||
alarm,
|
||||
self._statistics(alarm, query)
|
||||
)
|
||||
start, end = self._bound_duration(alarm)
|
||||
statistics = self._statistics(alarm, start, end)
|
||||
statistics = self._sanitize(alarm, statistics)
|
||||
|
||||
if self._sufficient(alarm, statistics):
|
||||
def _compare(stat):
|
||||
def _compare(value):
|
||||
op = COMPARATORS[alarm.rule['comparison_operator']]
|
||||
value = getattr(stat, alarm.rule['statistic'])
|
||||
limit = alarm.rule['threshold']
|
||||
LOG.debug(_('comparing value %(value)s against threshold'
|
||||
' %(limit)s') %
|
||||
@ -201,4 +197,4 @@ class ThresholdEvaluator(evaluator.Evaluator):
|
||||
|
||||
self._transition(alarm,
|
||||
statistics,
|
||||
[_compare(statistic) for statistic in statistics])
|
||||
list(six.moves.map(_compare, statistics)))
|
||||
|
@ -158,7 +158,7 @@ class TestGnocchiThresholdEvaluate(base.TestEvaluatorBase):
|
||||
means = self._get_stats(60, [self.alarms[0].rule['threshold'] - v
|
||||
for v in moves.xrange(5)])
|
||||
maxs = self._get_stats(300, [self.alarms[1].rule['threshold'] + v
|
||||
for v in moves.xrange(1, 4)])
|
||||
for v in moves.xrange(4)])
|
||||
avgs2 = self._get_stats(50, [self.alarms[2].rule['threshold'] - v
|
||||
for v in moves.xrange(6)])
|
||||
self.requests.get.side_effect = [Exception('boom'),
|
||||
|
@ -350,15 +350,9 @@ class TestEvaluate(base.TestEvaluatorBase):
|
||||
alarm.rule['exclude_outliers'] = exclude_outliers
|
||||
with mock.patch.object(timeutils, 'utcnow') as mock_utcnow:
|
||||
mock_utcnow.return_value = datetime.datetime(2012, 7, 2, 10, 45)
|
||||
constraint = self.evaluator._bound_duration(alarm, [])
|
||||
self.assertEqual([
|
||||
{'field': 'timestamp',
|
||||
'op': 'le',
|
||||
'value': timeutils.utcnow().isoformat()},
|
||||
{'field': 'timestamp',
|
||||
'op': 'ge',
|
||||
'value': start},
|
||||
], constraint)
|
||||
constraint = self.evaluator._bound_duration(alarm)
|
||||
self.assertEqual((start, timeutils.utcnow().isoformat()),
|
||||
constraint)
|
||||
|
||||
def test_bound_duration_outlier_exclusion_defaulted(self):
|
||||
self._do_test_bound_duration('2012-07-02T10:39:00')
|
||||
|
Loading…
x
Reference in New Issue
Block a user