Added test to alarm_crud.py of changing state via API and then having it change back to original state via the incoming metrics.

Added more output explaining what alarm_crud.py is doing

Enhanced check of Alarm history in utils.py

Moved find_notifications to utils.py
This commit is contained in:
Craig Bryant 2014-06-11 09:36:57 -06:00
parent 55e7e7810b
commit 0da507c473
6 changed files with 59 additions and 33 deletions

View File

@ -53,6 +53,7 @@ def main():
# Add Alarm
alarm_id = cli_wrapper.create_alarm(alarm_name, expression,
description=description)
print('Created Alarm with id %s' % alarm_id)
# Ensure it is created in the right state
initial_state = 'UNDETERMINED'
@ -82,11 +83,13 @@ def main():
states.append('ALARM')
# Modify Alarm by adding new expression that will cause it to go OK
print('Modify Alarm expression so it will go to OK')
new_metric_name = 'other_metric'
new_dimension = 'dim=42'
new_expression = '%s and max(%s{%s}) > 100' % (expression,
new_metric_name,
new_dimension)
alarm_json = cli_wrapper.patch_alarm(alarm_id, '--expression',
new_expression)
if alarm_json['expression'] != new_expression:
@ -96,6 +99,7 @@ def main():
# Output metrics that will cause it to go OK
# Wait for it to change to OK
if not output_metrics(alarm_id, 'OK', [[metric_name, base_dimension],
[new_metric_name, new_dimension]]):
return 1
@ -103,10 +107,27 @@ def main():
states.append('OK')
# Modify Alarm by deleting expression that will cause Alarm to go ALARM
print('Delete Alarm sub expression so it will go to ALARM')
cli_wrapper.patch_alarm(alarm_id, '--expression', expression)
# Output metrics that will cause it to go ALARM
# Wait for it to change to ALARM
print('Output extra dimensions to make sure match occurs')
extra_dimension = base_dimension + ',Extra=More'
if not output_metrics(alarm_id, 'ALARM',
[[metric_name, extra_dimension]]):
return 1
states.append('ALARM')
# Modify Alarm by setting alarm state to OK
print('Set Alarm to OK, wait for transition back to ALARM')
cli_wrapper.change_alarm_state(alarm_id, 'OK')
states.append('OK')
# Output metrics that will cause it to go back to ALARM
# Wait for it to change to ALARM
if not output_metrics(alarm_id, 'ALARM',
[[metric_name, base_dimension],
[new_metric_name, new_dimension]]):
@ -116,6 +137,7 @@ def main():
# Query History
# Delete ALARM
print('Delete alarm')
cli_wrapper.run_mon_cli(['alarm-delete', alarm_id], useJson=False)
# Ensure it can't be queried

View File

@ -3,23 +3,6 @@ from __future__ import print_function
"""
Utility methods for notifications
"""
import sys
import json
import subprocess
def find_notifications(alarm_id, user):
args = ['sudo', 'cat', '/var/mail/' + user]
result = []
try:
stdout = subprocess.check_output(args)
except subprocess.CalledProcessError as e:
print(e, file=sys.stderr)
sys.exit(1)
for line in stdout.splitlines():
if alarm_id in line:
result.append(json.loads(line)['state'])
return result
def create(mon_client, name, email):

View File

@ -21,7 +21,7 @@ def cycle_states(mon_client, alarm_id, states):
def check_notification(alarm_id, user, expected_state, existing):
for i in range(0, 20):
notifications = notification.find_notifications(alarm_id, user)
notifications = utils.find_notifications(alarm_id, user)
if len(notifications) > existing:
break
time.sleep(1)

View File

@ -42,7 +42,7 @@ def main():
initial_state = alarm.get_state(mon_client, alarm_id)
state = initial_state
existing_notifications = notification.find_notifications(alarm_id, user)
existing_notifications = utils.find_notifications(alarm_id, user)
notifications_sent = num_cycles * 2
for _ in range(0, notifications_sent):
if state == 'OK':
@ -56,7 +56,7 @@ def main():
((time.time() - start_time), num_cycles * 2))
for i in range(0, 30):
notifications = notification.find_notifications(alarm_id, user)
notifications = utils.find_notifications(alarm_id, user)
notifications_found = len(notifications) - len(existing_notifications)
if notifications_found >= notifications_sent:
break

View File

@ -20,7 +20,6 @@ import subprocess
import time
import cli_wrapper
import utils
from notification import find_notifications
# export OS_AUTH_TOKEN=82510970543135
# export OS_NO_CLIENT_AUTH=1
@ -63,7 +62,7 @@ def check_notifications(alarm_id, state_changes):
' skipping Notifications test',
file=sys.stderr)
return True
notifications = find_notifications(alarm_id, "root")
notifications = utils.find_notifications(alarm_id, "root")
if len(notifications) != len(state_changes):
print('Expected %d notifications but only found %d' %
(len(state_changes), len(notifications)), file=sys.stderr)

View File

@ -2,6 +2,8 @@ from __future__ import print_function
import sys
import time
import os
import json
import subprocess
import cli_wrapper
from monclient import client
@ -21,31 +23,37 @@ def check_alarm_history(alarm_id, states):
time.sleep(4)
result = True
if not check_expected(transitions, len(result_json),
'number of history entries'):
if transitions != len(result_json):
print('Wrong number of history entries, expected %d but was %d' %
(transitions, len(result_json)), file=sys.stderr)
return False
result_json.sort(key=lambda x: x['timestamp'])
# Alarm history is reverse sorted by date
index = transitions - 1
for i in range(0, transitions):
old_state = states[i]
new_state = states[i+1]
alarm_json = result_json[i]
if not check_expected(old_state, alarm_json['old_state'], 'old_state'):
alarm_json = result_json[index]
if not check_expected(old_state, alarm_json['old_state'], 'old_state',
i):
result = False
if not check_expected(new_state, alarm_json['new_state'], 'new_state'):
if not check_expected(new_state, alarm_json['new_state'], 'new_state',
i):
result = False
if not check_expected(alarm_id, alarm_json['alarm_id'], 'alarm_id'):
if not check_expected(alarm_id, alarm_json['alarm_id'], 'alarm_id',
i):
result = False
index = index - 1
if result:
print('Alarm History is OK')
return result
def check_expected(expected, actual, what):
def check_expected(expected, actual, what, index):
if (expected == actual):
return True
print("Incorrect value for alarm history %s expected '%s' but was '%s'" %
(what, str(expected), str(actual)), file=sys.stderr)
print('Wrong %s for alarm history expected %s but was %s transition %d' %
(what, expected, actual, index+1), file=sys.stderr)
return False
@ -89,4 +97,18 @@ def ensure_has_notification_engine():
print('Must be run on a VM with Notification Engine installed',
file=sys.stderr)
return False
return True
return True
def find_notifications(alarm_id, user):
args = ['sudo', 'cat', '/var/mail/' + user]
result = []
try:
stdout = subprocess.check_output(args)
except subprocess.CalledProcessError as e:
print(e, file=sys.stderr)
sys.exit(1)
for line in stdout.splitlines():
if alarm_id in line:
result.append(json.loads(line)['state'])
return result