Merge "use Cotyledon lib"
This commit is contained in:
commit
08bda02c0a
@ -15,7 +15,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_service import service as os_service
|
||||
import cotyledon
|
||||
|
||||
from aodh import evaluator as evaluator_svc
|
||||
from aodh import event as event_svc
|
||||
@ -25,17 +25,23 @@ from aodh import service
|
||||
|
||||
def notifier():
|
||||
conf = service.prepare_service()
|
||||
os_service.launch(conf, notifier_svc.AlarmNotifierService(conf),
|
||||
workers=conf.notifier.workers).wait()
|
||||
sm = cotyledon.ServiceManager()
|
||||
sm.add(notifier_svc.AlarmNotifierService,
|
||||
workers=conf.notifier.workers, args=(conf,))
|
||||
sm.run()
|
||||
|
||||
|
||||
def evaluator():
|
||||
conf = service.prepare_service()
|
||||
os_service.launch(conf, evaluator_svc.AlarmEvaluationService(conf),
|
||||
workers=conf.evaluator.workers).wait()
|
||||
sm = cotyledon.ServiceManager()
|
||||
sm.add(evaluator_svc.AlarmEvaluationService,
|
||||
workers=conf.evaluator.workers, args=(conf,))
|
||||
sm.run()
|
||||
|
||||
|
||||
def listener():
|
||||
conf = service.prepare_service()
|
||||
os_service.launch(conf, event_svc.EventAlarmEvaluationService(conf),
|
||||
workers=conf.listener.workers).wait()
|
||||
sm = cotyledon.ServiceManager()
|
||||
sm.add(event_svc.EventAlarmEvaluationService,
|
||||
workers=conf.listener.workers, args=(conf,))
|
||||
sm.run()
|
||||
|
@ -20,11 +20,11 @@ import json
|
||||
import threading
|
||||
|
||||
from concurrent import futures
|
||||
import cotyledon
|
||||
import croniter
|
||||
from futurist import periodics
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_service import service as os_service
|
||||
from oslo_utils import timeutils
|
||||
import pytz
|
||||
import six
|
||||
@ -182,68 +182,34 @@ class Evaluator(object):
|
||||
"""
|
||||
|
||||
|
||||
class AlarmEvaluationService(os_service.Service):
|
||||
class AlarmEvaluationService(cotyledon.Service):
|
||||
|
||||
PARTITIONING_GROUP_NAME = "alarm_evaluator"
|
||||
EVALUATOR_EXTENSIONS_NAMESPACE = "aodh.evaluator"
|
||||
|
||||
def __init__(self, conf):
|
||||
super(AlarmEvaluationService, self).__init__()
|
||||
def __init__(self, worker_id, conf):
|
||||
super(AlarmEvaluationService, self).__init__(worker_id)
|
||||
self.conf = conf
|
||||
|
||||
@property
|
||||
def _storage_conn(self):
|
||||
if not self.storage_conn:
|
||||
self.storage_conn = storage.get_connection_from_config(self.conf)
|
||||
return self.storage_conn
|
||||
ef = lambda: futures.ThreadPoolExecutor(max_workers=10)
|
||||
self.periodic = periodics.PeriodicWorker.create(
|
||||
[], executor_factory=ef)
|
||||
|
||||
def _load_evaluators(self):
|
||||
self.evaluators = extension.ExtensionManager(
|
||||
namespace=self.EVALUATOR_EXTENSIONS_NAMESPACE,
|
||||
invoke_on_load=True,
|
||||
invoke_args=(self.conf,)
|
||||
)
|
||||
self.storage_conn = storage.get_connection_from_config(self.conf)
|
||||
|
||||
def _evaluate_assigned_alarms(self):
|
||||
try:
|
||||
alarms = self._assigned_alarms()
|
||||
LOG.info(_('initiating evaluation cycle on %d alarms') %
|
||||
len(alarms))
|
||||
for alarm in alarms:
|
||||
self._evaluate_alarm(alarm)
|
||||
except Exception:
|
||||
LOG.exception(_('alarm evaluation cycle failed'))
|
||||
|
||||
def _evaluate_alarm(self, alarm):
|
||||
"""Evaluate the alarms assigned to this evaluator."""
|
||||
if alarm.type not in self.evaluators:
|
||||
LOG.debug('skipping alarm %s: type unsupported', alarm.alarm_id)
|
||||
return
|
||||
|
||||
LOG.debug('evaluating alarm %s', alarm.alarm_id)
|
||||
try:
|
||||
self.evaluators[alarm.type].obj.evaluate(alarm)
|
||||
except Exception:
|
||||
LOG.exception(_('Failed to evaluate alarm %s'), alarm.alarm_id)
|
||||
|
||||
def start(self):
|
||||
super(AlarmEvaluationService, self).start()
|
||||
|
||||
self.storage_conn = None
|
||||
self._load_evaluators()
|
||||
self.partition_coordinator = coordination.PartitionCoordinator(
|
||||
self.conf)
|
||||
|
||||
self.partition_coordinator.start()
|
||||
self.partition_coordinator.join_group(self.PARTITIONING_GROUP_NAME)
|
||||
|
||||
# allow time for coordination if necessary
|
||||
delay_start = self.partition_coordinator.is_active()
|
||||
|
||||
ef = lambda: futures.ThreadPoolExecutor(max_workers=10)
|
||||
self.periodic = periodics.PeriodicWorker.create(
|
||||
[], executor_factory=ef)
|
||||
|
||||
if self.evaluators:
|
||||
@periodics.periodic(spacing=self.conf.evaluation_interval,
|
||||
run_immediately=not delay_start)
|
||||
@ -267,25 +233,39 @@ class AlarmEvaluationService(os_service.Service):
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
||||
# NOTE(sileht): We have to drop eventlet to drop this last eventlet
|
||||
# thread
|
||||
# Add a dummy thread to have wait() working
|
||||
self.tg.add_timer(604800, lambda: None)
|
||||
def terminate(self):
|
||||
self.periodic.stop()
|
||||
self.partition_coordinator.stop()
|
||||
self.periodic.wait()
|
||||
|
||||
def stop(self):
|
||||
if getattr(self, 'periodic', None):
|
||||
self.periodic.stop()
|
||||
self.periodic.wait()
|
||||
if getattr(self, 'partition_coordinator', None):
|
||||
self.partition_coordinator.stop()
|
||||
super(AlarmEvaluationService, self).stop()
|
||||
def _evaluate_assigned_alarms(self):
|
||||
try:
|
||||
alarms = self._assigned_alarms()
|
||||
LOG.info(_('initiating evaluation cycle on %d alarms') %
|
||||
len(alarms))
|
||||
for alarm in alarms:
|
||||
self._evaluate_alarm(alarm)
|
||||
except Exception:
|
||||
LOG.exception(_('alarm evaluation cycle failed'))
|
||||
|
||||
def _evaluate_alarm(self, alarm):
|
||||
"""Evaluate the alarms assigned to this evaluator."""
|
||||
if alarm.type not in self.evaluators:
|
||||
LOG.debug('skipping alarm %s: type unsupported', alarm.alarm_id)
|
||||
return
|
||||
|
||||
LOG.debug('evaluating alarm %s', alarm.alarm_id)
|
||||
try:
|
||||
self.evaluators[alarm.type].obj.evaluate(alarm)
|
||||
except Exception:
|
||||
LOG.exception(_('Failed to evaluate alarm %s'), alarm.alarm_id)
|
||||
|
||||
def _assigned_alarms(self):
|
||||
# NOTE(r-mibu): The 'event' type alarms will be evaluated by the
|
||||
# event-driven alarm evaluator, so this periodical evaluator skips
|
||||
# those alarms.
|
||||
all_alarms = self._storage_conn.get_alarms(enabled=True,
|
||||
exclude=dict(type='event'))
|
||||
all_alarms = self.storage_conn.get_alarms(enabled=True,
|
||||
exclude=dict(type='event'))
|
||||
all_alarms = list(all_alarms)
|
||||
all_alarm_ids = [a.alarm_id for a in all_alarms]
|
||||
selected = self.partition_coordinator.extract_my_subset(
|
||||
|
@ -13,10 +13,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import cotyledon
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import oslo_messaging
|
||||
from oslo_service import service
|
||||
|
||||
from aodh.evaluator import event
|
||||
from aodh import messaging
|
||||
@ -51,14 +51,10 @@ class EventAlarmEndpoint(object):
|
||||
self.evaluator.evaluate_events(notification['payload'])
|
||||
|
||||
|
||||
class EventAlarmEvaluationService(service.Service):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(EventAlarmEvaluationService, self).__init__()
|
||||
class EventAlarmEvaluationService(cotyledon.Service):
|
||||
def __init__(self, worker_id, conf):
|
||||
super(EventAlarmEvaluationService, self).__init__(worker_id)
|
||||
self.conf = conf
|
||||
|
||||
def start(self):
|
||||
super(EventAlarmEvaluationService, self).start()
|
||||
self.storage_conn = storage.get_connection_from_config(self.conf)
|
||||
self.evaluator = event.EventAlarmEvaluator(self.conf)
|
||||
self.listener = messaging.get_batch_notification_listener(
|
||||
@ -69,11 +65,7 @@ class EventAlarmEvaluationService(service.Service):
|
||||
self.conf.listener.batch_size,
|
||||
self.conf.listener.batch_timeout)
|
||||
self.listener.start()
|
||||
# Add a dummy thread to have wait() working
|
||||
self.tg.add_timer(604800, lambda: None)
|
||||
|
||||
def stop(self):
|
||||
if getattr(self, 'listener', None):
|
||||
self.listener.stop()
|
||||
self.listener.wait()
|
||||
super(EventAlarmEvaluationService, self).stop()
|
||||
def terminate(self):
|
||||
self.listener.stop()
|
||||
self.listener.wait()
|
||||
|
@ -15,10 +15,10 @@
|
||||
|
||||
import abc
|
||||
|
||||
import cotyledon
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import oslo_messaging
|
||||
from oslo_service import service as os_service
|
||||
from oslo_utils import netutils
|
||||
import six
|
||||
from stevedore import extension
|
||||
@ -66,15 +66,11 @@ class AlarmNotifier(object):
|
||||
"""
|
||||
|
||||
|
||||
class AlarmNotifierService(os_service.Service):
|
||||
class AlarmNotifierService(cotyledon.Service):
|
||||
NOTIFIER_EXTENSIONS_NAMESPACE = "aodh.notifier"
|
||||
|
||||
def __init__(self, conf):
|
||||
super(AlarmNotifierService, self).__init__()
|
||||
def __init__(self, worker_id, conf):
|
||||
self.conf = conf
|
||||
|
||||
def start(self):
|
||||
super(AlarmNotifierService, self).start()
|
||||
transport = messaging.get_transport(self.conf)
|
||||
self.notifiers = extension.ExtensionManager(
|
||||
self.NOTIFIER_EXTENSIONS_NAMESPACE,
|
||||
@ -86,14 +82,10 @@ class AlarmNotifierService(os_service.Service):
|
||||
transport, [target], [AlarmEndpoint(self.notifiers)], False,
|
||||
self.conf.notifier.batch_size, self.conf.notifier.batch_timeout)
|
||||
self.listener.start()
|
||||
# Add a dummy thread to have wait() working
|
||||
self.tg.add_timer(604800, lambda: None)
|
||||
|
||||
def stop(self):
|
||||
if getattr(self, 'listener', None):
|
||||
self.listener.stop()
|
||||
self.listener.wait()
|
||||
super(AlarmNotifierService, self).stop()
|
||||
def terminate(self):
|
||||
self.listener.stop()
|
||||
self.listener.wait()
|
||||
|
||||
|
||||
class AlarmEndpoint(object):
|
||||
|
@ -18,6 +18,7 @@ import time
|
||||
|
||||
import mock
|
||||
from oslo_config import fixture as fixture_config
|
||||
from oslotest import mockpatch
|
||||
from stevedore import extension
|
||||
|
||||
from aodh import evaluator
|
||||
@ -34,7 +35,9 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
|
||||
self.setup_messaging(self.CONF)
|
||||
|
||||
self.threshold_eval = mock.MagicMock()
|
||||
self.evaluators = extension.ExtensionManager.make_test_instance(
|
||||
self._fake_conn = mock.Mock()
|
||||
self._fake_pc = mock.Mock()
|
||||
self._fake_em = extension.ExtensionManager.make_test_instance(
|
||||
[
|
||||
extension.Extension(
|
||||
'threshold',
|
||||
@ -44,14 +47,23 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
|
||||
]
|
||||
)
|
||||
|
||||
self.svc = evaluator.AlarmEvaluationService(self.CONF)
|
||||
self.svc.tg = mock.Mock()
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'stevedore.extension.ExtensionManager',
|
||||
return_value=self._fake_em
|
||||
))
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'aodh.coordination.PartitionCoordinator',
|
||||
return_value=self._fake_pc
|
||||
))
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'aodh.storage.get_connection_from_config',
|
||||
return_value=self._fake_conn
|
||||
))
|
||||
|
||||
@mock.patch('aodh.storage.get_connection_from_config',
|
||||
mock.MagicMock())
|
||||
def _do_test_start(self, test_interval=120,
|
||||
coordination_heartbeat=1.0,
|
||||
coordination_active=False):
|
||||
|
||||
self.CONF.set_override('evaluation_interval',
|
||||
test_interval)
|
||||
self.CONF.set_override('heartbeat',
|
||||
@ -59,18 +71,14 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
|
||||
group='coordination',
|
||||
enforce_type=True)
|
||||
|
||||
with mock.patch('aodh.coordination.PartitionCoordinator') as m_pc:
|
||||
m_pc.return_value.is_active.return_value = coordination_active
|
||||
self._fake_pc.is_active.return_value = coordination_active
|
||||
|
||||
self.svc.start()
|
||||
self.svc.stop()
|
||||
|
||||
self.svc.partition_coordinator.start.assert_called_once_with()
|
||||
self.svc.partition_coordinator.join_group.assert_called_once_with(
|
||||
self.svc.PARTITIONING_GROUP_NAME)
|
||||
|
||||
actual = self.svc.tg.add_timer.call_args_list
|
||||
self.assertEqual([mock.call(604800, mock.ANY)], actual)
|
||||
svc = evaluator.AlarmEvaluationService(0, self.CONF)
|
||||
self.addCleanup(svc.terminate)
|
||||
svc.terminate()
|
||||
svc.partition_coordinator.start.assert_called_once_with()
|
||||
svc.partition_coordinator.join_group.assert_called_once_with(
|
||||
svc.PARTITIONING_GROUP_NAME)
|
||||
|
||||
def test_start_singleton(self):
|
||||
self._do_test_start(coordination_active=False)
|
||||
@ -82,78 +90,62 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
|
||||
self._do_test_start(coordination_active=True, test_interval=10,
|
||||
coordination_heartbeat=5)
|
||||
|
||||
@mock.patch('stevedore.extension.ExtensionManager')
|
||||
@mock.patch('aodh.storage.get_connection_from_config')
|
||||
@mock.patch('aodh.coordination.PartitionCoordinator')
|
||||
def test_evaluation_cycle(self, m_pc, m_conn, m_em):
|
||||
def test_evaluation_cycle(self):
|
||||
alarm = mock.Mock(type='threshold', alarm_id="alarm_id1")
|
||||
m_pc.return_value.extract_my_subset.return_value = ["alarm_id1"]
|
||||
m_pc.return_value.is_active.return_value = False
|
||||
m_conn.return_value.get_alarms.return_value = [alarm]
|
||||
m_em.return_value = self.evaluators
|
||||
self._fake_pc.extract_my_subset.return_value = ["alarm_id1"]
|
||||
self._fake_pc.is_active.return_value = False
|
||||
self._fake_conn.get_alarms.return_value = [alarm]
|
||||
self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None]
|
||||
|
||||
self.addCleanup(self.svc.stop)
|
||||
self.svc.start()
|
||||
|
||||
svc = evaluator.AlarmEvaluationService(0, self.CONF)
|
||||
self.addCleanup(svc.terminate)
|
||||
time.sleep(1)
|
||||
|
||||
target = self.svc.partition_coordinator.extract_my_subset
|
||||
target.assert_called_once_with(self.svc.PARTITIONING_GROUP_NAME,
|
||||
target = svc.partition_coordinator.extract_my_subset
|
||||
target.assert_called_once_with(svc.PARTITIONING_GROUP_NAME,
|
||||
["alarm_id1"])
|
||||
self.threshold_eval.evaluate.assert_called_once_with(alarm)
|
||||
|
||||
@mock.patch('stevedore.extension.ExtensionManager')
|
||||
@mock.patch('aodh.coordination.PartitionCoordinator')
|
||||
def test_evaluation_cycle_with_bad_alarm(self, m_pc, m_em):
|
||||
m_pc.return_value.is_active.return_value = False
|
||||
m_em.return_value = self.evaluators
|
||||
def test_evaluation_cycle_with_bad_alarm(self):
|
||||
|
||||
alarms = [
|
||||
mock.Mock(type='threshold', name='bad'),
|
||||
mock.Mock(type='threshold', name='good'),
|
||||
mock.Mock(type='threshold', name='bad', alarm_id='a'),
|
||||
mock.Mock(type='threshold', name='good', alarm_id='b'),
|
||||
]
|
||||
self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None]
|
||||
|
||||
with mock.patch.object(self.svc, '_assigned_alarms',
|
||||
return_value=alarms):
|
||||
self.addCleanup(self.svc.stop)
|
||||
self.svc.start()
|
||||
|
||||
time.sleep(1)
|
||||
self._fake_pc.is_active.return_value = False
|
||||
self._fake_pc.extract_my_subset.return_value = ['a', 'b']
|
||||
self._fake_conn.get_alarms.return_value = alarms
|
||||
|
||||
svc = evaluator.AlarmEvaluationService(0, self.CONF)
|
||||
self.addCleanup(svc.terminate)
|
||||
time.sleep(1)
|
||||
self.assertEqual([mock.call(alarms[0]), mock.call(alarms[1])],
|
||||
self.threshold_eval.evaluate.call_args_list)
|
||||
|
||||
@mock.patch('stevedore.extension.ExtensionManager')
|
||||
def test_unknown_extension_skipped(self, m_em):
|
||||
m_em.return_value = self.evaluators
|
||||
def test_unknown_extension_skipped(self):
|
||||
alarms = [
|
||||
mock.Mock(type='not_existing_type'),
|
||||
mock.Mock(type='threshold')
|
||||
mock.Mock(type='not_existing_type', alarm_id='a'),
|
||||
mock.Mock(type='threshold', alarm_id='b')
|
||||
]
|
||||
|
||||
with mock.patch.object(self.svc, '_assigned_alarms',
|
||||
return_value=alarms):
|
||||
self.addCleanup(self.svc.stop)
|
||||
self.svc.start()
|
||||
time.sleep(1)
|
||||
self.threshold_eval.evaluate.assert_called_once_with(alarms[1])
|
||||
|
||||
@mock.patch('stevedore.extension.ExtensionManager')
|
||||
@mock.patch('aodh.coordination.PartitionCoordinator')
|
||||
@mock.patch('aodh.storage.get_connection_from_config')
|
||||
def test_check_alarm_query_constraints(self, m_conn, m_pc, m_em):
|
||||
m_conn.return_value.get_alarms.return_value = []
|
||||
m_pc.return_value.extract_my_subset.return_value = []
|
||||
m_pc.return_value.is_active.return_value = False
|
||||
m_em.return_value = self.evaluators
|
||||
|
||||
self.addCleanup(self.svc.stop)
|
||||
self.svc.start()
|
||||
self._fake_pc.is_active.return_value = False
|
||||
self._fake_pc.extract_my_subset.return_value = ['a', 'b']
|
||||
self._fake_conn.get_alarms.return_value = alarms
|
||||
|
||||
svc = evaluator.AlarmEvaluationService(0, self.CONF)
|
||||
self.addCleanup(svc.terminate)
|
||||
time.sleep(1)
|
||||
self.threshold_eval.evaluate.assert_called_once_with(alarms[1])
|
||||
|
||||
def test_check_alarm_query_constraints(self):
|
||||
self._fake_conn.get_alarms.return_value = []
|
||||
self._fake_pc.extract_my_subset.return_value = []
|
||||
self._fake_pc.is_active.return_value = False
|
||||
|
||||
svc = evaluator.AlarmEvaluationService(0, self.CONF)
|
||||
self.addCleanup(svc.terminate)
|
||||
time.sleep(1)
|
||||
expected = [({'enabled': True, 'exclude': {'type': 'event'}},)]
|
||||
self.assertEqual(expected,
|
||||
self.svc.storage_conn.get_alarms.call_args_list)
|
||||
svc.storage_conn.get_alarms.call_args_list)
|
||||
|
@ -18,7 +18,6 @@ import time
|
||||
|
||||
from oslo_config import fixture as fixture_config
|
||||
import oslo_messaging
|
||||
from oslo_messaging import server
|
||||
|
||||
from aodh import event
|
||||
from aodh import service
|
||||
@ -29,41 +28,21 @@ class TestEventAlarmEvaluationService(tests_base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestEventAlarmEvaluationService, self).setUp()
|
||||
|
||||
conf = service.prepare_service(argv=[], config_files=[])
|
||||
self.CONF = self.useFixture(fixture_config.Config(conf)).conf
|
||||
self.CONF.set_override("batch_size", 2, 'listener')
|
||||
self.setup_messaging(self.CONF)
|
||||
self.service = event.EventAlarmEvaluationService(self.CONF)
|
||||
self._msg_notifier = oslo_messaging.Notifier(
|
||||
|
||||
@mock.patch('aodh.storage.get_connection_from_config',
|
||||
mock.MagicMock())
|
||||
@mock.patch('aodh.event.EventAlarmEndpoint.sample')
|
||||
def test_batch_event_listener(self, mocked):
|
||||
msg_notifier = oslo_messaging.Notifier(
|
||||
self.transport, topics=['alarm.all'], driver='messaging',
|
||||
publisher_id='test-publisher')
|
||||
|
||||
@mock.patch('aodh.storage.get_connection_from_config',
|
||||
mock.MagicMock())
|
||||
def test_start_and_stop_service(self):
|
||||
self.addCleanup(self.service.stop)
|
||||
self.service.start()
|
||||
self.assertIsInstance(self.service.listener,
|
||||
server.MessageHandlingServer)
|
||||
|
||||
@mock.patch('aodh.storage.get_connection_from_config',
|
||||
mock.MagicMock())
|
||||
def test_listener_start_called(self):
|
||||
listener = mock.Mock()
|
||||
with mock.patch('aodh.messaging.get_batch_notification_listener',
|
||||
return_value=listener):
|
||||
self.addCleanup(self.service.stop)
|
||||
self.service.start()
|
||||
self.assertTrue(listener.start.called)
|
||||
|
||||
@mock.patch('aodh.event.EventAlarmEndpoint.sample')
|
||||
def test_batch_event_listener(self, mocked):
|
||||
received_events = []
|
||||
mocked.side_effect = lambda msg: received_events.append(msg)
|
||||
self.CONF.set_override("batch_size", 2, 'listener')
|
||||
with mock.patch('aodh.storage.get_connection_from_config'):
|
||||
self.svc = event.EventAlarmEvaluationService(self.CONF)
|
||||
self.svc.start()
|
||||
event1 = {'event_type': 'compute.instance.update',
|
||||
'traits': ['foo', 'bar'],
|
||||
'message_id': '20d03d17-4aba-4900-a179-dba1281a3451',
|
||||
@ -72,9 +51,12 @@ class TestEventAlarmEvaluationService(tests_base.BaseTestCase):
|
||||
'traits': ['foo', 'bar'],
|
||||
'message_id': '20d03d17-4aba-4900-a179-dba1281a3452',
|
||||
'generated': '2016-04-23T06:50:23.622739'}
|
||||
self._msg_notifier.sample({}, 'event', event1)
|
||||
self._msg_notifier.sample({}, 'event', event2)
|
||||
msg_notifier.sample({}, 'event', event1)
|
||||
msg_notifier.sample({}, 'event', event2)
|
||||
|
||||
svc = event.EventAlarmEvaluationService(0, self.CONF)
|
||||
self.addCleanup(svc.terminate)
|
||||
|
||||
time.sleep(1)
|
||||
self.assertEqual(1, len(received_events))
|
||||
self.assertEqual(2, len(received_events[0]))
|
||||
self.svc.stop()
|
||||
|
@ -51,9 +51,8 @@ class TestAlarmNotifierService(tests_base.BaseTestCase):
|
||||
self.setup_messaging(self.CONF)
|
||||
|
||||
def test_init_host_queue(self):
|
||||
self.service = notifier.AlarmNotifierService(self.CONF)
|
||||
self.service.start()
|
||||
self.service.stop()
|
||||
self.service = notifier.AlarmNotifierService(0, self.CONF)
|
||||
self.service.terminate()
|
||||
|
||||
|
||||
class TestAlarmNotifier(tests_base.BaseTestCase):
|
||||
@ -69,9 +68,8 @@ class TestAlarmNotifier(tests_base.BaseTestCase):
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'aodh.notifier.zaqar.ZaqarAlarmNotifier.get_zaqar_client',
|
||||
return_value=self.zaqar))
|
||||
self.service = notifier.AlarmNotifierService(self.CONF)
|
||||
self.service.start()
|
||||
self.addCleanup(self.service.stop)
|
||||
self.service = notifier.AlarmNotifierService(0, self.CONF)
|
||||
self.addCleanup(self.service.terminate)
|
||||
|
||||
def test_notify_alarm(self):
|
||||
data = {
|
||||
@ -120,11 +118,11 @@ class TestAlarmNotifier(tests_base.BaseTestCase):
|
||||
'reason': 'Everything is fine',
|
||||
'reason_data': {'fine': 'fine'}
|
||||
}
|
||||
self.service.stop()
|
||||
self.service.terminate()
|
||||
self.CONF.set_override("batch_size", 2, 'notifier')
|
||||
# Init a new service with new configuration
|
||||
self.svc = notifier.AlarmNotifierService(self.CONF)
|
||||
self.svc.start()
|
||||
self.svc = notifier.AlarmNotifierService(0, self.CONF)
|
||||
self.addCleanup(self.svc.terminate)
|
||||
self._msg_notifier.sample({}, 'alarm.update', data1)
|
||||
self._msg_notifier.sample({}, 'alarm.update', data2)
|
||||
time.sleep(1)
|
||||
@ -150,7 +148,6 @@ class TestAlarmNotifier(tests_base.BaseTestCase):
|
||||
notifications[1])
|
||||
self.assertEqual(mock.call('Received %s messages in batch.', 2),
|
||||
logger.call_args_list[0])
|
||||
self.svc.stop()
|
||||
|
||||
@staticmethod
|
||||
def _notification(action):
|
||||
|
@ -8,5 +8,4 @@ namespace = oslo.log
|
||||
namespace = oslo.messaging
|
||||
namespace = oslo.middleware.cors
|
||||
namespace = oslo.policy
|
||||
namespace = oslo.service.service
|
||||
namespace = keystonemiddleware.auth_token
|
||||
|
@ -15,7 +15,6 @@ oslo.config>=2.6.0 # Apache-2.0
|
||||
oslo.i18n>=1.5.0 # Apache-2.0
|
||||
oslo.log>=1.2.0 # Apache-2.0
|
||||
oslo.policy>=0.5.0 # Apache-2.0
|
||||
oslo.service>=0.1.0 # Apache-2.0
|
||||
PasteDeploy>=1.5.0
|
||||
pbr<2.0,>=0.11
|
||||
pecan>=0.8.0
|
||||
@ -33,3 +32,4 @@ tooz>=1.28.0 # Apache-2.0
|
||||
Werkzeug>=0.7 # BSD License
|
||||
WebOb>=1.2.3
|
||||
WSME>=0.8
|
||||
cotyledon
|
||||
|
Loading…
x
Reference in New Issue
Block a user