Zabbix datasource unit tests (additional changes)

Change-Id: I46e3112477a62f9f29dfa946c2146b07d39e782f
This commit is contained in:
Alexey Weyl 2016-06-13 14:28:18 +03:00
parent 5485292397
commit e23ada8f4d
3 changed files with 412 additions and 412 deletions

View File

@ -37,18 +37,18 @@ class MockZabbixDriver(ZabbixDriver):
def __init__(self, conf):
super(MockZabbixDriver, self).__init__(conf)
self.service_datas = None
self.alarm_datas = None
def set_service_datas(self, service_datas):
self.service_datas = service_datas
def set_alarm_datas(self, alarm_datas):
self.alarm_datas = alarm_datas
def _get_alarms(self):
alarms = []
for service_data in self.service_datas:
for alarm_data in self.alarm_datas:
generators = mock_driver.simple_zabbix_alarm_generators(
host_num=1,
events_num=1,
snap_vals=service_data)
snap_vals=alarm_data)
alarms.append(
mock_driver.generate_sequential_events_list(generators)[0])

View File

@ -47,44 +47,44 @@ class ZabbixDriverTest(ZabbixBaseTest):
# Setup
zabbix_driver = MockZabbixDriver(self.conf)
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization 1',
ZabbixProps.IS_ALARM_DISABLED: '1',
ZabbixProps.IS_ALARM_ON: '0',
ZabbixProps.SEVERITY: '1'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization 2',
ZabbixProps.IS_ALARM_DISABLED: '1',
ZabbixProps.IS_ALARM_ON: '1',
ZabbixProps.SEVERITY: '1'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization 3',
ZabbixProps.IS_ALARM_DISABLED: '0',
ZabbixProps.IS_ALARM_ON: '1',
ZabbixProps.SEVERITY: '1'}
service_data4 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization 4',
ZabbixProps.IS_ALARM_DISABLED: '0',
ZabbixProps.IS_ALARM_ON: '0',
ZabbixProps.SEVERITY: '1'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization 1',
ZabbixProps.IS_ALARM_DISABLED: '1',
ZabbixProps.IS_ALARM_ON: '0',
ZabbixProps.SEVERITY: '1'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization 2',
ZabbixProps.IS_ALARM_DISABLED: '1',
ZabbixProps.IS_ALARM_ON: '1',
ZabbixProps.SEVERITY: '1'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization 3',
ZabbixProps.IS_ALARM_DISABLED: '0',
ZabbixProps.IS_ALARM_ON: '1',
ZabbixProps.SEVERITY: '1'}
alarm_data4 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization 4',
ZabbixProps.IS_ALARM_DISABLED: '0',
ZabbixProps.IS_ALARM_ON: '0',
ZabbixProps.SEVERITY: '1'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3,
service_data4])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3,
alarm_data4])
expected_service1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization 3',
ZabbixProps.SEVERITY: '1'}
expected_alarm1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization 3',
ZabbixProps.SEVERITY: '1'}
# Action
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
# Services with status OK should not be returned
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(1, len(services))
self._assert_contains(expected_service1, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(1, len(alarms))
self._assert_contains(expected_alarm1, alarms)
def test_get_all(self):
"""Check get_all functionality.
@ -98,119 +98,119 @@ class ZabbixDriverTest(ZabbixBaseTest):
zabbix_driver = MockZabbixDriver(self.conf)
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
# Services with status OK should not be returned
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(0, len(services))
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(0, len(alarms))
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(1, len(services))
self._assert_contains(service_data1, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(1, len(alarms))
self._assert_contains(alarm_data1, alarms)
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
excpected_service1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
excpected_service2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
excpected_alarm1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
excpected_alarm2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(2, len(services))
self._assert_contains(excpected_service1, services)
self._assert_contains(excpected_service2, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(2, len(alarms))
self._assert_contains(excpected_alarm1, alarms)
self._assert_contains(excpected_alarm2, alarms)
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
excpected_service1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
excpected_service2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
excpected_alarm1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
excpected_alarm2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
# The services of service_data1/2 should be returned although their
# The alarms of alarm_data1/2 should be returned although their
# status is OK, because they were not OK earlier
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(2, len(services))
self._assert_contains(excpected_service1, services)
self._assert_contains(excpected_service2, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(2, len(alarms))
self._assert_contains(excpected_alarm1, alarms)
self._assert_contains(excpected_alarm2, alarms)
# Action
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
# Calling get_services again should not return anything, since all
# services are still OK
self.assertIsNotNone(services, 'services is None')
self.assertEqual(0, len(services))
# Calling get_alarms again should not return anything, since all
# alarms are still OK
self.assertIsNotNone(alarms, 'alarms is None')
self.assertEqual(0, len(alarms))
def test_get_changes(self):
"""Check get_changes functionality.
@ -223,141 +223,141 @@ class ZabbixDriverTest(ZabbixBaseTest):
zabbix_driver = MockZabbixDriver(self.conf)
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
# Services with status OK should not be returned
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(0, len(services))
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(0, len(alarms))
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(1, len(services))
self._assert_contains(service_data1, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(1, len(alarms))
self._assert_contains(alarm_data1, alarms)
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
excpected_service1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
excpected_service2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
excpected_alarm1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
excpected_alarm2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(2, len(services))
self._assert_contains(excpected_service1, services)
self._assert_contains(excpected_service2, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(2, len(alarms))
self._assert_contains(excpected_alarm1, alarms)
self._assert_contains(excpected_alarm2, alarms)
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
excpected_service1 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
excpected_alarm1 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(1, len(services))
self._assert_contains(excpected_service1, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(1, len(alarms))
self._assert_contains(excpected_alarm1, alarms)
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
excpected_service1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
excpected_service2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
excpected_alarm1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
excpected_alarm2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(2, len(services))
self._assert_contains(excpected_service1, services)
self._assert_contains(excpected_service2, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(2, len(alarms))
self._assert_contains(excpected_alarm1, alarms)
self._assert_contains(excpected_alarm2, alarms)
# Action
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
self.assertIsNotNone(services, 'services is None')
self.assertEqual(0, len(services))
self.assertIsNotNone(alarms, 'alarms is None')
self.assertEqual(0, len(alarms))
def test_get_changes_and_get_all(self):
"""Check get_changes and get_all functionalities """
@ -366,282 +366,282 @@ class ZabbixDriverTest(ZabbixBaseTest):
zabbix_driver = MockZabbixDriver(self.conf)
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(1, len(services))
self._assert_contains(service_data1, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(1, len(alarms))
self._assert_contains(alarm_data1, alarms)
# Action
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
# Calling get_changes for the second time should return nothing
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(0, len(services))
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(0, len(alarms))
# Action
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(1, len(services))
self._assert_contains(service_data1, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(1, len(alarms))
self._assert_contains(alarm_data1, alarms)
# Action
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
# Calling get_all for the second time should return the same results
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(1, len(services))
self._assert_contains(service_data1, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(1, len(alarms))
self._assert_contains(alarm_data1, alarms)
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
excpected_service1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
excpected_service2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
excpected_alarm1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
excpected_alarm2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(2, len(services))
self._assert_contains(excpected_service1, services)
self._assert_contains(excpected_service2, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(2, len(alarms))
self._assert_contains(excpected_alarm1, alarms)
self._assert_contains(excpected_alarm2, alarms)
# Action
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
# Calling get_changes after get_all should return nothing
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(0, len(services))
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(0, len(alarms))
# Action
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
# Calling get_all for the second time should return the same results
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(2, len(services))
self._assert_contains(excpected_service1, services)
self._assert_contains(excpected_service2, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(2, len(alarms))
self._assert_contains(excpected_alarm1, alarms)
self._assert_contains(excpected_alarm2, alarms)
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '4'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '4'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
excpected_service1 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
excpected_service2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '4'}
excpected_alarm1 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
excpected_alarm2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '4'}
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(2, len(services))
self._assert_contains(excpected_service1, services)
self._assert_contains(excpected_service2, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(2, len(alarms))
self._assert_contains(excpected_alarm1, alarms)
self._assert_contains(excpected_alarm2, alarms)
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '4'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '4'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
expected_service1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
expected_alarm1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(1, len(services))
self._assert_contains(expected_service1, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(1, len(alarms))
self._assert_contains(expected_alarm1, alarms)
excpected_service1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
excpected_service2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
excpected_service3 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '4'}
excpected_alarm1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
excpected_alarm2 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '4'}
excpected_alarm3 = {ZabbixProps.RESOURCE_NAME: 'host2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '4'}
# Action
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
self.assertIsNotNone(services, 'services is None')
self.assertEqual(0, len(services))
self.assertIsNotNone(alarms, 'alarms is None')
self.assertEqual(0, len(alarms))
# Action
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
# Calling get_all for the second time should return the same results
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(3, len(services))
self._assert_contains(excpected_service1, services)
self._assert_contains(excpected_service2, services)
self._assert_contains(excpected_service3, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(3, len(alarms))
self._assert_contains(excpected_alarm1, alarms)
self._assert_contains(excpected_alarm2, alarms)
self._assert_contains(excpected_alarm3, alarms)
def test_delete_service(self):
"""Check get_all and get_changes with a deleted service"""
def test_delete_alarm(self):
"""Check get_all and get_changes with a deleted alarm"""
# Setup
zabbix_driver = MockZabbixDriver(self.conf)
# Action
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
service_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
alarm_data3 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'Uptime',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1,
service_data2,
service_data3])
zabbix_driver.set_alarm_datas([alarm_data1,
alarm_data2,
alarm_data3])
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(1, len(services))
self._assert_contains(service_data1, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(1, len(alarms))
self._assert_contains(alarm_data1, alarms)
# Action - delete a service that was OK
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
# Action - delete a alarm that was OK
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1, service_data2])
zabbix_driver.set_alarm_datas([alarm_data1, alarm_data2])
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(1, len(services))
self._assert_contains(service_data1, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(1, len(alarms))
self._assert_contains(alarm_data1, alarms)
# Action - delete a service that was not OK
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
# Action - delete a alarm that was not OK
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data2])
zabbix_driver.set_alarm_datas([alarm_data2])
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(1, len(services))
self._assert_contains(service_data1, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(1, len(alarms))
self._assert_contains(alarm_data1, alarms)
self.assertEqual(EventAction.DELETE_ENTITY,
services[0][DSProps.EVENT_TYPE])
alarms[0][DSProps.EVENT_TYPE])
# Action - get changes, should not return the deleted alarm again
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
self.assertIsNotNone(services, 'services is None')
self.assertEqual(0, len(services))
self.assertIsNotNone(alarms, 'alarms is None')
self.assertEqual(0, len(alarms))
# Action - "undelete" the service that was OK
service_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
# Action - "undelete" the alarm that was OK
alarm_data1 = {ZabbixProps.RESOURCE_NAME: 'compute-1',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '1'}
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data1, service_data2])
zabbix_driver.set_alarm_datas([alarm_data1, alarm_data2])
services = zabbix_driver._get_all_alarms()
alarms = zabbix_driver._get_all_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(1, len(services))
self._assert_contains(service_data1, services)
self.assertFalse(DSProps.EVENT_TYPE in services[0])
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(1, len(alarms))
self._assert_contains(alarm_data1, alarms)
self.assertFalse(DSProps.EVENT_TYPE in alarms[0])
# Action - delete a service that was not OK and call get_changes
service_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
# Action - delete a alarm that was not OK and call get_changes
alarm_data2 = {ZabbixProps.RESOURCE_NAME: 'compute-2',
ZabbixProps.DESCRIPTION: 'CPU utilization',
ZabbixProps.SEVERITY: '-1'}
zabbix_driver.set_service_datas([service_data2])
zabbix_driver.set_alarm_datas([alarm_data2])
services = zabbix_driver._get_changed_alarms()
alarms = zabbix_driver._get_changed_alarms()
# Test assertions
self.assertIsNotNone(services, 'No services returned')
self.assertEqual(1, len(services))
self._assert_contains(service_data1, services)
self.assertIsNotNone(alarms, 'No alarms returned')
self.assertEqual(1, len(alarms))
self._assert_contains(alarm_data1, alarms)
self.assertEqual(EventAction.DELETE_ENTITY,
services[0][DSProps.EVENT_TYPE])
alarms[0][DSProps.EVENT_TYPE])

View File

@ -18,19 +18,19 @@ from vitrage.tests import base
class ZabbixBaseTest(base.BaseTest):
def _assert_contains(self, expected_serv, services):
for service in services:
if service[ZabbixProps.RESOURCE_NAME] == \
def _assert_contains(self, expected_serv, alarms):
for alarm in alarms:
if alarm[ZabbixProps.RESOURCE_NAME] == \
expected_serv[ZabbixProps.RESOURCE_NAME] and \
service[ZabbixProps.DESCRIPTION] == \
alarm[ZabbixProps.DESCRIPTION] == \
expected_serv[ZabbixProps.DESCRIPTION]:
self._assert_expected_service(expected_serv, service)
self._assert_expected_alarm(expected_serv, alarm)
return
self.fail("service not found: %(resource_name)s %(service_name)s" %
self.fail("alarm not found: %(resource_name)s %(alarm_name)s" %
{'resource_name': expected_serv[ZabbixProps.RESOURCE_NAME],
'service_name': expected_serv[ZabbixProps.SERVICE]})
'alarm_name': expected_serv[ZabbixProps.DESCRIPTION]})
def _assert_expected_service(self, expected_service, service):
for key, value in expected_service.items():
self.assertEqual(value, service[key], 'wrong value for ' + key)
def _assert_expected_alarm(self, expected_alarm, alarm):
for key, value in expected_alarm.items():
self.assertEqual(value, alarm[key], 'wrong value for ' + key)