remove deprecated partitioned alarm service
in Juno we deprecated the non-tooz based partitioned alarm service. this patch removes this code so partitioning is handled solely by tooz. DocImpact Change-Id: Ie1f1f6593e058167fd1aa1587cfe19f2b867d5e1
This commit is contained in:
parent
6c70765a4f
commit
a8fe0dcbe7
@ -1,309 +0,0 @@
|
||||
#
|
||||
# Copyright 2013 Red Hat, Inc
|
||||
#
|
||||
# Authors: Eoghan Glynn <eglynn@redhat.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import math
|
||||
import random
|
||||
import uuid
|
||||
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from ceilometer.alarm import rpc as rpc_alarm
|
||||
from ceilometer.i18n import _
|
||||
from ceilometer.openstack.common import log
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class PartitionIdentity(object):
|
||||
"""Representation of a partition's identity for age comparison."""
|
||||
|
||||
def __init__(self, uuid, priority):
|
||||
self.uuid = uuid
|
||||
self.priority = priority
|
||||
|
||||
def __repr__(self):
|
||||
return '%s:%s' % (self.uuid, self.priority)
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.uuid, self.priority))
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, PartitionIdentity):
|
||||
return False
|
||||
return self.priority == other.priority and self.uuid == other.uuid
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self.__eq__(other)
|
||||
|
||||
def __lt__(self, other):
|
||||
if not other:
|
||||
return True
|
||||
if not isinstance(other, PartitionIdentity):
|
||||
return False
|
||||
older = self.priority < other.priority
|
||||
tie_broken = (self.priority == other.priority and
|
||||
self.uuid < other.uuid)
|
||||
return older or tie_broken
|
||||
|
||||
def __gt__(self, other):
|
||||
return not (self.__lt__(other) or self.__eq__(other))
|
||||
|
||||
|
||||
class PartitionCoordinator(object):
|
||||
"""Implements the alarm partition coordination protocol.
|
||||
|
||||
A simple protocol based on AMQP fanout RPC is used.
|
||||
|
||||
All available partitions report their presence periodically.
|
||||
|
||||
The priority of each partition in terms of assuming mastership
|
||||
is determined by earliest start-time (with a UUID-based tiebreaker
|
||||
in the unlikely event of a time clash).
|
||||
|
||||
A single partition assumes mastership at any given time, taking
|
||||
responsibility for allocating the alarms to be evaluated across
|
||||
the set of currently available partitions.
|
||||
|
||||
When a partition lifecycle event is detected (i.e. a pre-existing
|
||||
partition fails to report its presence, or a new one is started
|
||||
up), a complete rebalance of the alarms is initiated.
|
||||
|
||||
Individual alarm lifecycle events, on the other hand, do not
|
||||
require a full re-balance. Instead new alarms are allocated as
|
||||
they are detected, whereas deleted alarms are initially allowed to
|
||||
remain within the allocation (as the individual evaluators are tolerant
|
||||
of assigned alarms not existing, and the deleted alarms should be
|
||||
randomly distributed over the partitions). However once the number of
|
||||
alarms deleted since the last rebalance reaches a certain limit, a
|
||||
rebalance will be initiated to maintain equity.
|
||||
|
||||
As presence reports are received, each partition keeps track of the
|
||||
oldest partition it currently knows about, allowing an assumption of
|
||||
mastership to be aborted if an older partition belatedly reports.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# uniqueness is based on a combination of starting timestamp
|
||||
# and UUID
|
||||
self.start = timeutils.utcnow()
|
||||
self.this = PartitionIdentity(str(uuid.uuid4()),
|
||||
float(self.start.strftime('%s.%f')))
|
||||
self.oldest = None
|
||||
|
||||
# fan-out RPC
|
||||
self.coordination_rpc = rpc_alarm.RPCAlarmPartitionCoordination()
|
||||
|
||||
# state maintained by the master
|
||||
self.is_master = False
|
||||
self.presence_changed = False
|
||||
self.reports = {}
|
||||
self.last_alarms = set()
|
||||
self.deleted_alarms = set()
|
||||
|
||||
# alarms for evaluation, relevant to all partitions regardless
|
||||
# of role
|
||||
self.assignment = []
|
||||
|
||||
def _distribute(self, alarms, rebalance):
|
||||
"""Distribute alarms over known set of evaluators.
|
||||
|
||||
:param alarms: the alarms to distribute
|
||||
:param rebalance: true if this is a full rebalance
|
||||
:return: true if the distribution completed, false if aborted
|
||||
"""
|
||||
verb = 'assign' if rebalance else 'allocate'
|
||||
method = (self.coordination_rpc.assign if rebalance
|
||||
else self.coordination_rpc.allocate)
|
||||
LOG.debug(_('triggering %s') % verb)
|
||||
LOG.debug(_('known evaluators %s') % self.reports)
|
||||
per_evaluator = int(math.ceil(len(alarms) /
|
||||
float(len(self.reports) + 1)))
|
||||
LOG.debug(_('per evaluator allocation %s') % per_evaluator)
|
||||
# for small distributions (e.g. of newly created alarms)
|
||||
# we deliberately skew to non-master evaluators
|
||||
evaluators = self.reports.keys()
|
||||
random.shuffle(evaluators)
|
||||
offset = 0
|
||||
for evaluator in evaluators:
|
||||
# TODO(eglynn): use pagination in the alarms API to chunk large
|
||||
# large allocations
|
||||
if self.oldest < self.this:
|
||||
LOG.warn(_('%(this)s bailing on distribution cycle '
|
||||
'as older partition detected: %(older)s') %
|
||||
dict(this=self.this, older=self.oldest))
|
||||
return False
|
||||
allocation = alarms[offset:offset + per_evaluator]
|
||||
if allocation:
|
||||
LOG.debug(_('%(verb)s-ing %(alloc)s to %(eval)s') %
|
||||
dict(verb=verb, alloc=allocation, eval=evaluator))
|
||||
method(evaluator.uuid, allocation)
|
||||
offset += per_evaluator
|
||||
LOG.debug(_('master taking %s for self') % alarms[offset:])
|
||||
if rebalance:
|
||||
self.assignment = alarms[offset:]
|
||||
else:
|
||||
self.assignment.extend(alarms[offset:])
|
||||
return True
|
||||
|
||||
def _deletion_requires_rebalance(self, alarms):
|
||||
"""Track the level of deletion activity since the last full rebalance.
|
||||
|
||||
We delay rebalancing until a certain threshold of deletion activity
|
||||
has occurred.
|
||||
|
||||
:param alarms: current set of alarms
|
||||
:return: True if the level of alarm deletion since the last rebalance
|
||||
is sufficient so as to require a full rebalance
|
||||
"""
|
||||
deleted_alarms = self.last_alarms - set(alarms)
|
||||
LOG.debug(_('newly deleted alarms %s') % deleted_alarms)
|
||||
self.deleted_alarms.update(deleted_alarms)
|
||||
if len(self.deleted_alarms) > len(alarms) / 5:
|
||||
LOG.debug(_('alarm deletion activity requires rebalance'))
|
||||
self.deleted_alarms = set()
|
||||
return True
|
||||
return False
|
||||
|
||||
def _record_oldest(self, partition, stale=False):
|
||||
"""Check if reported partition is the oldest we know about.
|
||||
|
||||
:param partition: reported partition
|
||||
:param stale: true if reported partition detected as stale.
|
||||
"""
|
||||
if stale and self.oldest == partition:
|
||||
# current oldest partition detected as stale
|
||||
self.oldest = None
|
||||
elif not self.oldest:
|
||||
# no known oldest partition
|
||||
self.oldest = partition
|
||||
elif partition < self.oldest:
|
||||
# new oldest
|
||||
self.oldest = partition
|
||||
|
||||
def _is_master(self, interval):
|
||||
"""Determine if the current partition is the master."""
|
||||
now = timeutils.utcnow()
|
||||
if timeutils.delta_seconds(self.start, now) < interval * 2:
|
||||
LOG.debug(_('%s still warming up') % self.this)
|
||||
return False
|
||||
is_master = True
|
||||
for partition, last_heard in self.reports.items():
|
||||
delta = timeutils.delta_seconds(last_heard, now)
|
||||
LOG.debug(_('last heard from %(report)s %(delta)s seconds ago') %
|
||||
dict(report=partition, delta=delta))
|
||||
if delta > interval * 2:
|
||||
del self.reports[partition]
|
||||
self._record_oldest(partition, stale=True)
|
||||
LOG.debug(_('%(this)s detects stale evaluator: %(stale)s') %
|
||||
dict(this=self.this, stale=partition))
|
||||
self.presence_changed = True
|
||||
elif partition < self.this:
|
||||
is_master = False
|
||||
LOG.info(_('%(this)s sees older potential master: %(older)s')
|
||||
% dict(this=self.this, older=partition))
|
||||
LOG.info(_('%(this)s is master?: %(is_master)s') %
|
||||
dict(this=self.this, is_master=is_master))
|
||||
return is_master
|
||||
|
||||
def _master_role(self, assuming, api_client):
|
||||
"""Carry out the master role, initiating a distribution if required.
|
||||
|
||||
:param assuming: true if newly assumed mastership
|
||||
:param api_client: the API client to use for alarms.
|
||||
:return: True if not overtaken by an older partition
|
||||
"""
|
||||
alarms = [a.alarm_id for a in api_client.alarms.list()]
|
||||
created_alarms = list(set(alarms) - self.last_alarms)
|
||||
LOG.debug(_('newly created alarms %s') % created_alarms)
|
||||
sufficient_deletion = self._deletion_requires_rebalance(alarms)
|
||||
if assuming or sufficient_deletion or self.presence_changed:
|
||||
still_ahead = self._distribute(alarms, rebalance=True)
|
||||
elif created_alarms:
|
||||
still_ahead = self._distribute(list(created_alarms),
|
||||
rebalance=False)
|
||||
else:
|
||||
# nothing to distribute, but check anyway if overtaken
|
||||
still_ahead = self.this < self.oldest
|
||||
self.last_alarms = set(alarms)
|
||||
LOG.info(_('%(this)s not overtaken as master? %(still_ahead)s') %
|
||||
({'this': self.this, 'still_ahead': still_ahead}))
|
||||
return still_ahead
|
||||
|
||||
def check_mastership(self, eval_interval, api_client):
|
||||
"""Periodically check if the mastership role should be assumed.
|
||||
|
||||
:param eval_interval: the alarm evaluation interval
|
||||
:param api_client: the API client to use for alarms.
|
||||
"""
|
||||
LOG.debug(_('%s checking mastership status') % self.this)
|
||||
try:
|
||||
assuming = not self.is_master
|
||||
self.is_master = (self._is_master(eval_interval) and
|
||||
self._master_role(assuming, api_client))
|
||||
self.presence_changed = False
|
||||
except Exception:
|
||||
LOG.exception(_('mastership check failed'))
|
||||
|
||||
def presence(self, uuid, priority):
|
||||
"""Accept an incoming report of presence."""
|
||||
report = PartitionIdentity(uuid, priority)
|
||||
if report != self.this:
|
||||
if report not in self.reports:
|
||||
self.presence_changed = True
|
||||
self._record_oldest(report)
|
||||
self.reports[report] = timeutils.utcnow()
|
||||
LOG.debug(_('%(this)s knows about %(reports)s') %
|
||||
dict(this=self.this, reports=self.reports))
|
||||
|
||||
def assign(self, uuid, alarms):
|
||||
"""Accept an incoming alarm assignment."""
|
||||
if uuid == self.this.uuid:
|
||||
LOG.debug(_('%(this)s got assignment: %(alarms)s') %
|
||||
dict(this=self.this, alarms=alarms))
|
||||
self.assignment = alarms
|
||||
|
||||
def allocate(self, uuid, alarms):
|
||||
"""Accept an incoming alarm allocation."""
|
||||
if uuid == self.this.uuid:
|
||||
LOG.debug(_('%(this)s got allocation: %(alarms)s') %
|
||||
dict(this=self.this, alarms=alarms))
|
||||
self.assignment.extend(alarms)
|
||||
|
||||
def report_presence(self):
|
||||
"""Report the presence of the current partition."""
|
||||
LOG.debug(_('%s reporting presence') % self.this)
|
||||
try:
|
||||
self.coordination_rpc.presence(self.this.uuid, self.this.priority)
|
||||
except Exception:
|
||||
LOG.exception(_('presence reporting failed'))
|
||||
|
||||
def assigned_alarms(self, api_client):
|
||||
"""Return the alarms assigned to the current partition."""
|
||||
if not self.assignment:
|
||||
LOG.debug(_('%s has no assigned alarms to evaluate') % self.this)
|
||||
return []
|
||||
|
||||
try:
|
||||
LOG.debug(_('%(this)s alarms for evaluation: %(alarms)s') %
|
||||
dict(this=self.this, alarms=self.assignment))
|
||||
return [a for a in api_client.alarms.list(q=[{'field': 'enabled',
|
||||
'value': True}])
|
||||
if a.alarm_id in self.assignment]
|
||||
except Exception:
|
||||
LOG.exception(_('assignment retrieval failed'))
|
||||
return []
|
@ -29,13 +29,6 @@ OPTS = [
|
||||
default='alarm_notifier',
|
||||
help='The topic that ceilometer uses for alarm notifier '
|
||||
'messages.'),
|
||||
cfg.StrOpt('partition_rpc_topic',
|
||||
default='alarm_partition_coordination',
|
||||
help='The topic that ceilometer uses for alarm partition '
|
||||
'coordination messages. DEPRECATED: RPC-based partitioned'
|
||||
'alarm evaluation service will be removed in Kilo in '
|
||||
'favour of the default alarm evaluation service using '
|
||||
'tooz for partitioning.'),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(OPTS, group='alarm')
|
||||
@ -70,29 +63,3 @@ class RPCAlarmNotifier(object):
|
||||
'current': alarm.state,
|
||||
'reason': six.text_type(reason),
|
||||
'reason_data': reason_data})
|
||||
|
||||
|
||||
class RPCAlarmPartitionCoordination(object):
|
||||
def __init__(self):
|
||||
transport = messaging.get_transport()
|
||||
self.client = messaging.get_rpc_client(
|
||||
transport, topic=cfg.CONF.alarm.partition_rpc_topic,
|
||||
version="1.0")
|
||||
|
||||
def presence(self, uuid, priority):
|
||||
cctxt = self.client.prepare(fanout=True)
|
||||
return cctxt.cast(context.get_admin_context(),
|
||||
'presence', data={'uuid': uuid,
|
||||
'priority': priority})
|
||||
|
||||
def assign(self, uuid, alarms):
|
||||
cctxt = self.client.prepare(fanout=True)
|
||||
return cctxt.cast(context.get_admin_context(),
|
||||
'assign', data={'uuid': uuid,
|
||||
'alarms': alarms})
|
||||
|
||||
def allocate(self, uuid, alarms):
|
||||
cctxt = self.client.prepare(fanout=True)
|
||||
return cctxt.cast(context.get_admin_context(),
|
||||
'allocate', data={'uuid': uuid,
|
||||
'alarms': alarms})
|
||||
|
@ -26,7 +26,6 @@ import six
|
||||
from stevedore import extension
|
||||
|
||||
from ceilometer import alarm as ceilometer_alarm
|
||||
from ceilometer.alarm.partition import coordination as alarm_coordination
|
||||
from ceilometer.alarm import rpc as rpc_alarm
|
||||
from ceilometer import coordination as coordination
|
||||
from ceilometer.i18n import _
|
||||
@ -153,78 +152,6 @@ class AlarmEvaluationService(AlarmService, os_service.Service):
|
||||
self.PARTITIONING_GROUP_NAME, all_alarms)
|
||||
|
||||
|
||||
class SingletonAlarmService(AlarmService, os_service.Service):
|
||||
|
||||
def __init__(self):
|
||||
super(SingletonAlarmService, self).__init__()
|
||||
|
||||
def start(self):
|
||||
super(SingletonAlarmService, self).start()
|
||||
if self.evaluators:
|
||||
interval = cfg.CONF.alarm.evaluation_interval
|
||||
self.tg.add_timer(
|
||||
interval,
|
||||
self._evaluate_assigned_alarms,
|
||||
0)
|
||||
# Add a dummy thread to have wait() working
|
||||
self.tg.add_timer(604800, lambda: None)
|
||||
|
||||
def _assigned_alarms(self):
|
||||
return self._client.alarms.list(q=[{'field': 'enabled',
|
||||
'value': True}])
|
||||
|
||||
|
||||
class PartitionedAlarmService(AlarmService, os_service.Service):
|
||||
|
||||
def __init__(self):
|
||||
super(PartitionedAlarmService, self).__init__()
|
||||
transport = messaging.get_transport()
|
||||
self.rpc_server = messaging.get_rpc_server(
|
||||
transport, cfg.CONF.alarm.partition_rpc_topic, self)
|
||||
|
||||
self.partition_coordinator = alarm_coordination.PartitionCoordinator()
|
||||
|
||||
def start(self):
|
||||
super(PartitionedAlarmService, self).start()
|
||||
if self.evaluators:
|
||||
eval_interval = cfg.CONF.alarm.evaluation_interval
|
||||
self.tg.add_timer(
|
||||
eval_interval / 4,
|
||||
self.partition_coordinator.report_presence,
|
||||
0)
|
||||
self.tg.add_timer(
|
||||
eval_interval / 2,
|
||||
self.partition_coordinator.check_mastership,
|
||||
eval_interval,
|
||||
*[eval_interval, self._client])
|
||||
self.tg.add_timer(
|
||||
eval_interval,
|
||||
self._evaluate_assigned_alarms,
|
||||
eval_interval)
|
||||
self.rpc_server.start()
|
||||
# Add a dummy thread to have wait() working
|
||||
self.tg.add_timer(604800, lambda: None)
|
||||
|
||||
def stop(self):
|
||||
self.rpc_server.stop()
|
||||
super(PartitionedAlarmService, self).stop()
|
||||
|
||||
def _assigned_alarms(self):
|
||||
return self.partition_coordinator.assigned_alarms(self._client)
|
||||
|
||||
def presence(self, context, data):
|
||||
self.partition_coordinator.presence(data.get('uuid'),
|
||||
data.get('priority'))
|
||||
|
||||
def assign(self, context, data):
|
||||
self.partition_coordinator.assign(data.get('uuid'),
|
||||
data.get('alarms'))
|
||||
|
||||
def allocate(self, context, data):
|
||||
self.partition_coordinator.allocate(data.get('uuid'),
|
||||
data.get('alarms'))
|
||||
|
||||
|
||||
class AlarmNotifierService(os_service.Service):
|
||||
|
||||
def __init__(self):
|
||||
|
@ -14,29 +14,11 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from stevedore import driver
|
||||
|
||||
from ceilometer.alarm import service as alarm_service
|
||||
from ceilometer.openstack.common import log
|
||||
from ceilometer.openstack.common import service as os_service
|
||||
from ceilometer import service
|
||||
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('evaluation_service', default='default',
|
||||
help='Driver to use for alarm evaluation service. DEPRECATED: '
|
||||
'"singleton" and "partitioned" alarm evaluator '
|
||||
'services will be removed in Kilo in favour of the '
|
||||
'default alarm evaluation service using tooz for '
|
||||
'partitioning.'),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(OPTS, group='alarm')
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def notifier():
|
||||
service.prepare_service()
|
||||
os_service.launch(alarm_service.AlarmNotifierService()).wait()
|
||||
@ -44,10 +26,4 @@ def notifier():
|
||||
|
||||
def evaluator():
|
||||
service.prepare_service()
|
||||
eval_service_mgr = driver.DriverManager(
|
||||
"ceilometer.alarm.evaluator_service",
|
||||
cfg.CONF.alarm.evaluation_service,
|
||||
invoke_on_load=True)
|
||||
LOG.debug("Alarm evaluator loaded: %s" %
|
||||
eval_service_mgr.driver.__class__.__name__)
|
||||
os_service.launch(eval_service_mgr.driver).wait()
|
||||
os_service.launch(alarm_service.AlarmEvaluationService()).wait()
|
||||
|
@ -1,459 +0,0 @@
|
||||
#
|
||||
# Copyright 2013 Red Hat, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Tests for ceilometer/alarm/partition/coordination.py
|
||||
"""
|
||||
import datetime
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
from oslo_config import fixture as fixture_config
|
||||
from oslo_utils import timeutils
|
||||
from six import moves
|
||||
|
||||
from ceilometer.alarm.partition import coordination
|
||||
from ceilometer.alarm.storage import models
|
||||
from ceilometer.tests import base as tests_base
|
||||
from ceilometer.tests import constants
|
||||
|
||||
|
||||
class MockLoggingHandler(logging.Handler):
|
||||
"""Mock logging handler to check for expected logs."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.reset()
|
||||
logging.Handler.__init__(self, *args, **kwargs)
|
||||
|
||||
def emit(self, record):
|
||||
self.messages[record.levelname.lower()].append(record.getMessage())
|
||||
|
||||
def reset(self):
|
||||
self.messages = {'debug': [],
|
||||
'info': [],
|
||||
'warning': [],
|
||||
'error': [],
|
||||
'critical': []}
|
||||
|
||||
|
||||
class TestCoordinate(tests_base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestCoordinate, self).setUp()
|
||||
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||
self.setup_messaging(self.CONF)
|
||||
|
||||
self.test_interval = 120
|
||||
self.CONF.import_opt('evaluation_interval',
|
||||
'ceilometer.alarm.service',
|
||||
group='alarm')
|
||||
self.CONF.set_override('evaluation_interval',
|
||||
self.test_interval,
|
||||
group='alarm')
|
||||
self.api_client = mock.Mock()
|
||||
self.override_start = datetime.datetime(2012, 7, 2, 10, 45)
|
||||
patcher = mock.patch.object(timeutils, 'utcnow')
|
||||
self.addCleanup(patcher.stop)
|
||||
self.mock_utcnow = patcher.start()
|
||||
self.mock_utcnow.return_value = self.override_start
|
||||
self.partition_coordinator = coordination.PartitionCoordinator()
|
||||
self.partition_coordinator.coordination_rpc = mock.Mock()
|
||||
# add extra logger to check exception conditions and logged content
|
||||
self.str_handler = MockLoggingHandler()
|
||||
coordination.LOG.logger.addHandler(self.str_handler)
|
||||
|
||||
def tearDown(self):
|
||||
super(TestCoordinate, self).tearDown()
|
||||
# clean up the logger
|
||||
coordination.LOG.logger.removeHandler(self.str_handler)
|
||||
self.str_handler.close()
|
||||
|
||||
def _no_alarms(self):
|
||||
self.api_client.alarms.list.return_value = []
|
||||
|
||||
def _some_alarms(self, count):
|
||||
alarm_ids = [str(uuid.uuid4()) for _ in moves.xrange(count)]
|
||||
alarms = [self._make_alarm(aid) for aid in alarm_ids]
|
||||
self.api_client.alarms.list.return_value = alarms
|
||||
return alarm_ids
|
||||
|
||||
def _current_alarms(self):
|
||||
return self.api_client.alarms.list.return_value
|
||||
|
||||
def _dump_alarms(self, shave):
|
||||
alarms = self.api_client.alarms.list.return_value
|
||||
alarms = alarms[:shave]
|
||||
alarm_ids = [a.alarm_id for a in alarms]
|
||||
self.api_client.alarms.list.return_value = alarms
|
||||
return alarm_ids
|
||||
|
||||
def _add_alarms(self, boost):
|
||||
new_alarm_ids = [str(uuid.uuid4()) for _ in moves.xrange(boost)]
|
||||
alarms = self.api_client.alarms.list.return_value
|
||||
for aid in new_alarm_ids:
|
||||
alarms.append(self._make_alarm(aid))
|
||||
self.api_client.alarms.list.return_value = alarms
|
||||
return new_alarm_ids
|
||||
|
||||
@staticmethod
|
||||
def _make_alarm(uuid):
|
||||
return models.Alarm(name='instance_running_hot',
|
||||
type='threshold',
|
||||
user_id='foobar',
|
||||
project_id='snafu',
|
||||
enabled=True,
|
||||
description='',
|
||||
repeat_actions=False,
|
||||
state='insufficient data',
|
||||
state_timestamp=constants.MIN_DATETIME,
|
||||
timestamp=constants.MIN_DATETIME,
|
||||
ok_actions=[],
|
||||
alarm_actions=[],
|
||||
insufficient_data_actions=[],
|
||||
alarm_id=uuid,
|
||||
severity='critical',
|
||||
time_constraints=[],
|
||||
rule=dict(
|
||||
statistic='avg',
|
||||
comparison_operator='gt',
|
||||
threshold=80.0,
|
||||
evaluation_periods=5,
|
||||
period=60,
|
||||
query=[],
|
||||
))
|
||||
|
||||
def _advance_time(self, factor):
|
||||
delta = datetime.timedelta(seconds=self.test_interval * factor)
|
||||
self.mock_utcnow.return_value = timeutils.utcnow() + delta
|
||||
|
||||
def _younger_by(self, offset):
|
||||
return self.partition_coordinator.this.priority + offset
|
||||
|
||||
def _older_by(self, offset):
|
||||
return self.partition_coordinator.this.priority - offset
|
||||
|
||||
def _check_mastership(self, expected):
|
||||
self.partition_coordinator.check_mastership(self.test_interval,
|
||||
self.api_client)
|
||||
self.assertEqual(expected, self.partition_coordinator.is_master)
|
||||
|
||||
def _new_partition(self, offset):
|
||||
younger = self._younger_by(offset)
|
||||
pid = uuid.uuid4()
|
||||
self.partition_coordinator.presence(pid, younger)
|
||||
return pid, younger
|
||||
|
||||
def _check_assignments(self, others, alarm_ids, per_worker,
|
||||
expect_uneffected=None):
|
||||
rpc = self.partition_coordinator.coordination_rpc
|
||||
calls = rpc.assign.call_args_list
|
||||
return self._check_distribution(others, alarm_ids, per_worker, calls,
|
||||
expect_uneffected or [])
|
||||
|
||||
def _check_allocation(self, others, alarm_ids, per_worker):
|
||||
rpc = self.partition_coordinator.coordination_rpc
|
||||
calls = rpc.allocate.call_args_list
|
||||
return self._check_distribution(others, alarm_ids, per_worker, calls)
|
||||
|
||||
def _check_distribution(self, others, alarm_ids, per_worker, calls,
|
||||
expect_uneffected=None):
|
||||
expect_uneffected = expect_uneffected or []
|
||||
uneffected = [pid for pid, _ in others]
|
||||
uneffected.extend(expect_uneffected)
|
||||
remainder = list(alarm_ids)
|
||||
for call in calls:
|
||||
args, _ = call
|
||||
target, alarms = args
|
||||
self.assertIn(target, uneffected)
|
||||
uneffected.remove(target)
|
||||
self.assertEqual(per_worker, len(alarms))
|
||||
for aid in alarms:
|
||||
self.assertIn(aid, remainder)
|
||||
remainder.remove(aid)
|
||||
self.assertEqual(set(expect_uneffected), set(uneffected))
|
||||
return remainder
|
||||
|
||||
def _forget_assignments(self, expected_assignments):
|
||||
rpc = self.partition_coordinator.coordination_rpc
|
||||
self.assertEqual(expected_assignments, len(rpc.assign.call_args_list))
|
||||
rpc.reset_mock()
|
||||
|
||||
def test_mastership_not_assumed_during_warmup(self):
|
||||
self._no_alarms()
|
||||
|
||||
for _ in moves.xrange(7):
|
||||
# still warming up
|
||||
self._advance_time(0.25)
|
||||
self._check_mastership(False)
|
||||
|
||||
# now warmed up
|
||||
self._advance_time(0.25)
|
||||
self._check_mastership(True)
|
||||
|
||||
def test_uncontested_mastership_assumed(self):
|
||||
self._no_alarms()
|
||||
|
||||
self._advance_time(3)
|
||||
|
||||
self._check_mastership(True)
|
||||
|
||||
def test_contested_mastership_assumed(self):
|
||||
self._no_alarms()
|
||||
|
||||
self._advance_time(3)
|
||||
|
||||
for offset in moves.xrange(1, 5):
|
||||
younger = self._younger_by(offset)
|
||||
self.partition_coordinator.presence(uuid.uuid4(), younger)
|
||||
|
||||
self._check_mastership(True)
|
||||
|
||||
def test_bested_mastership_relinquished(self):
|
||||
self._no_alarms()
|
||||
|
||||
self._advance_time(3)
|
||||
|
||||
self._check_mastership(True)
|
||||
|
||||
older = self._older_by(1)
|
||||
self.partition_coordinator.presence(uuid.uuid4(), older)
|
||||
|
||||
self._check_mastership(False)
|
||||
|
||||
def _do_test_tie_broken_mastership(self, seed, expect_mastership):
|
||||
self._no_alarms()
|
||||
self.partition_coordinator.this.uuid = uuid.UUID(int=1)
|
||||
|
||||
self._advance_time(3)
|
||||
|
||||
self._check_mastership(True)
|
||||
|
||||
tied = self.partition_coordinator.this.priority
|
||||
self.partition_coordinator.presence(uuid.UUID(int=seed), tied)
|
||||
|
||||
self._check_mastership(expect_mastership)
|
||||
|
||||
def test_tie_broken_mastership_assumed(self):
|
||||
self._do_test_tie_broken_mastership(2, True)
|
||||
|
||||
def test_tie_broken_mastership_relinquished(self):
|
||||
self._do_test_tie_broken_mastership(0, False)
|
||||
|
||||
def test_fair_distribution(self):
|
||||
alarm_ids = self._some_alarms(49)
|
||||
|
||||
self._advance_time(3)
|
||||
|
||||
others = [self._new_partition(i) for i in moves.xrange(1, 5)]
|
||||
|
||||
self._check_mastership(True)
|
||||
|
||||
remainder = self._check_assignments(others, alarm_ids, 10)
|
||||
self.assertEqual(set(self.partition_coordinator.assignment),
|
||||
set(remainder))
|
||||
|
||||
def test_rebalance_on_partition_startup(self):
|
||||
alarm_ids = self._some_alarms(49)
|
||||
|
||||
self._advance_time(3)
|
||||
|
||||
others = [self._new_partition(i) for i in moves.xrange(1, 5)]
|
||||
|
||||
self._check_mastership(True)
|
||||
|
||||
self. _forget_assignments(4)
|
||||
|
||||
others.append(self._new_partition(5))
|
||||
self._check_mastership(True)
|
||||
|
||||
remainder = self._check_assignments(others, alarm_ids, 9)
|
||||
self.assertEqual(set(self.partition_coordinator.assignment),
|
||||
set(remainder))
|
||||
|
||||
def test_rebalance_on_partition_staleness(self):
|
||||
alarm_ids = self._some_alarms(49)
|
||||
|
||||
self._advance_time(3)
|
||||
|
||||
others = [self._new_partition(i) for i in moves.xrange(1, 5)]
|
||||
|
||||
self._check_mastership(True)
|
||||
|
||||
self. _forget_assignments(4)
|
||||
|
||||
self._advance_time(4)
|
||||
|
||||
stale, _ = others.pop()
|
||||
for pid, younger in others:
|
||||
self.partition_coordinator.presence(pid, younger)
|
||||
|
||||
self._check_mastership(True)
|
||||
|
||||
remainder = self._check_assignments(others, alarm_ids, 13, [stale])
|
||||
self.assertEqual(set(self.partition_coordinator.assignment),
|
||||
set(remainder))
|
||||
|
||||
def test_rebalance_on_sufficient_deletion(self):
|
||||
alarm_ids = self._some_alarms(49)
|
||||
|
||||
self._advance_time(3)
|
||||
|
||||
others = [self._new_partition(i) for i in moves.xrange(1, 5)]
|
||||
|
||||
self._check_mastership(True)
|
||||
|
||||
self._forget_assignments(4)
|
||||
|
||||
alarm_ids = self._dump_alarms(len(alarm_ids) / 2)
|
||||
|
||||
self._check_mastership(True)
|
||||
|
||||
remainder = self._check_assignments(others, alarm_ids, 5)
|
||||
self.assertEqual(set(self.partition_coordinator.assignment),
|
||||
set(remainder))
|
||||
|
||||
def test_no_rebalance_on_insufficient_deletion(self):
|
||||
alarm_ids = self._some_alarms(49)
|
||||
|
||||
self._advance_time(3)
|
||||
|
||||
others = [self._new_partition(i) for i in moves.xrange(1, 5)]
|
||||
|
||||
self._check_mastership(True)
|
||||
|
||||
self._forget_assignments(4)
|
||||
|
||||
alarm_ids = self._dump_alarms(45)
|
||||
|
||||
self._check_mastership(True)
|
||||
|
||||
expect_uneffected = [pid for pid, _ in others]
|
||||
self._check_assignments(others, alarm_ids, 10, expect_uneffected)
|
||||
|
||||
def test_no_rebalance_on_creation(self):
|
||||
self._some_alarms(49)
|
||||
|
||||
self._advance_time(3)
|
||||
|
||||
others = [self._new_partition(i) for i in moves.xrange(1, 5)]
|
||||
|
||||
self._check_mastership(True)
|
||||
|
||||
self._forget_assignments(4)
|
||||
|
||||
new_alarm_ids = self._add_alarms(8)
|
||||
|
||||
master_assignment = set(self.partition_coordinator.assignment)
|
||||
self._check_mastership(True)
|
||||
|
||||
remainder = self._check_allocation(others, new_alarm_ids, 2)
|
||||
self.assertEqual(0, len(remainder))
|
||||
self.assertEqual(set(self.partition_coordinator.assignment),
|
||||
master_assignment)
|
||||
|
||||
def test_bail_when_overtaken_in_distribution(self):
|
||||
self._some_alarms(49)
|
||||
|
||||
self._advance_time(3)
|
||||
|
||||
for i in moves.xrange(1, 5):
|
||||
self._new_partition(i)
|
||||
|
||||
def overtake(*args):
|
||||
self._new_partition(-1)
|
||||
|
||||
rpc = self.partition_coordinator.coordination_rpc
|
||||
rpc.assign.side_effect = overtake
|
||||
|
||||
self._check_mastership(False)
|
||||
|
||||
self.assertEqual(1, len(rpc.assign.call_args_list))
|
||||
|
||||
def test_assigned_alarms_no_assignment(self):
|
||||
alarms = self.partition_coordinator.assigned_alarms(self.api_client)
|
||||
self.assertEqual(0, len(alarms))
|
||||
|
||||
def test_assigned_alarms_assignment(self):
|
||||
alarm_ids = self._some_alarms(6)
|
||||
|
||||
uuid = self.partition_coordinator.this.uuid
|
||||
self.partition_coordinator.assign(uuid, alarm_ids)
|
||||
|
||||
alarms = self.partition_coordinator.assigned_alarms(self.api_client)
|
||||
self.assertEqual(self._current_alarms(), alarms)
|
||||
|
||||
def test_assigned_alarms_allocation(self):
|
||||
alarm_ids = self._some_alarms(6)
|
||||
|
||||
uuid = self.partition_coordinator.this.uuid
|
||||
self.partition_coordinator.assign(uuid, alarm_ids)
|
||||
|
||||
new_alarm_ids = self._add_alarms(2)
|
||||
self.partition_coordinator.allocate(uuid, new_alarm_ids)
|
||||
|
||||
alarms = self.partition_coordinator.assigned_alarms(self.api_client)
|
||||
self.assertEqual(self._current_alarms(), alarms)
|
||||
|
||||
def test_assigned_alarms_deleted_assignment(self):
|
||||
alarm_ids = self._some_alarms(6)
|
||||
|
||||
uuid = self.partition_coordinator.this.uuid
|
||||
self.partition_coordinator.assign(uuid, alarm_ids)
|
||||
|
||||
self._dump_alarms(len(alarm_ids) / 2)
|
||||
|
||||
alarms = self.partition_coordinator.assigned_alarms(self.api_client)
|
||||
self.assertEqual(self._current_alarms(), alarms)
|
||||
|
||||
def test__record_oldest(self):
|
||||
# Test when the partition to be recorded is the same as the oldest.
|
||||
self.partition_coordinator._record_oldest(
|
||||
self.partition_coordinator.oldest, True)
|
||||
self.assertIsNone(self.partition_coordinator.oldest)
|
||||
|
||||
def test_check_mastership(self):
|
||||
# Test the method exception condition.
|
||||
self.partition_coordinator._is_master = mock.Mock(
|
||||
side_effect=Exception('Boom!'))
|
||||
self.partition_coordinator.check_mastership(10, None)
|
||||
self.assertIn('mastership check failed',
|
||||
self.str_handler.messages['error'])
|
||||
|
||||
def test_report_presence(self):
|
||||
self.partition_coordinator.coordination_rpc.presence = mock.Mock(
|
||||
side_effect=Exception('Boom!'))
|
||||
self.partition_coordinator.report_presence()
|
||||
self.assertIn('presence reporting failed',
|
||||
self.str_handler.messages['error'])
|
||||
|
||||
def test_assigned_alarms(self):
|
||||
api_client = mock.MagicMock()
|
||||
api_client.alarms.list = mock.Mock(side_effect=Exception('Boom!'))
|
||||
self.partition_coordinator.assignment = ['something']
|
||||
self.partition_coordinator.assigned_alarms(api_client)
|
||||
self.assertIn('assignment retrieval failed',
|
||||
self.str_handler.messages['error'])
|
||||
|
||||
|
||||
class TestPartitionIdentity(tests_base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestPartitionIdentity, self).setUp()
|
||||
self.id_1st = coordination.PartitionIdentity(str(uuid.uuid4()), 1)
|
||||
self.id_2nd = coordination.PartitionIdentity(str(uuid.uuid4()), 2)
|
||||
|
||||
def test_identity_ops(self):
|
||||
self.assertNotEqual(self.id_1st, 'Nothing')
|
||||
self.assertNotEqual(self.id_1st, self.id_2nd)
|
||||
self.assertTrue(self.id_1st < None)
|
||||
self.assertFalse(self.id_1st < 'Nothing')
|
||||
self.assertTrue(self.id_2nd > self.id_1st)
|
@ -1,105 +0,0 @@
|
||||
#
|
||||
# Copyright 2013 Red Hat, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Tests for ceilometer.alarm.service.PartitionedAlarmService.
|
||||
"""
|
||||
import contextlib
|
||||
|
||||
import mock
|
||||
from oslo_config import fixture as fixture_config
|
||||
from stevedore import extension
|
||||
|
||||
from ceilometer.alarm import service
|
||||
from ceilometer.tests import base as tests_base
|
||||
|
||||
|
||||
class TestPartitionedAlarmService(tests_base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestPartitionedAlarmService, self).setUp()
|
||||
|
||||
self.threshold_eval = mock.Mock()
|
||||
self.api_client = mock.MagicMock()
|
||||
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||
|
||||
self.CONF.set_override('host',
|
||||
'fake_host')
|
||||
self.CONF.set_override('partition_rpc_topic',
|
||||
'fake_topic',
|
||||
group='alarm')
|
||||
self.setup_messaging(self.CONF)
|
||||
|
||||
self.partitioned = service.PartitionedAlarmService()
|
||||
self.partitioned.tg = mock.Mock()
|
||||
self.partitioned.partition_coordinator = mock.Mock()
|
||||
self.extension_mgr = extension.ExtensionManager.make_test_instance(
|
||||
[
|
||||
extension.Extension(
|
||||
'threshold',
|
||||
None,
|
||||
None,
|
||||
self.threshold_eval, ),
|
||||
]
|
||||
)
|
||||
self.partitioned.extension_manager = self.extension_mgr
|
||||
|
||||
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
|
||||
def test_lifecycle(self):
|
||||
test_interval = 120
|
||||
self.CONF.set_override('evaluation_interval',
|
||||
test_interval,
|
||||
group='alarm')
|
||||
get_client = 'ceilometerclient.client.get_client'
|
||||
with contextlib.nested(
|
||||
mock.patch(get_client, return_value=self.api_client),
|
||||
mock.patch.object(self.partitioned.rpc_server, 'start')):
|
||||
self.partitioned.start()
|
||||
pc = self.partitioned.partition_coordinator
|
||||
expected = [
|
||||
mock.call(test_interval / 4,
|
||||
pc.report_presence,
|
||||
0),
|
||||
mock.call(test_interval / 2,
|
||||
pc.check_mastership,
|
||||
test_interval,
|
||||
test_interval,
|
||||
self.api_client),
|
||||
mock.call(test_interval,
|
||||
self.partitioned._evaluate_assigned_alarms,
|
||||
test_interval),
|
||||
mock.call(604800, mock.ANY),
|
||||
]
|
||||
actual = self.partitioned.tg.add_timer.call_args_list
|
||||
self.assertEqual(expected, actual)
|
||||
self.partitioned.stop()
|
||||
|
||||
def test_presence_reporting(self):
|
||||
priority = 42
|
||||
self.partitioned.presence(mock.Mock(),
|
||||
dict(uuid='uuid', priority=priority))
|
||||
pc = self.partitioned.partition_coordinator
|
||||
pc.presence.assert_called_once_with('uuid', priority)
|
||||
|
||||
def test_alarm_assignment(self):
|
||||
alarms = [mock.Mock()]
|
||||
self.partitioned.assign(mock.Mock(),
|
||||
dict(uuid='uuid', alarms=alarms))
|
||||
pc = self.partitioned.partition_coordinator
|
||||
pc.assign.assert_called_once_with('uuid', alarms)
|
||||
|
||||
def test_alarm_allocation(self):
|
||||
alarms = [mock.Mock()]
|
||||
self.partitioned.allocate(mock.Mock(),
|
||||
dict(uuid='uuid', alarms=alarms))
|
||||
pc = self.partitioned.partition_coordinator
|
||||
pc.allocate.assert_called_once_with('uuid', alarms)
|
@ -18,9 +18,7 @@
|
||||
import uuid
|
||||
|
||||
from ceilometerclient.v2 import alarms
|
||||
import eventlet
|
||||
from oslo_config import fixture as fixture_config
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
|
||||
from ceilometer.alarm import rpc as rpc_alarm
|
||||
@ -170,78 +168,3 @@ class FakeCoordinator(object):
|
||||
def _record(self, method, data):
|
||||
self.notified.append((method, data))
|
||||
self.rpc.stop()
|
||||
|
||||
|
||||
class TestRPCAlarmPartitionCoordination(tests_base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestRPCAlarmPartitionCoordination, self).setUp()
|
||||
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||
self.setup_messaging(self.CONF)
|
||||
|
||||
self.coordinator_server = FakeCoordinator(self.transport)
|
||||
self.coordinator_server.rpc.start()
|
||||
eventlet.sleep() # must be sure that fanout queue is created
|
||||
|
||||
self.coordination = rpc_alarm.RPCAlarmPartitionCoordination()
|
||||
self.alarms = [
|
||||
alarms.Alarm(None, info={
|
||||
'name': 'instance_running_hot',
|
||||
'meter_name': 'cpu_util',
|
||||
'comparison_operator': 'gt',
|
||||
'threshold': 80.0,
|
||||
'evaluation_periods': 5,
|
||||
'statistic': 'avg',
|
||||
'state': 'ok',
|
||||
'ok_actions': ['http://host:8080/path'],
|
||||
'user_id': 'foobar',
|
||||
'project_id': 'snafu',
|
||||
'period': 60,
|
||||
'alarm_id': str(uuid.uuid4()),
|
||||
'matching_metadata':{'resource_id':
|
||||
'my_instance'}
|
||||
}),
|
||||
alarms.Alarm(None, info={
|
||||
'name': 'group_running_idle',
|
||||
'meter_name': 'cpu_util',
|
||||
'comparison_operator': 'le',
|
||||
'threshold': 10.0,
|
||||
'statistic': 'max',
|
||||
'evaluation_periods': 4,
|
||||
'state': 'insufficient data',
|
||||
'insufficient_data_actions': ['http://other_host/path'],
|
||||
'user_id': 'foobar',
|
||||
'project_id': 'snafu',
|
||||
'period': 300,
|
||||
'alarm_id': str(uuid.uuid4()),
|
||||
'matching_metadata':{'metadata.user_metadata.AS':
|
||||
'my_group'}
|
||||
}),
|
||||
]
|
||||
|
||||
def test_coordination_presence(self):
|
||||
id = str(uuid.uuid4())
|
||||
priority = float(timeutils.utcnow().strftime('%s.%f'))
|
||||
self.coordination.presence(id, priority)
|
||||
self.coordinator_server.rpc.wait()
|
||||
method, args = self.coordinator_server.notified[0]
|
||||
self.assertEqual(id, args['uuid'])
|
||||
self.assertEqual(priority, args['priority'])
|
||||
self.assertEqual('presence', method)
|
||||
|
||||
def test_coordination_assign(self):
|
||||
id = str(uuid.uuid4())
|
||||
self.coordination.assign(id, self.alarms)
|
||||
self.coordinator_server.rpc.wait()
|
||||
method, args = self.coordinator_server.notified[0]
|
||||
self.assertEqual(id, args['uuid'])
|
||||
self.assertEqual(2, len(args['alarms']))
|
||||
self.assertEqual('assign', method)
|
||||
|
||||
def test_coordination_allocate(self):
|
||||
id = str(uuid.uuid4())
|
||||
self.coordination.allocate(id, self.alarms)
|
||||
self.coordinator_server.rpc.wait()
|
||||
method, args = self.coordinator_server.notified[0]
|
||||
self.assertEqual(id, args['uuid'])
|
||||
self.assertEqual(2, len(args['alarms']))
|
||||
self.assertEqual('allocate', method)
|
||||
|
@ -1,119 +0,0 @@
|
||||
#
|
||||
# Copyright 2013 Red Hat, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Tests for ceilometer.alarm.service.SingletonAlarmService.
|
||||
"""
|
||||
import mock
|
||||
from oslo_config import fixture as fixture_config
|
||||
from stevedore import extension
|
||||
|
||||
from ceilometer.alarm import service
|
||||
from ceilometer.tests import base as tests_base
|
||||
|
||||
|
||||
class TestSingletonAlarmService(tests_base.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestSingletonAlarmService, self).setUp()
|
||||
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||
self.setup_messaging(self.CONF)
|
||||
|
||||
self.threshold_eval = mock.Mock()
|
||||
self.evaluators = extension.ExtensionManager.make_test_instance(
|
||||
[
|
||||
extension.Extension(
|
||||
'threshold',
|
||||
None,
|
||||
None,
|
||||
self.threshold_eval),
|
||||
]
|
||||
)
|
||||
self.api_client = mock.MagicMock()
|
||||
self.singleton = service.SingletonAlarmService()
|
||||
self.singleton.tg = mock.Mock()
|
||||
self.singleton.evaluators = self.evaluators
|
||||
self.singleton.supported_evaluators = ['threshold']
|
||||
|
||||
def test_start(self):
|
||||
test_interval = 120
|
||||
self.CONF.set_override('evaluation_interval',
|
||||
test_interval,
|
||||
group='alarm')
|
||||
with mock.patch('ceilometerclient.client.get_client',
|
||||
return_value=self.api_client):
|
||||
self.singleton.start()
|
||||
expected = [
|
||||
mock.call(test_interval,
|
||||
self.singleton._evaluate_assigned_alarms,
|
||||
0),
|
||||
mock.call(604800, mock.ANY),
|
||||
]
|
||||
actual = self.singleton.tg.add_timer.call_args_list
|
||||
self.assertEqual(expected, actual)
|
||||
|
||||
def test_evaluation_cycle(self):
|
||||
alarm = mock.Mock(type='threshold')
|
||||
self.api_client.alarms.list.return_value = [alarm]
|
||||
with mock.patch('ceilometerclient.client.get_client',
|
||||
return_value=self.api_client):
|
||||
self.singleton._evaluate_assigned_alarms()
|
||||
self.threshold_eval.evaluate.assert_called_once_with(alarm)
|
||||
|
||||
def test_evaluation_cycle_with_bad_alarm(self):
|
||||
alarms = [
|
||||
mock.Mock(type='threshold', name='bad'),
|
||||
mock.Mock(type='threshold', name='good'),
|
||||
]
|
||||
self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None]
|
||||
self.api_client.alarms.list.return_value = alarms
|
||||
with mock.patch('ceilometerclient.client.get_client',
|
||||
return_value=self.api_client):
|
||||
self.singleton._evaluate_assigned_alarms()
|
||||
self.assertEqual([mock.call(alarms[0]), mock.call(alarms[1])],
|
||||
self.threshold_eval.evaluate.call_args_list)
|
||||
|
||||
def test_unknown_extension_skipped(self):
|
||||
alarms = [
|
||||
mock.Mock(type='not_existing_type'),
|
||||
mock.Mock(type='threshold')
|
||||
]
|
||||
|
||||
self.api_client.alarms.list.return_value = alarms
|
||||
with mock.patch('ceilometerclient.client.get_client',
|
||||
return_value=self.api_client):
|
||||
self.singleton.start()
|
||||
self.singleton._evaluate_assigned_alarms()
|
||||
self.threshold_eval.evaluate.assert_called_once_with(alarms[1])
|
||||
|
||||
def test_singleton_endpoint_types(self):
|
||||
endpoint_types = ["internalURL", "publicURL"]
|
||||
for endpoint_type in endpoint_types:
|
||||
self.CONF.set_override('os_endpoint_type',
|
||||
endpoint_type,
|
||||
group='service_credentials')
|
||||
with mock.patch('ceilometerclient.client.get_client') as client:
|
||||
self.singleton.api_client = None
|
||||
self.singleton._evaluate_assigned_alarms()
|
||||
conf = self.CONF.service_credentials
|
||||
expected = [mock.call(2,
|
||||
os_auth_url=conf.os_auth_url,
|
||||
os_region_name=conf.os_region_name,
|
||||
os_tenant_name=conf.os_tenant_name,
|
||||
os_password=conf.os_password,
|
||||
os_username=conf.os_username,
|
||||
os_cacert=conf.os_cacert,
|
||||
os_endpoint_type=conf.os_endpoint_type,
|
||||
timeout=self.CONF.http_timeout,
|
||||
insecure=conf.insecure)]
|
||||
actual = client.call_args_list
|
||||
self.assertEqual(expected, actual)
|
@ -106,49 +106,6 @@ class BinSendSampleTestCase(base.BaseTestCase):
|
||||
self.assertEqual(0, subp.wait())
|
||||
|
||||
|
||||
class BinAlarmEvaluatorServiceTestCase(base.BaseTestCase):
|
||||
def _do_test(self, driver, driver_class):
|
||||
pipeline_cfg_file = self.path_get('etc/ceilometer/pipeline.yaml')
|
||||
content = ("[DEFAULT]\n"
|
||||
"rpc_backend=fake\n"
|
||||
"pipeline_cfg_file={0}\n"
|
||||
"debug=true\n"
|
||||
"[database]\n"
|
||||
"time_to_live=1\n"
|
||||
"connection=log://localhost\n".format(pipeline_cfg_file))
|
||||
|
||||
if driver:
|
||||
content += "[alarm]\nevaluation_service=%s\n" % driver
|
||||
|
||||
self.tempfile = fileutils.write_to_tempfile(content=content,
|
||||
prefix='ceilometer',
|
||||
suffix='.conf')
|
||||
self.subp = subprocess.Popen(['ceilometer-alarm-evaluator',
|
||||
"--config-file=%s" % self.tempfile],
|
||||
stderr=subprocess.PIPE)
|
||||
err = self.subp.stderr.read(1024)
|
||||
self.assertIn("Alarm evaluator loaded: %s" % driver_class, err)
|
||||
|
||||
def tearDown(self):
|
||||
super(BinAlarmEvaluatorServiceTestCase, self).tearDown()
|
||||
self.subp.kill()
|
||||
self.subp.wait()
|
||||
os.remove(self.tempfile)
|
||||
|
||||
def test_default_config(self):
|
||||
self._do_test(None, "AlarmEvaluationService")
|
||||
|
||||
def test_singleton_driver(self):
|
||||
self._do_test('singleton', "SingletonAlarmService")
|
||||
|
||||
def test_backward_compat(self):
|
||||
self._do_test("ceilometer.alarm.service.PartitionedAlarmService",
|
||||
"PartitionedAlarmService")
|
||||
|
||||
def test_partitioned_driver(self):
|
||||
self._do_test("partitioned", "PartitionedAlarmService")
|
||||
|
||||
|
||||
class BinApiTestCase(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -309,14 +309,6 @@ ceilometer.alarm.evaluator =
|
||||
gnocchi_aggregation_by_metrics_threshold = ceilometer.alarm.evaluator.gnocchi:GnocchiThresholdEvaluator
|
||||
gnocchi_aggregation_by_resources_threshold = ceilometer.alarm.evaluator.gnocchi:GnocchiThresholdEvaluator
|
||||
|
||||
ceilometer.alarm.evaluator_service =
|
||||
default = ceilometer.alarm.service:AlarmEvaluationService
|
||||
singleton = ceilometer.alarm.service:SingletonAlarmService
|
||||
partitioned = ceilometer.alarm.service:PartitionedAlarmService
|
||||
# NOTE(sileht): for backward compatibility
|
||||
ceilometer.alarm.service.SingletonAlarmService = ceilometer.alarm.service:SingletonAlarmService
|
||||
ceilometer.alarm.service.PartitionedAlarmService = ceilometer.alarm.service:PartitionedAlarmService
|
||||
|
||||
ceilometer.alarm.notifier =
|
||||
log = ceilometer.alarm.notifier.log:LogAlarmNotifier
|
||||
test = ceilometer.alarm.notifier.test:TestAlarmNotifier
|
||||
|
Loading…
x
Reference in New Issue
Block a user