Enable aodh service to be multi-processes

aodh-notifier is stateless, it is naturally can be multi-processes.
aodh-evaluator can be multi-processes because it by default runs
in workload_partition mode.
aodh-listener shares nothing between workers, an event can only
be consumed by a single worker, and sql provides transaction ability
hence it is safe to use multi-processes as well.

Change-Id: I5a8051909be8626cd9d037992132740d03110892
This commit is contained in:
ZhiQiang Fan 2016-04-28 03:49:57 +08:00
parent 267ecbbabf
commit 2f5cfb6002
9 changed files with 151 additions and 80 deletions

View File

@ -25,14 +25,17 @@ from aodh import service
def notifier():
conf = service.prepare_service()
os_service.launch(conf, notifier_svc.AlarmNotifierService(conf)).wait()
os_service.launch(conf, notifier_svc.AlarmNotifierService(conf),
workers=conf.notifier.workers).wait()
def evaluator():
conf = service.prepare_service()
os_service.launch(conf, evaluator_svc.AlarmEvaluationService(conf)).wait()
os_service.launch(conf, evaluator_svc.AlarmEvaluationService(conf),
workers=conf.evaluator.workers).wait()
def listener():
conf = service.prepare_service()
os_service.launch(conf, event_svc.EventAlarmEvaluationService(conf)).wait()
os_service.launch(conf, event_svc.EventAlarmEvaluationService(conf),
workers=conf.listener.workers).wait()

View File

@ -187,10 +187,6 @@ class AlarmEvaluationService(os_service.Service):
def __init__(self, conf):
super(AlarmEvaluationService, self).__init__()
self.conf = conf
self.storage_conn = None
self._load_evaluators()
self.partition_coordinator = coordination.PartitionCoordinator(
conf.coordination.backend_url)
@property
def _storage_conn(self):
@ -229,6 +225,12 @@ class AlarmEvaluationService(os_service.Service):
def start(self):
super(AlarmEvaluationService, self).start()
self.storage_conn = None
self._load_evaluators()
self.partition_coordinator = coordination.PartitionCoordinator(
self.conf.coordination.backend_url)
self.partition_coordinator.start()
self.partition_coordinator.join_group(self.PARTITIONING_GROUP_NAME)
@ -268,9 +270,11 @@ class AlarmEvaluationService(os_service.Service):
self.tg.add_timer(604800, lambda: None)
def stop(self):
self.periodic.stop()
self.periodic.wait()
self.partition_coordinator.stop()
if getattr(self, 'periodic', None):
self.periodic.stop()
self.periodic.wait()
if getattr(self, 'partition_coordinator', None):
self.partition_coordinator.stop()
super(AlarmEvaluationService, self).stop()
def _assigned_alarms(self):

View File

@ -44,11 +44,11 @@ class EventAlarmEvaluationService(service.Service):
def __init__(self, conf):
super(EventAlarmEvaluationService, self).__init__()
self.conf = conf
self.storage_conn = storage.get_connection_from_config(self.conf)
self.evaluator = event.EventAlarmEvaluator(self.conf)
def start(self):
super(EventAlarmEvaluationService, self).start()
self.storage_conn = storage.get_connection_from_config(self.conf)
self.evaluator = event.EventAlarmEvaluator(self.conf)
self.listener = messaging.get_notification_listener(
messaging.get_transport(self.conf),
[oslo_messaging.Target(topic=self.conf.event_alarm_topic)],
@ -58,6 +58,7 @@ class EventAlarmEvaluationService(service.Service):
self.tg.add_timer(604800, lambda: None)
def stop(self):
self.listener.stop()
self.listener.wait()
if getattr(self, 'listener', None):
self.listener.stop()
self.listener.wait()
super(EventAlarmEvaluationService, self).stop()

View File

@ -58,26 +58,28 @@ class AlarmNotifierService(os_service.Service):
def __init__(self, conf):
super(AlarmNotifierService, self).__init__()
transport = messaging.get_transport(conf)
self.notifiers = extension.ExtensionManager(
self.NOTIFIER_EXTENSIONS_NAMESPACE,
invoke_on_load=True,
invoke_args=(conf,))
target = oslo_messaging.Target(topic=conf.notifier_topic)
self.listener = messaging.get_notification_listener(
transport, [target],
[AlarmEndpoint(self.notifiers)])
self.conf = conf
def start(self):
super(AlarmNotifierService, self).start()
transport = messaging.get_transport(self.conf)
self.notifiers = extension.ExtensionManager(
self.NOTIFIER_EXTENSIONS_NAMESPACE,
invoke_on_load=True,
invoke_args=(self.conf,))
target = oslo_messaging.Target(topic=self.conf.notifier_topic)
self.listener = messaging.get_notification_listener(
transport, [target],
[AlarmEndpoint(self.notifiers)])
self.listener.start()
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
def stop(self):
self.listener.stop()
self.listener.wait()
if getattr(self, 'listener', None):
self.listener.stop()
self.listener.wait()
super(AlarmNotifierService, self).stop()

View File

@ -62,6 +62,9 @@ def list_opts():
])),
('coordination', aodh.coordination.OPTS),
('database', aodh.storage.OPTS),
('evaluator', aodh.service.EVALUATOR_OPTS),
('listener', aodh.service.LISTENER_OPTS),
('notifier', aodh.service.NOTIFIER_OPTS),
('service_credentials', aodh.keystone_client.OPTS),
]

View File

@ -44,6 +44,30 @@ OPTS = [
' collection of underlying meters.'),
]
EVALUATOR_OPTS = [
cfg.IntOpt('workers',
default=1,
min=1,
help='Number of workers for evaluator service. '
'default value is 1.')
]
NOTIFIER_OPTS = [
cfg.IntOpt('workers',
default=1,
min=1,
help='Number of workers for notifier service. '
'default value is 1.')
]
LISTENER_OPTS = [
cfg.IntOpt('workers',
default=1,
min=1,
help='Number of workers for evaluator service. '
'default value is 1.')
]
def prepare_service(argv=None, config_files=None):
conf = cfg.ConfigOpts()

View File

@ -14,6 +14,8 @@
# under the License.
"""Tests for aodh.evaluator.AlarmEvaluationService.
"""
import time
import mock
from oslo_config import fixture as fixture_config
from stevedore import extension
@ -28,9 +30,10 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
super(TestAlarmEvaluationService, self).setUp()
conf = service.prepare_service(argv=[], config_files=[])
self.CONF = self.useFixture(fixture_config.Config(conf)).conf
self.CONF.set_override('workers', 1, 'evaluator')
self.setup_messaging(self.CONF)
self.threshold_eval = mock.Mock()
self.threshold_eval = mock.MagicMock()
self.evaluators = extension.ExtensionManager.make_test_instance(
[
extension.Extension(
@ -41,15 +44,11 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
]
)
self.storage_conn = mock.MagicMock()
self.svc = evaluator.AlarmEvaluationService(self.CONF)
self.svc.tg = mock.Mock()
self.svc.partition_coordinator = mock.MagicMock()
p_coord = self.svc.partition_coordinator
p_coord.extract_my_subset.side_effect = lambda _, x: x
self.svc.evaluators = self.evaluators
self.svc.supported_evaluators = ['threshold']
@mock.patch('aodh.storage.get_connection_from_config',
mock.MagicMock())
def _do_test_start(self, test_interval=120,
coordination_heartbeat=1.0,
coordination_active=False):
@ -59,18 +58,19 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
coordination_heartbeat,
group='coordination',
enforce_type=True)
with mock.patch('aodh.storage.get_connection_from_config',
return_value=self.storage_conn):
p_coord_mock = self.svc.partition_coordinator
p_coord_mock.is_active.return_value = coordination_active
with mock.patch('aodh.coordination.PartitionCoordinator') as m_pc:
m_pc.return_value.is_active.return_value = coordination_active
self.addCleanup(self.svc.stop)
self.svc.start()
self.svc.partition_coordinator.start.assert_called_once_with()
self.svc.partition_coordinator.join_group.assert_called_once_with(
self.svc.PARTITIONING_GROUP_NAME)
actual = self.svc.tg.add_timer.call_args_list
self.assertEqual([mock.call(604800, mock.ANY)], actual)
self.svc.partition_coordinator.start.assert_called_once_with()
self.svc.partition_coordinator.join_group.assert_called_once_with(
self.svc.PARTITIONING_GROUP_NAME)
actual = self.svc.tg.add_timer.call_args_list
self.assertEqual([mock.call(604800, mock.ANY)], actual)
def test_start_singleton(self):
self._do_test_start(coordination_active=False)
@ -82,55 +82,80 @@ class TestAlarmEvaluationService(tests_base.BaseTestCase):
self._do_test_start(coordination_active=True, test_interval=10,
coordination_heartbeat=5)
def test_evaluation_cycle(self):
@mock.patch('stevedore.extension.ExtensionManager')
@mock.patch('aodh.storage.get_connection_from_config')
@mock.patch('aodh.coordination.PartitionCoordinator')
def test_evaluation_cycle(self, m_pc, m_conn, m_em):
alarm = mock.Mock(type='threshold')
self.storage_conn.get_alarms.return_value = [alarm]
with mock.patch('aodh.storage.get_connection_from_config',
return_value=self.storage_conn):
p_coord_mock = self.svc.partition_coordinator
p_coord_mock.extract_my_subset.return_value = [alarm]
m_pc.return_value.extract_my_subset.return_value = [alarm]
m_pc.return_value.is_active.return_value = False
m_conn.return_value.get_alarms.return_value = [alarm]
m_em.return_value = self.evaluators
self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None]
self.svc._evaluate_assigned_alarms()
self.addCleanup(self.svc.stop)
self.svc.start()
p_coord_mock.extract_my_subset.assert_called_once_with(
self.svc.PARTITIONING_GROUP_NAME, [alarm])
self.threshold_eval.evaluate.assert_called_once_with(alarm)
time.sleep(1)
target = self.svc.partition_coordinator.extract_my_subset
target.assert_called_once_with(self.svc.PARTITIONING_GROUP_NAME,
[alarm])
self.threshold_eval.evaluate.assert_called_once_with(alarm)
@mock.patch('stevedore.extension.ExtensionManager')
@mock.patch('aodh.coordination.PartitionCoordinator')
def test_evaluation_cycle_with_bad_alarm(self, m_pc, m_em):
m_pc.return_value.is_active.return_value = False
m_em.return_value = self.evaluators
def test_evaluation_cycle_with_bad_alarm(self):
alarms = [
mock.Mock(type='threshold', name='bad'),
mock.Mock(type='threshold', name='good'),
]
self.storage_conn.get_alarms.return_value = alarms
self.threshold_eval.evaluate.side_effect = [Exception('Boom!'), None]
with mock.patch('aodh.storage.get_connection_from_config',
return_value=self.storage_conn):
p_coord_mock = self.svc.partition_coordinator
p_coord_mock.extract_my_subset.return_value = alarms
self.svc._evaluate_assigned_alarms()
self.assertEqual([mock.call(alarms[0]), mock.call(alarms[1])],
self.threshold_eval.evaluate.call_args_list)
with mock.patch.object(self.svc, '_assigned_alarms',
return_value=alarms):
self.addCleanup(self.svc.stop)
self.svc.start()
def test_unknown_extension_skipped(self):
time.sleep(1)
self.assertEqual([mock.call(alarms[0]), mock.call(alarms[1])],
self.threshold_eval.evaluate.call_args_list)
@mock.patch('stevedore.extension.ExtensionManager')
def test_unknown_extension_skipped(self, m_em):
m_em.return_value = self.evaluators
alarms = [
mock.Mock(type='not_existing_type'),
mock.Mock(type='threshold')
]
self.storage_conn.get_alarms.return_value = alarms
with mock.patch('aodh.storage.get_connection_from_config',
return_value=self.storage_conn):
with mock.patch.object(self.svc, '_assigned_alarms',
return_value=alarms):
self.addCleanup(self.svc.stop)
self.svc.start()
self.svc._evaluate_assigned_alarms()
time.sleep(1)
self.threshold_eval.evaluate.assert_called_once_with(alarms[1])
def test_check_alarm_query_constraints(self):
self.storage_conn.get_alarms.return_value = []
with mock.patch('aodh.storage.get_connection_from_config',
return_value=self.storage_conn):
self.svc.start()
self.svc._evaluate_assigned_alarms()
expected = [({'enabled': True, 'exclude': {'type': 'event'}},)]
self.assertEqual(expected,
self.storage_conn.get_alarms.call_args_list)
@mock.patch('stevedore.extension.ExtensionManager')
@mock.patch('aodh.coordination.PartitionCoordinator')
@mock.patch('aodh.storage.get_connection_from_config')
def test_check_alarm_query_constraints(self, m_conn, m_pc, m_em):
m_conn.return_value.get_alarms.return_value = []
m_pc.return_value.extract_my_subset.return_value = []
m_pc.return_value.is_active.return_value = False
m_em.return_value = self.evaluators
self.addCleanup(self.svc.start)
self.svc.start()
time.sleep(1)
expected = [({'enabled': True, 'exclude': {'type': 'event'}},)]
self.assertEqual(expected,
self.svc.storage_conn.get_alarms.call_args_list)

View File

@ -30,21 +30,23 @@ class TestEventAlarmEvaluationService(tests_base.BaseTestCase):
conf = service.prepare_service(argv=[], config_files=[])
self.CONF = self.useFixture(fixture_config.Config(conf)).conf
self.storage_conn = mock.MagicMock()
self.setup_messaging(self.CONF)
with mock.patch('aodh.storage.get_connection_from_config',
return_value=self.storage_conn):
self.service = event.EventAlarmEvaluationService(self.CONF)
self.service = event.EventAlarmEvaluationService(self.CONF)
@mock.patch('aodh.storage.get_connection_from_config',
mock.MagicMock())
def test_start_and_stop_service(self):
self.addCleanup(self.service.stop)
self.service.start()
self.assertIsInstance(self.service.listener,
server.MessageHandlingServer)
self.service.stop()
@mock.patch('aodh.storage.get_connection_from_config',
mock.MagicMock())
def test_listener_start_called(self):
listener = mock.Mock()
with mock.patch('aodh.messaging.get_notification_listener',
return_value=listener):
self.addCleanup(self.service.stop)
self.service.start()
self.assertTrue(listener.start.called)

View File

@ -0,0 +1,7 @@
---
features:
- Enable aodh services, including aodh-evalutor, aodh-listener and
aodh-notifier to run in multiple worker mode.
New options are introduced corresponsively as [evaluator]workers,
[listener]workers and [notifier]workers. They all default to 1.