diff --git a/ceilometer/alarm/partition/__init__.py b/ceilometer/alarm/partition/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/ceilometer/alarm/partition/coordination.py b/ceilometer/alarm/partition/coordination.py
new file mode 100644
index 000000000..fd5cb335f
--- /dev/null
+++ b/ceilometer/alarm/partition/coordination.py
@@ -0,0 +1,308 @@
+# -*- encoding: utf-8 -*-
+#
+# Copyright © 2013 Red Hat, Inc
+#
+# Authors: Eoghan Glynn
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import math
+import random
+import uuid
+
+from ceilometer.alarm import rpc as rpc_alarm
+from ceilometer.openstack.common import log
+from ceilometer.openstack.common import timeutils
+from ceilometer.openstack.common.gettextutils import _
+
+
+LOG = log.getLogger(__name__)
+
+
+class PartitionIdentity(object):
+    """Representation of a partition's identity for age comparison."""
+
+    def __init__(self, uuid, priority):
+        self.uuid = uuid
+        self.priority = priority
+
+    def __repr__(self):
+        return '%s:%s' % (self.uuid, self.priority)
+
+    def __hash__(self):
+        return hash((self.uuid, self.priority))
+
+    def __eq__(self, other):
+        if not isinstance(other, PartitionIdentity):
+            return False
+        return self.priority == other.priority and self.uuid == other.uuid
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __lt__(self, other):
+        if not other:
+            return True
+        if not isinstance(other, PartitionIdentity):
+            return False
+        older = self.priority < other.priority
+        tie_broken = (self.priority == other.priority and
+                      self.uuid < other.uuid)
+        return older or tie_broken
+
+    def __gt__(self, other):
+        return not (self.__lt__(other) or self.__eq__(other))
+
+
+class PartitionCoordinator(object):
+    """Implements the alarm partition coordination protocol.
+
+    A simple protocol based on AMQP fanout RPC is used.
+
+    All available partitions report their presence periodically.
+
+    The priority of each partition in terms of assuming mastership
+    is determined by earliest start-time (with a UUID-based tiebreaker
+    in the unlikely event of a time clash).
+
+    A single partition assumes mastership at any given time, taking
+    responsibility for allocating the alarms to be evaluated across
+    the set of currently available partitions.
+
+    When a partition lifecycle event is detected (i.e. a pre-existing
+    partition fails to report its presence, or a new one is started
+    up), a complete rebalance of the alarms is initiated.
+
+    Individual alarm lifecycle events, on the other hand, do not
+    require a full rebalance. Instead new alarms are allocated as
+    they are detected, whereas deleted alarms are initially allowed
+    to remain within the allocation (as the individual evaluators are
+    tolerant of assigned alarms not existing, and the deleted alarms
+    should be randomly distributed over the partitions). However, once
+    the number of alarms deleted since the last rebalance reaches a
+    certain limit, a rebalance will be initiated to maintain equity.
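+
+    To illustrate the deletion threshold with hypothetical numbers: if
+    50 alarms are currently under management, deletions are tolerated
+    in place until more than 50 / 5 = 10 alarms have been removed since
+    the last rebalance, at which point a full redistribution is
+    triggered.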
+
+    As presence reports are received, each partition keeps track of the
+    oldest partition it currently knows about, allowing an assumption of
+    mastership to be aborted if an older partition belatedly reports.
+    """
+
+    def __init__(self):
+        # uniqueness is based on a combination of starting timestamp
+        # and UUID
+        self.start = timeutils.utcnow()
+        self.this = PartitionIdentity(str(uuid.uuid4()),
+                                      float(self.start.strftime('%s.%f')))
+        self.oldest = None
+
+        # fan-out RPC
+        self.coordination_rpc = rpc_alarm.RPCAlarmPartitionCoordination()
+
+        # state maintained by the master
+        self.is_master = False
+        self.presence_changed = False
+        self.reports = {}
+        self.last_alarms = set()
+        self.deleted_alarms = set()
+
+        # alarms for evaluation, relevant to all partitions regardless
+        # of role
+        self.assignment = []
+
+    def _distribute(self, alarms, rebalance):
+        """Distribute alarms over the known set of evaluators.
+
+        :param alarms: the alarms to distribute
+        :param rebalance: true if this is a full rebalance
+        :return: true if the distribution completed, false if aborted
+        """
+        verb = 'assign' if rebalance else 'allocate'
+        method = (self.coordination_rpc.assign if rebalance
+                  else self.coordination_rpc.allocate)
+        LOG.debug(_('triggering %s') % verb)
+        LOG.debug(_('known evaluators %s') % self.reports)
+        per_evaluator = int(math.ceil(len(alarms) /
+                                      float(len(self.reports) + 1)))
+        LOG.debug(_('per evaluator allocation %s') % per_evaluator)
+        # for small distributions (e.g. of newly created alarms)
+        # we deliberately skew to non-master evaluators
+        evaluators = self.reports.keys()
+        random.shuffle(evaluators)
+        offset = 0
+        for evaluator in evaluators:
+            # TODO(eglynn): use pagination in the alarms API to chunk
+            # large allocations
+            if self.oldest < self.this:
+                LOG.warn(_('%(this)s bailing on distribution cycle '
+                           'as older partition detected: %(older)s') %
+                         dict(this=self.this, older=self.oldest))
+                return False
+            allocation = alarms[offset:offset + per_evaluator]
+            if allocation:
+                LOG.debug(_('%(verb)s-ing %(alloc)s to %(eval)s') %
+                          dict(verb=verb, alloc=allocation, eval=evaluator))
+                method(evaluator.uuid, allocation)
+            offset += per_evaluator
+        LOG.debug(_('master taking %s for self') % alarms[offset:])
+        if rebalance:
+            self.assignment = alarms[offset:]
+        else:
+            self.assignment.extend(alarms[offset:])
+        return True
+
+    def _deletion_requires_rebalance(self, alarms):
+        """Track the level of deletion activity since the last full rebalance.
+
+        We delay rebalancing until a certain threshold of deletion activity
+        has accumulated.
+
+        :param alarms: the current set of alarms
+        :return: True if the level of alarm deletion since the last
+                 rebalance is sufficient to require a full rebalance
+        """
+        deleted_alarms = self.last_alarms - set(alarms)
+        LOG.debug(_('newly deleted alarms %s') % deleted_alarms)
+        self.deleted_alarms.update(deleted_alarms)
+        if len(self.deleted_alarms) > len(alarms) / 5:
+            LOG.debug(_('alarm deletion activity requires rebalance'))
+            self.deleted_alarms = set()
+            return True
+        return False
+
+    def _record_oldest(self, partition, stale=False):
+        """Check if the reported partition is the oldest we know about.
+
+        :param partition: the reported partition
+        :param stale: true if the reported partition was detected as stale.
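+
+        A stale report for the current oldest partition clears the
+        record, so that a subsequent presence report can re-establish
+        the oldest known partition.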
+ """ + if stale and self.oldest == partition: + # current oldest partition detected as stale + self.oldest = None + elif not self.oldest: + # no known oldest partition + self.oldest = partition + elif partition < self.oldest: + # new oldest + self.oldest = partition + + def _is_master(self, interval): + """Determine if the current partition is the master.""" + now = timeutils.utcnow() + if timeutils.delta_seconds(self.start, now) < interval * 2: + LOG.debug(_('%s still warming up') % self.this) + return False + is_master = True + for partition, last_heard in self.reports.items(): + delta = timeutils.delta_seconds(last_heard, now) + LOG.debug(_('last heard from %(report)s %(delta)s seconds ago') % + dict(report=partition, delta=delta)) + if delta > interval * 2: + del self.reports[partition] + self._record_oldest(partition, stale=True) + LOG.debug(_('%(this)s detects stale evaluator: %(stale)s') % + dict(this=self.this, stale=partition)) + self.presence_changed = True + elif partition < self.this: + is_master = False + LOG.info(_('%(this)s sees older potential master: %(older)s') + % dict(this=self.this, older=partition)) + LOG.info(_('%(this)s is master?: %(is_master)s') % + dict(this=self.this, is_master=is_master)) + return is_master + + def _master_role(self, assuming, api_client): + """Carry out the master role, initiating a distribution if required. + + :param assuming: true if newly assumed mastership + :param api_client: the API client to use for alarms. + :return: True if not overtaken by an older partition + """ + alarms = [a.alarm_id for a in api_client.alarms.list()] + created_alarms = list(set(alarms) - self.last_alarms) + LOG.debug(_('newly created alarms %s') % created_alarms) + sufficient_deletion = self._deletion_requires_rebalance(alarms) + if (assuming or sufficient_deletion or self.presence_changed): + still_ahead = self._distribute(alarms, rebalance=True) + elif created_alarms: + still_ahead = self._distribute(list(created_alarms), + rebalance=False) + else: + # nothing to distribute, but check anyway if overtaken + still_ahead = self.this < self.oldest + self.last_alarms = set(alarms) + LOG.info('%(this)s not overtaken as master? %(still_ahead)s' % + dict(this=self.this, still_ahead=still_ahead)) + return still_ahead + + def check_mastership(self, eval_interval, api_client): + """Periodically check if the mastership role should be assumed. + + :param eval_interval: the alarm evaluation interval + :param api_client: the API client to use for alarms. 
+ """ + LOG.debug(_('%s checking mastership status') % self.this) + try: + assuming = not self.is_master + self.is_master = (self._is_master(eval_interval) and + self._master_role(assuming, api_client)) + self.presence_changed = False + except Exception: + LOG.exception(_('mastership check failed')) + + def presence(self, uuid, priority): + """Accept an incoming report of presence.""" + report = PartitionIdentity(uuid, priority) + if report != self.this: + if report not in self.reports: + self.presence_changed = True + self._record_oldest(report) + self.reports[report] = timeutils.utcnow() + LOG.debug(_('%(this)s knows about %(reports)s') % + dict(this=self.this, reports=self.reports)) + + def assign(self, uuid, alarms): + """Accept an incoming alarm assignment.""" + if uuid == self.this.uuid: + LOG.debug(_('%(this)s got assignment: %(alarms)s') % + dict(this=self.this, alarms=alarms)) + self.assignment = alarms + + def allocate(self, uuid, alarms): + """Accept an incoming alarm allocation.""" + if uuid == self.this.uuid: + LOG.debug(_('%(this)s got allocation: %(alarms)s') % + dict(this=self.this, alarms=alarms)) + self.assignment.extend(alarms) + + def report_presence(self): + """Report the presence of the current partition.""" + LOG.debug(_('%s reporting presence') % self.this) + try: + self.coordination_rpc.presence(self.this.uuid, self.this.priority) + except Exception: + LOG.exception(_('presence reporting failed')) + + def assigned_alarms(self, api_client): + """Return the alarms assigned to the current partition.""" + if not self.assignment: + LOG.debug(_('%s has no assigned alarms to evaluate') % self.this) + return [] + + try: + LOG.debug(_('%(this)s alarms for evaluation: %(alarms)s') % + dict(this=self.this, alarms=self.assignment)) + return [a for a in api_client.alarms.list() + if a.alarm_id in self.assignment] + except Exception: + LOG.exception(_('assignment retrieval failed')) + return [] diff --git a/ceilometer/alarm/rpc.py b/ceilometer/alarm/rpc.py index afe6cc120..c2a6fa09f 100644 --- a/ceilometer/alarm/rpc.py +++ b/ceilometer/alarm/rpc.py @@ -26,6 +26,10 @@ OPTS = [ cfg.StrOpt('notifier_rpc_topic', default='alarm_notifier', help='the topic ceilometer uses for alarm notifier messages'), + cfg.StrOpt('partition_rpc_topic', + default='alarm_partition_coordination', + help='the topic ceilometer uses for alarm partition ' + 'coordination messages'), ] cfg.CONF.register_opts(OPTS, group='alarm') @@ -46,3 +50,28 @@ class RPCAlarmNotifier(rpc_proxy.RpcProxy): 'current': alarm.state, 'reason': reason}) self.cast(context.get_admin_context(), msg) + + +class RPCAlarmPartitionCoordination(rpc_proxy.RpcProxy): + def __init__(self): + super(RPCAlarmPartitionCoordination, self).__init__( + default_version='1.0', + topic=cfg.CONF.alarm.partition_rpc_topic) + + def presence(self, uuid, priority): + msg = self.make_msg('presence', data={ + 'uuid': uuid, + 'priority': priority}) + self.fanout_cast(context.get_admin_context(), msg) + + def assign(self, uuid, alarms): + msg = self.make_msg('assign', data={ + 'uuid': uuid, + 'alarms': alarms}) + return self.fanout_cast(context.get_admin_context(), msg) + + def allocate(self, uuid, alarms): + msg = self.make_msg('allocate', data={ + 'uuid': uuid, + 'alarms': alarms}) + return self.fanout_cast(context.get_admin_context(), msg) diff --git a/ceilometer/alarm/service.py b/ceilometer/alarm/service.py index e350a8fc9..60bba68f8 100644 --- a/ceilometer/alarm/service.py +++ b/ceilometer/alarm/service.py @@ -18,11 +18,15 @@ # License for the specific 
+    def __init__(self):
+        super(RPCAlarmPartitionCoordination, self).__init__(
+            default_version='1.0',
+            topic=cfg.CONF.alarm.partition_rpc_topic)
+
+    def presence(self, uuid, priority):
+        msg = self.make_msg('presence', data={
+            'uuid': uuid,
+            'priority': priority})
+        self.fanout_cast(context.get_admin_context(), msg)
+
+    def assign(self, uuid, alarms):
+        msg = self.make_msg('assign', data={
+            'uuid': uuid,
+            'alarms': alarms})
+        return self.fanout_cast(context.get_admin_context(), msg)
+
+    def allocate(self, uuid, alarms):
+        msg = self.make_msg('allocate', data={
+            'uuid': uuid,
+            'alarms': alarms})
+        return self.fanout_cast(context.get_admin_context(), msg)
diff --git a/ceilometer/alarm/service.py b/ceilometer/alarm/service.py
index e350a8fc9..60bba68f8 100644
--- a/ceilometer/alarm/service.py
+++ b/ceilometer/alarm/service.py
@@ -18,11 +18,15 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import abc
+
 from oslo.config import cfg
 from stevedore import extension
 
 from ceilometer.alarm import rpc as rpc_alarm
+from ceilometer.alarm.partition import coordination
 from ceilometer.service import prepare_service
+from ceilometer.openstack.common import importutils
 from ceilometer.openstack.common import log
 from ceilometer.openstack.common import network_utils
 from ceilometer.openstack.common import service as os_service
@@ -39,40 +43,36 @@ OPTS = [
                ' be >= than configured pipeline interval for'
                ' collection of underlying metrics.',
                deprecated_opts=[cfg.DeprecatedOpt(
-                   'threshold_evaluation_interval', group='alarm')])
+                   'threshold_evaluation_interval', group='alarm')]),
+    cfg.StrOpt('evaluation_service',
+               default='ceilometer.alarm.service.SingletonAlarmService',
+               help='Class to launch as alarm evaluation service'),
 ]
 
 cfg.CONF.register_opts(OPTS, group='alarm')
 cfg.CONF.import_opt('notifier_rpc_topic', 'ceilometer.alarm.rpc',
                     group='alarm')
+cfg.CONF.import_opt('partition_rpc_topic', 'ceilometer.alarm.rpc',
+                    group='alarm')
 
 LOG = log.getLogger(__name__)
 
 
-class SingletonAlarmService(os_service.Service):
+class AlarmService(object):
+
+    __metaclass__ = abc.ABCMeta
 
     EXTENSIONS_NAMESPACE = "ceilometer.alarm.evaluator"
 
-    def __init__(self):
-        super(SingletonAlarmService, self).__init__()
-        self.api_client = None
+    def _load_evaluators(self):
         self.evaluators = extension.ExtensionManager(
-            self.EXTENSIONS_NAMESPACE,
+            namespace=self.EXTENSIONS_NAMESPACE,
             invoke_on_load=True,
-            invoke_args=(rpc_alarm.RPCAlarmNotifier(),))
+            invoke_args=(rpc_alarm.RPCAlarmNotifier(),)
+        )
         self.supported_evaluators = [ext.name for ext in
                                      self.evaluators.extensions]
 
-    def start(self):
-        super(SingletonAlarmService, self).start()
-        interval = cfg.CONF.alarm.evaluation_interval
-        self.tg.add_timer(
-            interval,
-            self._evaluate_all_alarms,
-            0)
-        # Add a dummy thread to have wait() working
-        self.tg.add_timer(604800, lambda: None)
-
     @property
     def _client(self):
         """Construct or reuse an authenticated API client."""
@@ -89,15 +89,15 @@ class SingletonAlarmService(os_service.Service):
             self.api_client = ceiloclient.get_client(2, **creds)
         return self.api_client
 
-    def _evaluate_all_alarms(self):
+    def _evaluate_assigned_alarms(self):
         try:
-            alarms = self._client.alarms.list()
+            alarms = self._assigned_alarms()
             LOG.info(_('initiating evaluation cycle on %d alarms') %
                      len(alarms))
             for alarm in alarms:
                 self._evaluate_alarm(alarm)
         except Exception:
-            LOG.exception(_('threshold evaluation cycle failed'))
+            LOG.exception(_('alarm evaluation cycle failed'))
 
     def _evaluate_alarm(self, alarm):
         """Evaluate the alarms assigned to this evaluator."""
@@ -113,15 +113,98 @@ class SingletonAlarmService(os_service.Service):
         LOG.debug(_('evaluating alarm %s') % alarm.alarm_id)
         self.evaluators[alarm.type].obj.evaluate(alarm)
 
+    @abc.abstractmethod
+    def _assigned_alarms(self):
+        pass
 
-def singleton_alarm():
+
+class SingletonAlarmService(AlarmService, os_service.Service):
+
+    def __init__(self):
+        super(SingletonAlarmService, self).__init__()
+        self._load_evaluators()
+        self.api_client = None
+
+    def start(self):
+        super(SingletonAlarmService, self).start()
+        if self.evaluators:
+            interval = cfg.CONF.alarm.evaluation_interval
+            self.tg.add_timer(
+                interval,
+                self._evaluate_assigned_alarms,
+                0)
+        # Add a dummy thread to have wait() working
+        self.tg.add_timer(604800, lambda: None)
+
+    def _assigned_alarms(self):
+        return self._client.alarms.list()
+
+
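+# For example, to launch the coordinated evaluator instead of the
+# default singleton, set in ceilometer.conf:
+#
+#   [alarm]
+#   evaluation_service = ceilometer.alarm.service.PartitionedAlarmService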
+def alarm_evaluator():
     prepare_service()
-    os_service.launch(SingletonAlarmService()).wait()
+    service = importutils.import_object(cfg.CONF.alarm.evaluation_service)
+    os_service.launch(service).wait()
 
 
 cfg.CONF.import_opt('host', 'ceilometer.service')
 
 
+class PartitionedAlarmService(AlarmService, rpc_service.Service):
+
+    def __init__(self):
+        super(PartitionedAlarmService, self).__init__(
+            cfg.CONF.host,
+            cfg.CONF.alarm.partition_rpc_topic,
+            self
+        )
+        self._load_evaluators()
+        self.api_client = None
+        self.partition_coordinator = coordination.PartitionCoordinator()
+
+    def initialize_service_hook(self, service):
+        LOG.debug('initialize_service_hooks')
+        self.conn.create_worker(
+            cfg.CONF.alarm.partition_rpc_topic,
+            rpc_dispatcher.RpcDispatcher([self]),
+            'ceilometer.alarm.' + cfg.CONF.alarm.partition_rpc_topic,
+        )
+
+    def start(self):
+        super(PartitionedAlarmService, self).start()
+        if self.evaluators:
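+            # Timer cadence: presence is reported every quarter
+            # interval, mastership is re-checked every half interval
+            # (after an initial full-interval delay), and assigned
+            # alarms are evaluated once per interval.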
+            eval_interval = cfg.CONF.alarm.evaluation_interval
+            self.tg.add_timer(
+                eval_interval / 4,
+                self.partition_coordinator.report_presence,
+                0)
+            self.tg.add_timer(
+                eval_interval / 2,
+                self.partition_coordinator.check_mastership,
+                eval_interval,
+                *[eval_interval, self._client])
+            self.tg.add_timer(
+                eval_interval,
+                self._evaluate_assigned_alarms,
+                eval_interval)
+        # Add a dummy thread to have wait() working
+        self.tg.add_timer(604800, lambda: None)
+
+    def _assigned_alarms(self):
+        return self.partition_coordinator.assigned_alarms(self._client)
+
+    def presence(self, context, data):
+        self.partition_coordinator.presence(data.get('uuid'),
+                                            data.get('priority'))
+
+    def assign(self, context, data):
+        self.partition_coordinator.assign(data.get('uuid'),
+                                          data.get('alarms'))
+
+    def allocate(self, context, data):
+        self.partition_coordinator.allocate(data.get('uuid'),
+                                            data.get('alarms'))
+
+
 class AlarmNotifierService(rpc_service.Service):
 
     EXTENSIONS_NAMESPACE = "ceilometer.alarm.notifier"
diff --git a/etc/ceilometer/ceilometer.conf.sample b/etc/ceilometer/ceilometer.conf.sample
index 3abd55b4c..8c935dd19 100644
--- a/etc/ceilometer/ceilometer.conf.sample
+++ b/etc/ceilometer/ceilometer.conf.sample
@@ -605,6 +605,10 @@
 # (string value)
 #notifier_rpc_topic=alarm_notifier
 
+# the topic ceilometer uses for alarm partition coordination
+# messages (string value)
+#partition_rpc_topic=alarm_partition_coordination
+
 
 #
 # Options defined in ceilometer.alarm.service
@@ -615,6 +619,9 @@
 # (integer value)
 #evaluation_interval=60
 
+# Class to launch as alarm evaluation service (string value)
+#evaluation_service=ceilometer.alarm.service.SingletonAlarmService
+
 
 #
 # Options defined in ceilometer.api.controllers.v2
diff --git a/setup.cfg b/setup.cfg
index cc7168a6e..3487d27fc 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -122,7 +122,7 @@ console_scripts =
     ceilometer-expirer = ceilometer.storage:expirer
     ceilometer-collector = ceilometer.collector.service:collector
     ceilometer-collector-udp = ceilometer.collector.service:udp_collector
-    ceilometer-alarm-singleton = ceilometer.alarm.service:singleton_alarm
+    ceilometer-alarm-evaluator = ceilometer.alarm.service:alarm_evaluator
     ceilometer-alarm-notifier = ceilometer.alarm.service:alarm_notifier
 
 ceilometer.dispatcher =
diff --git a/tests/alarm/partition/__init__.py b/tests/alarm/partition/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/alarm/partition/test_coordination.py b/tests/alarm/partition/test_coordination.py
new file mode 100644
index 000000000..4ff717aff
--- /dev/null
+++ b/tests/alarm/partition/test_coordination.py
@@ -0,0 +1,382 @@
+# -*- encoding: utf-8 -*-
+#
+# Copyright © 2013 Red Hat, Inc
+#
+# Author: Eoghan Glynn
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Tests for ceilometer/alarm/partition/coordination.py
+"""
+import datetime
+import uuid
+
+import mock
+from oslo.config import cfg
+
+from ceilometer.alarm.partition import coordination
+from ceilometer.openstack.common import timeutils
+from ceilometer.storage import models
+from ceilometer.tests import base
+
+
+class TestCoordinate(base.TestCase):
+    def setUp(self):
+        super(TestCoordinate, self).setUp()
+        self.test_interval = 120
+        cfg.CONF.set_override('evaluation_interval',
+                              self.test_interval,
+                              group='alarm')
+        self.api_client = mock.Mock()
+        self.override_start = datetime.datetime(2012, 7, 2, 10, 45)
+        timeutils.utcnow.override_time = self.override_start
+        self.partition_coordinator = coordination.PartitionCoordinator()
+        self.partition_coordinator.coordination_rpc = mock.Mock()
+
+    def tearDown(self):
+        super(TestCoordinate, self).tearDown()
+        timeutils.utcnow.override_time = None
+
+    def _no_alarms(self):
+        self.api_client.alarms.list.return_value = []
+
+    def _some_alarms(self, count):
+        alarm_ids = [str(uuid.uuid4()) for _ in xrange(count)]
+        alarms = [self._make_alarm(aid) for aid in alarm_ids]
+        self.api_client.alarms.list.return_value = alarms
+        return alarm_ids
+
+    def _current_alarms(self):
+        return self.api_client.alarms.list.return_value
+
+    def _dump_alarms(self, shave):
+        alarms = self.api_client.alarms.list.return_value
+        alarms = alarms[:shave]
+        alarm_ids = [a.alarm_id for a in alarms]
+        self.api_client.alarms.list.return_value = alarms
+        return alarm_ids
+
+    def _add_alarms(self, boost):
+        new_alarm_ids = [str(uuid.uuid4()) for _ in xrange(boost)]
+        alarms = self.api_client.alarms.list.return_value
+        for aid in new_alarm_ids:
+            alarms.append(self._make_alarm(aid))
+        self.api_client.alarms.list.return_value = alarms
+        return new_alarm_ids
+
+    @staticmethod
+    def _make_alarm(uuid):
+        return models.Alarm(name='instance_running_hot',
+                            type='threshold',
+                            user_id='foobar',
+                            project_id='snafu',
+                            enabled=True,
+                            description='',
+                            repeat_actions=False,
+                            state='insufficient data',
+                            state_timestamp=None,
+                            timestamp=None,
+                            ok_actions=[],
+                            alarm_actions=[],
+                            insufficient_data_actions=[],
+                            alarm_id=uuid,
+                            rule=dict(
+                                statistic='avg',
+                                comparison_operator='gt',
+                                threshold=80.0,
+                                evaluation_periods=5,
+                                period=60,
+                                query=[],
+                            ))
+
+    def _advance_time(self, factor):
+        delta = datetime.timedelta(seconds=self.test_interval * factor)
+        timeutils.utcnow.override_time += delta
+
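+    # NB: a partition's priority is its start timestamp, so a *lower*
+    # priority value denotes an *older* (and therefore more senior)
+    # partition.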
+    def _younger_by(self, offset):
+        return self.partition_coordinator.this.priority + offset
+
+    def _older_by(self, offset):
+        return self.partition_coordinator.this.priority - offset
+
+    def _check_mastership(self, expected):
+        self.partition_coordinator.check_mastership(self.test_interval,
+                                                    self.api_client)
+        self.assertEqual(expected, self.partition_coordinator.is_master)
+
+    def _new_partition(self, offset):
+        younger = self._younger_by(offset)
+        pid = uuid.uuid4()
+        self.partition_coordinator.presence(pid, younger)
+        return (pid, younger)
+
+    def _check_assignments(self, others, alarm_ids, per_worker,
+                           expect_unaffected=[]):
+        rpc = self.partition_coordinator.coordination_rpc
+        calls = rpc.assign.call_args_list
+        return self._check_distribution(others, alarm_ids, per_worker, calls,
+                                        expect_unaffected)
+
+    def _check_allocation(self, others, alarm_ids, per_worker):
+        rpc = self.partition_coordinator.coordination_rpc
+        calls = rpc.allocate.call_args_list
+        return self._check_distribution(others, alarm_ids, per_worker, calls)
+
+    def _check_distribution(self, others, alarm_ids, per_worker, calls,
+                            expect_unaffected=[]):
+        unaffected = [pid for pid, _ in others]
+        unaffected.extend(expect_unaffected)
+        remainder = list(alarm_ids)
+        for call in calls:
+            args, _ = call
+            target, alarms = args
+            self.assertTrue(target in unaffected)
+            unaffected.remove(target)
+            self.assertEqual(len(alarms), per_worker)
+            for aid in alarms:
+                self.assertTrue(aid in remainder)
+                remainder.remove(aid)
+        self.assertEqual(set(unaffected), set(expect_unaffected))
+        return remainder
+
+    def _forget_assignments(self, expected_assignments):
+        rpc = self.partition_coordinator.coordination_rpc
+        self.assertEqual(len(rpc.assign.call_args_list),
+                         expected_assignments)
+        rpc.reset_mock()
+
+    def test_mastership_not_assumed_during_warmup(self):
+        self._no_alarms()
+
+        for _ in xrange(7):
+            # still warming up
+            self._advance_time(0.25)
+            self._check_mastership(False)
+
+        # now warmed up
+        self._advance_time(0.25)
+        self._check_mastership(True)
+
+    def test_uncontested_mastership_assumed(self):
+        self._no_alarms()
+
+        self._advance_time(3)
+
+        self._check_mastership(True)
+
+    def test_contested_mastership_assumed(self):
+        self._no_alarms()
+
+        self._advance_time(3)
+
+        for offset in xrange(1, 5):
+            younger = self._younger_by(offset)
+            self.partition_coordinator.presence(uuid.uuid4(), younger)
+
+        self._check_mastership(True)
+
+    def test_bested_mastership_relinquished(self):
+        self._no_alarms()
+
+        self._advance_time(3)
+
+        self._check_mastership(True)
+
+        older = self._older_by(1)
+        self.partition_coordinator.presence(uuid.uuid4(), older)
+
+        self._check_mastership(False)
+
+    def _do_test_tie_broken_mastership(self, seed, expect_mastership):
+        self._no_alarms()
+        self.partition_coordinator.this.uuid = uuid.UUID(int=1)
+
+        self._advance_time(3)
+
+        self._check_mastership(True)
+
+        tied = self.partition_coordinator.this.priority
+        self.partition_coordinator.presence(uuid.UUID(int=seed), tied)
+
+        self._check_mastership(expect_mastership)
+
+    def test_tie_broken_mastership_assumed(self):
+        self._do_test_tie_broken_mastership(2, True)
+
+    def test_tie_broken_mastership_relinquished(self):
+        self._do_test_tie_broken_mastership(0, False)
+
+    def test_fair_distribution(self):
+        alarm_ids = self._some_alarms(49)
+
+        self._advance_time(3)
+
+        others = [self._new_partition(i) for i in xrange(1, 5)]
+
+        self._check_mastership(True)
+
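+        # 49 alarms across 5 partitions: ceil(49 / 5.0) = 10 per slave
+        # evaluator, with the master retaining the remaining 9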
+        remainder = self._check_assignments(others, alarm_ids, 10)
+        self.assertEqual(set(remainder),
+                         set(self.partition_coordinator.assignment))
+
+    def test_rebalance_on_partition_startup(self):
+        alarm_ids = self._some_alarms(49)
+
+        self._advance_time(3)
+
+        others = [self._new_partition(i) for i in xrange(1, 5)]
+
+        self._check_mastership(True)
+
+        self._forget_assignments(4)
+
+        others.append(self._new_partition(5))
+        self._check_mastership(True)
+
+        remainder = self._check_assignments(others, alarm_ids, 9)
+        self.assertEqual(set(remainder),
+                         set(self.partition_coordinator.assignment))
+
+    def test_rebalance_on_partition_staleness(self):
+        alarm_ids = self._some_alarms(49)
+
+        self._advance_time(3)
+
+        others = [self._new_partition(i) for i in xrange(1, 5)]
+
+        self._check_mastership(True)
+
+        self._forget_assignments(4)
+
+        self._advance_time(4)
+
+        stale, _ = others.pop()
+        for pid, younger in others:
+            self.partition_coordinator.presence(pid, younger)
+
+        self._check_mastership(True)
+
+        remainder = self._check_assignments(others, alarm_ids, 13, [stale])
+        self.assertEqual(set(remainder),
+                         set(self.partition_coordinator.assignment))
+
+    def test_rebalance_on_sufficient_deletion(self):
+        alarm_ids = self._some_alarms(49)
+
+        self._advance_time(3)
+
+        others = [self._new_partition(i) for i in xrange(1, 5)]
+
+        self._check_mastership(True)
+
+        self._forget_assignments(4)
+
+        alarm_ids = self._dump_alarms(len(alarm_ids) / 2)
+
+        self._check_mastership(True)
+
+        remainder = self._check_assignments(others, alarm_ids, 5)
+        self.assertEqual(set(remainder),
+                         set(self.partition_coordinator.assignment))
+
+    def test_no_rebalance_on_insufficient_deletion(self):
+        alarm_ids = self._some_alarms(49)
+
+        self._advance_time(3)
+
+        others = [self._new_partition(i) for i in xrange(1, 5)]
+
+        self._check_mastership(True)
+
+        self._forget_assignments(4)
+
+        alarm_ids = self._dump_alarms(45)
+
+        self._check_mastership(True)
+
+        expect_unaffected = [pid for pid, _ in others]
+        self._check_assignments(others, alarm_ids, 10, expect_unaffected)
+
+    def test_no_rebalance_on_creation(self):
+        self._some_alarms(49)
+
+        self._advance_time(3)
+
+        others = [self._new_partition(i) for i in xrange(1, 5)]
+
+        self._check_mastership(True)
+
+        self._forget_assignments(4)
+
+        new_alarm_ids = self._add_alarms(8)
+
+        master_assignment = set(self.partition_coordinator.assignment)
+        self._check_mastership(True)
+
+        remainder = self._check_allocation(others, new_alarm_ids, 2)
+        self.assertEqual(len(remainder), 0)
+        self.assertEqual(master_assignment,
+                         set(self.partition_coordinator.assignment))
+
+    def test_bail_when_overtaken_in_distribution(self):
+        self._some_alarms(49)
+
+        self._advance_time(3)
+
+        for i in xrange(1, 5):
+            self._new_partition(i)
+
+        def overtake(*args):
+            self._new_partition(-1)
+
+        rpc = self.partition_coordinator.coordination_rpc
+        rpc.assign.side_effect = overtake
+
+        self._check_mastership(False)
+
+        self.assertEqual(len(rpc.assign.call_args_list), 1)
+
+    def test_assigned_alarms_no_assignment(self):
+        alarms = self.partition_coordinator.assigned_alarms(self.api_client)
+        self.assertEqual(len(alarms), 0)
+
+    def test_assigned_alarms_assignment(self):
+        alarm_ids = self._some_alarms(6)
+
+        uuid = self.partition_coordinator.this.uuid
+        self.partition_coordinator.assign(uuid, alarm_ids)
+
+        alarms = self.partition_coordinator.assigned_alarms(self.api_client)
+        self.assertEqual(alarms, self._current_alarms())
+
+    def test_assigned_alarms_allocation(self):
+        alarm_ids = self._some_alarms(6)
+
+        uuid = self.partition_coordinator.this.uuid
+        self.partition_coordinator.assign(uuid, alarm_ids)
+
+        new_alarm_ids = self._add_alarms(2)
+        self.partition_coordinator.allocate(uuid, new_alarm_ids)
+
+        alarms = self.partition_coordinator.assigned_alarms(self.api_client)
+        self.assertEqual(alarms, self._current_alarms())
+
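+    # deleted alarms may linger in the assignment, but assigned_alarms()
+    # only returns those that still exist according to the alarms API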
+    def test_assigned_alarms_deleted_assignment(self):
+        alarm_ids = self._some_alarms(6)
+
+        uuid = self.partition_coordinator.this.uuid
+        self.partition_coordinator.assign(uuid, alarm_ids)
+
+        self._dump_alarms(len(alarm_ids) / 2)
+
+        alarms = self.partition_coordinator.assigned_alarms(self.api_client)
+        self.assertEqual(alarms, self._current_alarms())
diff --git a/tests/alarm/test_partitioned_alarm_svc.py b/tests/alarm/test_partitioned_alarm_svc.py
new file mode 100644
index 000000000..44f37694a
--- /dev/null
+++ b/tests/alarm/test_partitioned_alarm_svc.py
@@ -0,0 +1,102 @@
+# -*- encoding: utf-8 -*-
+#
+# Copyright © 2013 Red Hat, Inc
+#
+# Author: Eoghan Glynn
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Tests for ceilometer.alarm.service.PartitionedAlarmService.
+"""
+from contextlib import nested
+
+import mock
+from stevedore import extension
+from stevedore.tests import manager as extension_tests
+
+from oslo.config import cfg
+
+from ceilometer.alarm import service
+from ceilometer.tests import base
+
+
+class TestPartitionedAlarmService(base.TestCase):
+    def setUp(self):
+        super(TestPartitionedAlarmService, self).setUp()
+        self.threshold_eval = mock.Mock()
+        self.api_client = mock.MagicMock()
+        cfg.CONF.set_override('host',
+                              'fake_host')
+        cfg.CONF.set_override('partition_rpc_topic',
+                              'fake_topic',
+                              group='alarm')
+        self.partitioned = service.PartitionedAlarmService()
+        self.partitioned.tg = mock.Mock()
+        self.partitioned.partition_coordinator = mock.Mock()
+        self.extension_mgr = extension_tests.TestExtensionManager(
+            [
+                extension.Extension(
+                    'threshold',
+                    None,
+                    None,
+                    self.threshold_eval),
+            ])
+        self.partitioned.extension_manager = self.extension_mgr
+
+    @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
+    def test_start(self):
+        test_interval = 120
+        cfg.CONF.set_override('evaluation_interval',
+                              test_interval,
+                              group='alarm')
+        get_client = 'ceilometerclient.client.get_client'
+        create_conn = 'ceilometer.openstack.common.rpc.create_connection'
+        with nested(mock.patch(get_client, return_value=self.api_client),
+                    mock.patch(create_conn)):
+            self.partitioned.start()
+            pc = self.partitioned.partition_coordinator
+            expected = [
+                mock.call(test_interval / 4,
+                          pc.report_presence,
+                          0),
+                mock.call(test_interval / 2,
+                          pc.check_mastership,
+                          test_interval,
+                          test_interval,
+                          self.api_client),
+                mock.call(test_interval,
+                          self.partitioned._evaluate_assigned_alarms,
+                          test_interval),
+                mock.call(604800, mock.ANY),
+            ]
+            actual = self.partitioned.tg.add_timer.call_args_list
+            self.assertEqual(actual, expected)
+
+    def test_presence_reporting(self):
+        priority = 42
+        self.partitioned.presence(mock.Mock(),
+                                  dict(uuid='uuid', priority=priority))
+        pc = self.partitioned.partition_coordinator
+        pc.presence.assert_called_once_with('uuid', priority)
+
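+    # the assign/allocate RPC endpoints should delegate verbatim to the
+    # partition coordinator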
+    def test_alarm_assignment(self):
+        alarms = [mock.Mock()]
+        self.partitioned.assign(mock.Mock(),
+                                dict(uuid='uuid', alarms=alarms))
+        pc = self.partitioned.partition_coordinator
+        pc.assign.assert_called_once_with('uuid', alarms)
+
+    def test_alarm_allocation(self):
+        alarms = [mock.Mock()]
+        self.partitioned.allocate(mock.Mock(),
+                                  dict(uuid='uuid', alarms=alarms))
+        pc = self.partitioned.partition_coordinator
+        pc.allocate.assert_called_once_with('uuid', alarms)
diff --git a/tests/alarm/test_singleton_alarm_svc.py b/tests/alarm/test_singleton_alarm_svc.py
index 87cd4cf18..ee2d31e86 100644
--- a/tests/alarm/test_singleton_alarm_svc.py
+++ b/tests/alarm/test_singleton_alarm_svc.py
@@ -15,10 +15,12 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
-"""Tests for ceilometer/alarm/service.py
+"""Tests for ceilometer.alarm.service.SingletonAlarmService.
 """
 import mock
 
+from oslo.config import cfg
+
 from stevedore import extension
 from stevedore.tests import manager as extension_tests
 
@@ -45,12 +47,16 @@ class TestSingletonAlarmService(base.TestCase):
         self.singleton.supported_evaluators = ['threshold']
 
     def test_start(self):
+        test_interval = 120
+        cfg.CONF.set_override('evaluation_interval',
+                              test_interval,
+                              group='alarm')
         with mock.patch('ceilometerclient.client.get_client',
                         return_value=self.api_client):
             self.singleton.start()
             expected = [
-                mock.call(60,
-                          self.singleton._evaluate_all_alarms,
+                mock.call(test_interval,
+                          self.singleton._evaluate_assigned_alarms,
                           0),
                 mock.call(604800, mock.ANY),
             ]
@@ -63,7 +69,7 @@
         self.api_client.alarms.list.return_value = [alarm]
         with mock.patch('ceilometerclient.client.get_client',
                         return_value=self.api_client):
-            self.singleton._evaluate_all_alarms()
+            self.singleton._evaluate_assigned_alarms()
             self.threshold_eval.evaluate.assert_called_once_with(alarm)
 
     def test_disabled_is_skipped(self):
@@ -78,7 +84,7 @@
         with mock.patch('ceilometerclient.client.get_client',
                         return_value=self.api_client):
             self.singleton.start()
-            self.singleton._evaluate_all_alarms()
+            self.singleton._evaluate_assigned_alarms()
             self.threshold_eval.evaluate.assert_called_once_with(alarms[1])
 
     def test_unknown_extention_skipped(self):
@@ -91,5 +97,5 @@
         with mock.patch('ceilometerclient.client.get_client',
                         return_value=self.api_client):
             self.singleton.start()
-            self.singleton._evaluate_all_alarms()
+            self.singleton._evaluate_assigned_alarms()
             self.threshold_eval.evaluate.assert_called_once_with(alarms[1])